1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
//! Provides [KafkaEgress](crate::io::kafka::KafkaEgress) for sending egress messages to Kafka.
//!
//! To use this, import the `KafkaEgress` trait and then use
//! [`kafka_egress()`](crate::io::kafka::KafkaEgress::kafka_egress) or
//! [`kafka_keyed_egress()`](crate::io::kafka::KafkaEgress::kafka_keyed_egress) on an
//! [Effects](crate::Effects) to send messages to Kafka.
//!
//! # Examples
//!
//! ```
//! use protobuf::well_known_types::StringValue;
//!
//! use statefun::io::kafka::KafkaEgress;
//! use statefun::{Address, Context, Effects, EgressIdentifier, FunctionRegistry, FunctionType};
//!
//! pub fn relay_to_kafka(_context: Context, message: StringValue) -> Effects {
//!     let mut effects = Effects::new();
//!
//!     effects.kafka_keyed_egress(
//!         EgressIdentifier::new("example", "greets"),
//!         "greeting",
//!         "the key",
//!         message,
//!     );
//!
//!     effects
//! }
//! ```

use protobuf::Message;

use statefun_proto::kafka_egress::KafkaProducerRecord;

use crate::{Effects, EgressIdentifier};

/// Extension trait for sending egress messages to Kafka using [Effects](crate::Effects).
pub trait KafkaEgress {
    /// Sends the given message to the Kafka topic `topic` via the egress specified using the
    /// `EgressIdentifier`.
    fn kafka_egress<M: Message>(&mut self, identifier: EgressIdentifier, topic: &str, message: M);

    /// Sends the given message to the Kafka topic `topic` via the egress specified using the
    /// `EgressIdentifier`.
    ///
    /// This will set the given key on the message sent to record.
    fn kafka_keyed_egress<M: Message>(
        &mut self,
        identifier: EgressIdentifier,
        topic: &str,
        key: &str,
        message: M,
    );
}

impl KafkaEgress for Effects {
    fn kafka_egress<M: Message>(&mut self, identifier: EgressIdentifier, topic: &str, message: M) {
        let kafka_record = egress_record(topic, message);
        self.egress(identifier, kafka_record);
    }

    fn kafka_keyed_egress<M: Message>(
        &mut self,
        identifier: EgressIdentifier,
        topic: &str,
        key: &str,
        message: M,
    ) {
        let mut kafka_record = egress_record(topic, message);
        kafka_record.set_key(key.to_owned());
        self.egress(identifier, kafka_record);
    }
}

fn egress_record<M: Message>(topic: &str, value: M) -> KafkaProducerRecord {
    let mut result = KafkaProducerRecord::new();
    result.set_topic(topic.to_owned());
    result.set_value_bytes(value.write_to_bytes().expect("Could not serialize value."));
    result
}