use bytes::Bytes;
use crate::error::StreamsClientError;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchedRec {
pub offset: i64,
pub key: Option<Bytes>,
pub value: Option<Bytes>,
pub timestamp: i64,
}
#[derive(Debug, Clone, Default)]
pub struct FetchBatch {
pub records: Vec<FetchedRec>,
}
impl FetchBatch {
#[must_use]
pub fn next_offset(&self, current: i64) -> i64 {
self.records.last().map_or(current, |r| r.offset + 1)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
#[default]
ReadUncommitted,
ReadCommitted,
}
#[async_trait::async_trait]
pub trait RecordFetcher: Send + Sync + 'static {
async fn fetch(
&self,
topic: &str,
partition: i32,
offset: i64,
isolation: IsolationLevel,
) -> Result<FetchBatch, StreamsClientError>;
async fn partitions(&self, _topic: &str) -> Result<Vec<i32>, StreamsClientError> {
Ok(vec![0])
}
}
#[async_trait::async_trait]
pub trait RecordProducer: Send + Sync + 'static {
async fn send(
&self,
topic: &str,
partition: Option<i32>,
key: Option<Bytes>,
value: Option<Bytes>,
) -> Result<(), StreamsClientError>;
async fn send_with_timestamp(
&self,
topic: &str,
partition: Option<i32>,
key: Option<Bytes>,
value: Option<Bytes>,
_timestamp_ms: Option<i64>,
) -> Result<(), StreamsClientError> {
self.send(topic, partition, key, value).await
}
async fn flush(&self) -> Result<(), StreamsClientError>;
}
#[async_trait::async_trait]
pub trait BeginTxnGate: Send {
async fn ensure_begun(&mut self) -> Result<(), StreamsClientError>;
}
#[async_trait::async_trait]
pub trait OffsetStore: Send + Sync + 'static {
async fn committed(
&self,
topic: &str,
partition: i32,
) -> Result<Option<i64>, StreamsClientError>;
async fn earliest(&self, topic: &str, partition: i32) -> Result<i64, StreamsClientError>;
async fn latest(&self, topic: &str, partition: i32) -> Result<i64, StreamsClientError>;
async fn commit(&self, offsets: &[(String, i32, i64)]) -> Result<(), StreamsClientError>;
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::check;
#[test]
fn fetch_batch_next_offset_advances_past_last() {
let b = FetchBatch {
records: vec![
FetchedRec {
offset: 5,
key: None,
value: Some(bytes::Bytes::from_static(b"a")),
timestamp: -1,
},
FetchedRec {
offset: 6,
key: None,
value: Some(bytes::Bytes::from_static(b"b")),
timestamp: -1,
},
],
};
check!(b.next_offset(0) == 7);
let empty = FetchBatch { records: vec![] };
check!(empty.next_offset(9) == 9);
}
}