Skip to main content

statefun_sdk/io/
kafka.rs

1//! Provides [KafkaEgress](crate::io::kafka::KafkaEgress) for sending egress messages to Kafka.
2//!
3//! To use this, import the `KafkaEgress` trait and then use
4//! [`kafka_egress()`](crate::io::kafka::KafkaEgress::kafka_egress) or
5//! [`kafka_keyed_egress()`](crate::io::kafka::KafkaEgress::kafka_keyed_egress) on an
6//! [Effects](crate::Effects) to send messages to Kafka.
7//!
8//! # Examples
9//!
10//! ```
11//! use protobuf::well_known_types::StringValue;
12//!
13//! use statefun_sdk::io::kafka::KafkaEgress;
14//! use statefun_sdk::{Address, Context, Effects, EgressIdentifier, FunctionRegistry, FunctionType};
15//!
16//! pub fn relay_to_kafka(_context: Context, message: StringValue) -> Effects {
17//!     let mut effects = Effects::new();
18//!
19//!     effects.kafka_keyed_egress(
20//!         EgressIdentifier::new("example", "greets"),
21//!         "greeting",
22//!         "the key",
23//!         message,
24//!     );
25//!
26//!     effects
27//! }
28//! ```
29
30use protobuf::Message;
31
32use statefun_proto::kafka_egress::KafkaProducerRecord;
33
34use crate::{Effects, EgressIdentifier};
35
36/// Extension trait for sending egress messages to Kafka using [Effects](crate::Effects).
37pub trait KafkaEgress {
38    /// Sends the given message to the Kafka topic `topic` via the egress specified using the
39    /// `EgressIdentifier`.
40    fn kafka_egress<M: Message>(&mut self, identifier: EgressIdentifier, topic: &str, message: M);
41
42    /// Sends the given message to the Kafka topic `topic` via the egress specified using the
43    /// `EgressIdentifier`.
44    ///
45    /// This will set the given key on the message sent to record.
46    fn kafka_keyed_egress<M: Message>(
47        &mut self,
48        identifier: EgressIdentifier,
49        topic: &str,
50        key: &str,
51        message: M,
52    );
53}
54
55impl KafkaEgress for Effects {
56    fn kafka_egress<M: Message>(&mut self, identifier: EgressIdentifier, topic: &str, message: M) {
57        let kafka_record = egress_record(topic, message);
58        self.egress(identifier, kafka_record);
59    }
60
61    fn kafka_keyed_egress<M: Message>(
62        &mut self,
63        identifier: EgressIdentifier,
64        topic: &str,
65        key: &str,
66        message: M,
67    ) {
68        let mut kafka_record = egress_record(topic, message);
69        kafka_record.set_key(key.to_owned());
70        self.egress(identifier, kafka_record);
71    }
72}
73
74fn egress_record<M: Message>(topic: &str, value: M) -> KafkaProducerRecord {
75    let mut result = KafkaProducerRecord::new();
76    result.set_topic(topic.to_owned());
77    result.set_value_bytes(value.write_to_bytes().expect("Could not serialize value."));
78    result
79}