// crabka_client_streams/runtime/eos.rs
use async_trait::async_trait;

use crate::error::StreamsClientError;

/// Delivery guarantee selected for stream processing.
///
/// The default is [`ProcessingGuarantee::AtLeastOnce`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ProcessingGuarantee {
    /// At-least-once processing; this is the `Default` variant.
    #[default]
    AtLeastOnce,
    /// Exactly-once processing (version 2).
    // NOTE(review): presumably backed by the `TransactionalProducer` API in
    // this module — confirm against the runtime that consumes this enum.
    ExactlyOnceV2,
}
18
/// Group membership metadata passed along with
/// [`TransactionalProducer::send_offsets_to_transaction`].
// Every field legitimately ends in `_id`, which trips clippy's
// `struct_field_names` lint; renaming them would only obscure their meaning.
#[allow(clippy::struct_field_names)]
#[derive(Debug, Clone)]
pub struct StreamsGroupMeta {
    /// Identifier of the group.
    pub group_id: String,
    /// Generation of the group this member currently belongs to.
    pub generation_id: i32,
    /// Identifier of this member within the group.
    pub member_id: String,
    /// Optional group instance id.
    // NOTE(review): presumably the static-membership instance id — confirm.
    pub group_instance_id: Option<String>,
}
32
33#[async_trait]
37pub trait TransactionalProducer: crate::runtime::io::RecordProducer {
38 async fn init_transactions(&self) -> Result<(), StreamsClientError>;
39 async fn begin_transaction(&self) -> Result<(), StreamsClientError>;
40 async fn send_offsets_to_transaction(
41 &self,
42 offsets: &[(String, i32, i64)],
43 group_meta: &StreamsGroupMeta,
44 ) -> Result<(), StreamsClientError>;
45 async fn commit_transaction(&self) -> Result<(), StreamsClientError>;
46 async fn abort_transaction(&self) -> Result<(), StreamsClientError>;
47}
48
/// Builds the transactional id for one stream thread.
///
/// The id is `application_id` and `thread_idx` joined by a single `-`, so the
/// same inputs always produce the same id (e.g. `("word-count", 0)` yields
/// `"word-count-0"`).
#[must_use]
pub fn transactional_id(application_id: &str, thread_idx: usize) -> String {
    format!("{application_id}-{thread_idx}")
}
55
/// Test-only in-memory [`TransactionalProducer`] that records every call and
/// can be told to fail once at a chosen step.
#[cfg(test)]
pub(crate) mod mock {
    use super::*;
    use bytes::Bytes;
    use std::sync::Mutex;

    /// The producer operations the mock records and can be made to fail at.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Step {
        Init,
        Begin,
        Send,
        SendOffsets,
        Commit,
        Abort,
    }

    /// Mock producer; construct it with `Default` and optionally preset
    /// `fail_at` to inject a single failure.
    #[derive(Default)]
    pub struct MockTransactionalProducer {
        /// Every recorded step, in invocation order (including a step that failed).
        pub calls: Mutex<Vec<Step>>,
        /// `(topic, partition, key, value)` of every `send` call.
        // The tuple mirrors the `send` arguments one-to-one; a dedicated type
        // would add nothing for a test helper.
        #[allow(clippy::type_complexity)]
        pub sent: Mutex<Vec<(String, Option<i32>, Option<Bytes>, Option<Bytes>)>>,
        /// Step at which the next matching call fails; cleared after it fires.
        pub fail_at: Mutex<Option<Step>>,
    }

    impl MockTransactionalProducer {
        /// Logs `s` in `calls`, then returns an error if `s` is the armed
        /// `fail_at` step. The injected failure is one-shot: `fail_at` is reset
        /// to `None` before the error is returned.
        fn record(&self, s: Step) -> Result<(), StreamsClientError> {
            self.calls.lock().unwrap().push(s);
            let mut fail_at = self.fail_at.lock().unwrap();
            if *fail_at == Some(s) {
                *fail_at = None;
                return Err(StreamsClientError::Runtime(format!("mock fail at {s:?}")));
            }
            Ok(())
        }
    }

    #[async_trait]
    impl crate::runtime::io::RecordProducer for MockTransactionalProducer {
        /// Stores the record in `sent`, then records `Step::Send`.
        ///
        /// The record is stored before the failure check, so a `send` that is
        /// configured to fail still appears in `sent`.
        async fn send(
            &self,
            topic: &str,
            partition: Option<i32>,
            key: Option<Bytes>,
            value: Option<Bytes>,
        ) -> Result<(), StreamsClientError> {
            self.sent
                .lock()
                .unwrap()
                .push((topic.to_string(), partition, key, value));
            self.record(Step::Send)
        }

        /// Always succeeds; flushes are not recorded in `calls`.
        async fn flush(&self) -> Result<(), StreamsClientError> {
            Ok(())
        }
    }

    #[async_trait]
    impl TransactionalProducer for MockTransactionalProducer {
        /// Records `Step::Init`.
        async fn init_transactions(&self) -> Result<(), StreamsClientError> {
            self.record(Step::Init)
        }

        /// Records `Step::Begin`.
        async fn begin_transaction(&self) -> Result<(), StreamsClientError> {
            self.record(Step::Begin)
        }

        /// Records `Step::SendOffsets`; the offsets and group metadata are ignored.
        async fn send_offsets_to_transaction(
            &self,
            _o: &[(String, i32, i64)],
            _m: &StreamsGroupMeta,
        ) -> Result<(), StreamsClientError> {
            self.record(Step::SendOffsets)
        }

        /// Records `Step::Commit`.
        async fn commit_transaction(&self) -> Result<(), StreamsClientError> {
            self.record(Step::Commit)
        }

        /// Records `Step::Abort`.
        async fn abort_transaction(&self) -> Result<(), StreamsClientError> {
            self.record(Step::Abort)
        }
    }
}
135
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::check;

    // The id depends only on the application id and the thread index.
    #[test]
    fn transactional_id_is_stable_per_thread() {
        check!(transactional_id("word-count", 0) == "word-count-0");
        check!(transactional_id("word-count", 1) == "word-count-1");
    }

    // The mock logs each step in order and fails at the armed step, which is
    // still logged.
    #[tokio::test]
    async fn mock_records_calls_and_can_fail() {
        use crate::runtime::io::RecordProducer;
        use mock::{MockTransactionalProducer, Step};
        let p = MockTransactionalProducer {
            fail_at: std::sync::Mutex::new(Some(Step::Commit)),
            ..Default::default()
        };
        p.begin_transaction().await.unwrap();
        p.send("out", None, None, None).await.unwrap();
        check!(p.commit_transaction().await.is_err());
        check!(*p.calls.lock().unwrap() == vec![Step::Begin, Step::Send, Step::Commit]);
    }
}