statefun_sdk/io/kafka.rs
1//! Provides [KafkaEgress](crate::io::kafka::KafkaEgress) for sending egress messages to Kafka.
2//!
3//! To use this, import the `KafkaEgress` trait and then use
4//! [`kafka_egress()`](crate::io::kafka::KafkaEgress::kafka_egress) or
5//! [`kafka_keyed_egress()`](crate::io::kafka::KafkaEgress::kafka_keyed_egress) on an
6//! [Effects](crate::Effects) to send messages to Kafka.
7//!
8//! # Examples
9//!
10//! ```
11//! use protobuf::well_known_types::StringValue;
12//!
13//! use statefun_sdk::io::kafka::KafkaEgress;
14//! use statefun_sdk::{Address, Context, Effects, EgressIdentifier, FunctionRegistry, FunctionType};
15//!
16//! pub fn relay_to_kafka(_context: Context, message: StringValue) -> Effects {
17//! let mut effects = Effects::new();
18//!
19//! effects.kafka_keyed_egress(
20//! EgressIdentifier::new("example", "greets"),
21//! "greeting",
22//! "the key",
23//! message,
24//! );
25//!
26//! effects
27//! }
28//! ```
29
30use protobuf::Message;
31
32use statefun_proto::kafka_egress::KafkaProducerRecord;
33
34use crate::{Effects, EgressIdentifier};
35
36/// Extension trait for sending egress messages to Kafka using [Effects](crate::Effects).
37pub trait KafkaEgress {
38 /// Sends the given message to the Kafka topic `topic` via the egress specified using the
39 /// `EgressIdentifier`.
40 fn kafka_egress<M: Message>(&mut self, identifier: EgressIdentifier, topic: &str, message: M);
41
42 /// Sends the given message to the Kafka topic `topic` via the egress specified using the
43 /// `EgressIdentifier`.
44 ///
45 /// This will set the given key on the message sent to record.
46 fn kafka_keyed_egress<M: Message>(
47 &mut self,
48 identifier: EgressIdentifier,
49 topic: &str,
50 key: &str,
51 message: M,
52 );
53}
54
55impl KafkaEgress for Effects {
56 fn kafka_egress<M: Message>(&mut self, identifier: EgressIdentifier, topic: &str, message: M) {
57 let kafka_record = egress_record(topic, message);
58 self.egress(identifier, kafka_record);
59 }
60
61 fn kafka_keyed_egress<M: Message>(
62 &mut self,
63 identifier: EgressIdentifier,
64 topic: &str,
65 key: &str,
66 message: M,
67 ) {
68 let mut kafka_record = egress_record(topic, message);
69 kafka_record.set_key(key.to_owned());
70 self.egress(identifier, kafka_record);
71 }
72}
73
74fn egress_record<M: Message>(topic: &str, value: M) -> KafkaProducerRecord {
75 let mut result = KafkaProducerRecord::new();
76 result.set_topic(topic.to_owned());
77 result.set_value_bytes(value.write_to_bytes().expect("Could not serialize value."));
78 result
79}