use async_trait::async_trait;
use crate::error::StreamsClientError;
/// Processing guarantee that can be selected for a streams application.
///
/// [`ProcessingGuarantee::AtLeastOnce`] is the value produced by `Default`.
///
/// NOTE(review): the variant names appear to follow Kafka Streams'
/// `processing.guarantee` setting (`at_least_once` / `exactly_once_v2`) —
/// confirm against whatever parses the configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ProcessingGuarantee {
    /// The default guarantee.
    #[default]
    AtLeastOnce,
    /// The alternative guarantee; presumably the one that drives the
    /// [`TransactionalProducer`] code path — TODO confirm at the call sites.
    ExactlyOnceV2,
}
/// Group membership details handed to
/// [`TransactionalProducer::send_offsets_to_transaction`].
///
/// NOTE(review): the fields look like the usual consumer-group metadata
/// (group, generation, member, optional static instance id) — confirm with the
/// code that populates this struct.
// Every field ends in `_id`, which is what trips `clippy::struct_field_names`;
// the names are kept as they are, hence the allow.
#[allow(clippy::struct_field_names)]
#[derive(Debug, Clone)]
pub struct StreamsGroupMeta {
    /// Identifier of the group.
    pub group_id: String,
    /// Generation number of the group.
    pub generation_id: i32,
    /// Identifier of this member within the group.
    pub member_id: String,
    /// Optional instance identifier; `None` when there is none.
    pub group_instance_id: Option<String>,
}
/// A [`RecordProducer`](crate::runtime::io::RecordProducer) that also exposes a
/// transaction lifecycle: initialise, begin, attach offsets, then commit or
/// abort.
///
/// Every method is asynchronous and reports failure as a
/// [`StreamsClientError`].
///
/// NOTE(review): the method names match the Kafka producer transaction API.
/// This trait does not enforce any call ordering itself; the precise semantics
/// are defined by each implementation — confirm against the concrete producer.
#[async_trait]
pub trait TransactionalProducer: crate::runtime::io::RecordProducer {
    /// Initialises transactional state for this producer.
    ///
    /// # Errors
    /// Returns a [`StreamsClientError`] if the implementation fails.
    async fn init_transactions(&self) -> Result<(), StreamsClientError>;

    /// Opens a new transaction.
    ///
    /// # Errors
    /// Returns a [`StreamsClientError`] if the implementation fails.
    async fn begin_transaction(&self) -> Result<(), StreamsClientError>;

    /// Attaches `offsets` for the group described by `group_meta` to the
    /// current transaction.
    ///
    /// Each `offsets` entry is a `(String, i32, i64)` tuple — presumably
    /// `(topic, partition, offset)`; TODO confirm with the callers.
    ///
    /// # Errors
    /// Returns a [`StreamsClientError`] if the implementation fails.
    async fn send_offsets_to_transaction(
        &self,
        offsets: &[(String, i32, i64)],
        group_meta: &StreamsGroupMeta,
    ) -> Result<(), StreamsClientError>;

    /// Commits the current transaction.
    ///
    /// # Errors
    /// Returns a [`StreamsClientError`] if the implementation fails.
    async fn commit_transaction(&self) -> Result<(), StreamsClientError>;

    /// Aborts the current transaction.
    ///
    /// # Errors
    /// Returns a [`StreamsClientError`] if the implementation fails.
    async fn abort_transaction(&self) -> Result<(), StreamsClientError>;
}
/// Builds the transactional id for one stream thread, formatted as
/// `"{application_id}-{thread_idx}"`.
///
/// The result depends only on the two arguments, so the same thread index of
/// the same application always maps to the same id.
///
/// NOTE(review): the id has no per-process component, so two processes running
/// the same `application_id` produce identical ids for equal thread indices —
/// confirm this is intended.
#[must_use]
pub fn transactional_id(application_id: &str, thread_idx: usize) -> String {
    let thread_suffix = thread_idx.to_string();
    [application_id, thread_suffix.as_str()].join("-")
}
/// Test-only, in-memory [`TransactionalProducer`] that logs every call and can
/// be armed to fail once at a chosen step.
#[cfg(test)]
pub(crate) mod mock {
    use super::*;
    use bytes::Bytes;
    use std::sync::Mutex;

    /// One producer call as it appears in
    /// [`MockTransactionalProducer::calls`].
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Step {
        /// `init_transactions`
        Init,
        /// `begin_transaction`
        Begin,
        /// `send`
        Send,
        /// `send_offsets_to_transaction`
        SendOffsets,
        /// `commit_transaction`
        Commit,
        /// `abort_transaction`
        Abort,
    }

    /// Producer double whose state is public so tests can arm and inspect it.
    #[derive(Default)]
    pub struct MockTransactionalProducer {
        /// Every recorded step, in call order. Failing calls are included;
        /// `flush` is not recorded.
        pub calls: Mutex<Vec<Step>>,
        /// Arguments of every `send` call as `(topic, partition, key, value)`.
        /// The entry is stored before the failure check, so a `send` that
        /// returns an error is still listed here.
        // The tuple is spelled out in place rather than aliased, hence the allow.
        #[allow(clippy::type_complexity)]
        pub sent: Mutex<Vec<(String, Option<i32>, Option<Bytes>, Option<Bytes>)>>,
        /// Step at which the next matching call should fail; `None` means no
        /// failure is armed.
        pub fail_at: Mutex<Option<Step>>,
    }

    impl MockTransactionalProducer {
        /// Appends `s` to the call log, then fails if `s` is the armed step.
        ///
        /// The failure is one-shot: `fail_at` is cleared before the error is
        /// returned, so a later call for the same step succeeds.
        fn record(&self, s: Step) -> Result<(), StreamsClientError> {
            self.calls.lock().unwrap().push(s);

            let mut armed = self.fail_at.lock().unwrap();
            if *armed != Some(s) {
                return Ok(());
            }
            *armed = None;
            Err(StreamsClientError::Runtime(format!("mock fail at {s:?}")))
        }
    }

    #[async_trait]
    impl crate::runtime::io::RecordProducer for MockTransactionalProducer {
        /// Stores the arguments in `sent`, then records [`Step::Send`].
        async fn send(
            &self,
            topic: &str,
            partition: Option<i32>,
            key: Option<Bytes>,
            value: Option<Bytes>,
        ) -> Result<(), StreamsClientError> {
            let entry = (topic.to_string(), partition, key, value);
            self.sent.lock().unwrap().push(entry);
            self.record(Step::Send)
        }

        /// Always succeeds and leaves the call log untouched.
        async fn flush(&self) -> Result<(), StreamsClientError> {
            Ok(())
        }
    }

    #[async_trait]
    impl TransactionalProducer for MockTransactionalProducer {
        /// Records [`Step::Init`].
        async fn init_transactions(&self) -> Result<(), StreamsClientError> {
            self.record(Step::Init)
        }

        /// Records [`Step::Begin`].
        async fn begin_transaction(&self) -> Result<(), StreamsClientError> {
            self.record(Step::Begin)
        }

        /// Records [`Step::SendOffsets`]; both arguments are ignored.
        async fn send_offsets_to_transaction(
            &self,
            _offsets: &[(String, i32, i64)],
            _group_meta: &StreamsGroupMeta,
        ) -> Result<(), StreamsClientError> {
            self.record(Step::SendOffsets)
        }

        /// Records [`Step::Commit`].
        async fn commit_transaction(&self) -> Result<(), StreamsClientError> {
            self.record(Step::Commit)
        }

        /// Records [`Step::Abort`].
        async fn abort_transaction(&self) -> Result<(), StreamsClientError> {
            self.record(Step::Abort)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::check;

    /// The id is the application id and the thread index joined by `-`.
    #[test]
    fn transactional_id_is_stable_per_thread() {
        for (thread_idx, expected) in [(0, "word-count-0"), (1, "word-count-1")] {
            check!(transactional_id("word-count", thread_idx) == expected);
        }
    }

    /// The mock logs each call in order and fails at the armed step.
    #[tokio::test]
    async fn mock_records_calls_and_can_fail() {
        use crate::runtime::io::RecordProducer;
        use mock::{MockTransactionalProducer, Step};

        let producer = MockTransactionalProducer {
            fail_at: std::sync::Mutex::new(Some(Step::Commit)),
            ..Default::default()
        };

        producer.begin_transaction().await.unwrap();
        producer.send("out", None, None, None).await.unwrap();

        let commit = producer.commit_transaction().await;
        check!(commit.is_err());

        // The failing commit is still part of the call log.
        let recorded = producer.calls.lock().unwrap().clone();
        check!(recorded == vec![Step::Begin, Step::Send, Step::Commit]);
    }
}