Skip to main content

crabka_client_streams/runtime/
eos.rs

1//! Exactly-once (EOS v2 / KIP-447) primitives: the processing-guarantee config,
2//! the transactional-producer I/O seam, and the streams group metadata used by
3//! `send_offsets_to_transaction`. Wired into the runtime in T2/T3.
4
5use async_trait::async_trait;
6
7use crate::error::StreamsClientError;
8
9/// Delivery guarantee for the runtime (`processing.guarantee`).
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
11pub enum ProcessingGuarantee {
12    /// Produce-then-commit; a crash mid-cycle may replay (the default).
13    #[default]
14    AtLeastOnce,
15    /// Transactional: produce + offset-commit atomically (KIP-447).
16    ExactlyOnceV2,
17}
18
19/// Streams group metadata for `send_offsets_to_transaction` (maps to the native
20/// `crabka_client_consumer::ConsumerGroupMetadata`).
21// Field names mirror the Kafka `ConsumerGroupMetadata` mapping (group_id /
22// generation_id / member_id / group_instance_id) — keep them verbatim.
23#[allow(clippy::struct_field_names)]
24#[derive(Debug, Clone)]
25pub struct StreamsGroupMeta {
26    pub group_id: String,
27    /// The member epoch (next-gen "generation").
28    pub generation_id: i32,
29    pub member_id: String,
30    pub group_instance_id: Option<String>,
31}
32
33/// EOS-v2 transactional producer seam (DI'd; `BrokerTransactionalProducer` in
34/// production, `MockTransactionalProducer` for tests). Also impls
35/// `RecordProducer` for `send`.
36#[async_trait]
37pub trait TransactionalProducer: crate::runtime::io::RecordProducer {
38    async fn init_transactions(&self) -> Result<(), StreamsClientError>;
39    async fn begin_transaction(&self) -> Result<(), StreamsClientError>;
40    async fn send_offsets_to_transaction(
41        &self,
42        offsets: &[(String, i32, i64)],
43        group_meta: &StreamsGroupMeta,
44    ) -> Result<(), StreamsClientError>;
45    async fn commit_transaction(&self) -> Result<(), StreamsClientError>;
46    async fn abort_transaction(&self) -> Result<(), StreamsClientError>;
47}
48
49/// KIP-447 transactional id: stable per (application, thread) so a restart fences
50/// a zombie via the producer-epoch bump in `init_transactions`.
51#[must_use]
52pub fn transactional_id(application_id: &str, thread_idx: usize) -> String {
53    format!("{application_id}-{thread_idx}")
54}
55
56#[cfg(test)]
57pub(crate) mod mock {
58    use super::*;
59    use bytes::Bytes;
60    use std::sync::Mutex;
61
62    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
63    pub enum Step {
64        Init,
65        Begin,
66        Send,
67        SendOffsets,
68        Commit,
69        Abort,
70    }
71
72    /// Records the call sequence; `fail_at` makes the FIRST time that step is
73    /// reached return an error (drives the abort/rollback tests).
74    #[derive(Default)]
75    pub struct MockTransactionalProducer {
76        pub calls: Mutex<Vec<Step>>,
77        /// `(topic, partition, key, value)` of each `send` — a test-only record log.
78        #[allow(clippy::type_complexity)]
79        pub sent: Mutex<Vec<(String, Option<i32>, Option<Bytes>, Option<Bytes>)>>,
80        pub fail_at: Mutex<Option<Step>>,
81    }
82    impl MockTransactionalProducer {
83        fn record(&self, s: Step) -> Result<(), StreamsClientError> {
84            self.calls.lock().unwrap().push(s);
85            let mut f = self.fail_at.lock().unwrap();
86            if *f == Some(s) {
87                *f = None;
88                return Err(StreamsClientError::Runtime(format!("mock fail at {s:?}")));
89            }
90            Ok(())
91        }
92    }
93    #[async_trait]
94    impl crate::runtime::io::RecordProducer for MockTransactionalProducer {
95        async fn send(
96            &self,
97            topic: &str,
98            partition: Option<i32>,
99            key: Option<Bytes>,
100            value: Option<Bytes>,
101        ) -> Result<(), StreamsClientError> {
102            self.sent
103                .lock()
104                .unwrap()
105                .push((topic.to_string(), partition, key, value));
106            self.record(Step::Send)
107        }
108        async fn flush(&self) -> Result<(), StreamsClientError> {
109            Ok(())
110        }
111    }
112    #[async_trait]
113    impl TransactionalProducer for MockTransactionalProducer {
114        async fn init_transactions(&self) -> Result<(), StreamsClientError> {
115            self.record(Step::Init)
116        }
117        async fn begin_transaction(&self) -> Result<(), StreamsClientError> {
118            self.record(Step::Begin)
119        }
120        async fn send_offsets_to_transaction(
121            &self,
122            _o: &[(String, i32, i64)],
123            _m: &StreamsGroupMeta,
124        ) -> Result<(), StreamsClientError> {
125            self.record(Step::SendOffsets)
126        }
127        async fn commit_transaction(&self) -> Result<(), StreamsClientError> {
128            self.record(Step::Commit)
129        }
130        async fn abort_transaction(&self) -> Result<(), StreamsClientError> {
131            self.record(Step::Abort)
132        }
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139    use assert2::check;
140
141    #[test]
142    fn transactional_id_is_stable_per_thread() {
143        check!(transactional_id("word-count", 0) == "word-count-0");
144        check!(transactional_id("word-count", 1) == "word-count-1");
145    }
146
147    #[tokio::test]
148    async fn mock_records_calls_and_can_fail() {
149        use crate::runtime::io::RecordProducer;
150        use mock::{MockTransactionalProducer, Step};
151        let p = MockTransactionalProducer {
152            fail_at: std::sync::Mutex::new(Some(Step::Commit)),
153            ..Default::default()
154        };
155        p.begin_transaction().await.unwrap();
156        p.send("out", None, None, None).await.unwrap();
157        check!(p.commit_transaction().await.is_err());
158        check!(*p.calls.lock().unwrap() == vec![Step::Begin, Step::Send, Step::Commit]);
159    }
160}