use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::error::StreamsClientError;
use crate::membership::TopicPartition;
use crate::processor::graph::Graph;
use crate::runtime::eos::ProcessingGuarantee;
use crate::runtime::io::{
BeginTxnGate, IsolationLevel, OffsetStore, RecordFetcher, RecordProducer,
};
/// Role a `StreamTask` currently plays.
///
/// Within this file the role is only inspected by
/// `StreamTask::compute_changelog_offsets`, which reports an `Active` task as
/// fully caught up and reads tracked restore offsets for every other role.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TaskRole {
/// Reported as caught up: the current changelog offset equals the end offset.
Active,
/// Non-active role; restore progress comes from the tracked changelog offsets.
Standby,
/// Non-active role; restore progress comes from the tracked changelog offsets.
Warmup,
}
/// One unit of stream processing: a processor graph bound to a set of source
/// topic-partitions, a producer for its output and an offset store.
pub(crate) struct StreamTask {
// Stored from the constructor; not read anywhere in this file.
#[allow(dead_code)]
pub(crate) subtopology_id: String,
/// Processor graph; also owns the state stores (`graph.stores`).
pub(crate) graph: Graph,
/// Partition of the first source passed to `new` (0 when there were none).
/// Used as the partition for changelog fetches and changelog sends.
pub(crate) partition: i32,
// Next offset to fetch for each source `(topic, partition)`.
positions: HashMap<(String, i32), i64>,
// Names of the source topics; handed to `Graph::drain_changelogs`.
source_topics: HashSet<String>,
// Offsets advanced by `process_once` but not yet committed.
pending: HashMap<(String, i32), i64>,
// Destination for sink and changelog records.
producer: Arc<dyn RecordProducer>,
/// Source of committed/earliest/latest offsets and target of offset commits.
pub(crate) store: Arc<dyn OffsetStore>,
/// Current role of the task; see `TaskRole`.
pub(crate) role: TaskRole,
/// Next changelog offset to restore from, keyed by changelog topic name.
pub(crate) changelog_offsets: HashMap<String, i64>,
/// Processing guarantee; selects the isolation level used while restoring.
pub(crate) guarantee: ProcessingGuarantee,
}
impl StreamTask {
/// Builds a task over `sources`, wired to the given producer and offset store.
///
/// The task partition is taken from the first source (`0` when `sources` is
/// empty). Every source position starts at offset `0`; `seek_to_start` moves
/// the positions to the committed/earliest offsets.
pub fn new(
    subtopology_id: String,
    graph: Graph,
    sources: Vec<TopicPartition>,
    producer: Arc<dyn RecordProducer>,
    store: Arc<dyn OffsetStore>,
    role: TaskRole,
    guarantee: ProcessingGuarantee,
) -> Self {
    let partition = match sources.first() {
        Some(first) => first.partition,
        None => 0,
    };

    // Fill both lookup structures in a single pass over the sources.
    let mut source_topics: HashSet<String> = HashSet::with_capacity(sources.len());
    let mut positions: HashMap<(String, i32), i64> = HashMap::with_capacity(sources.len());
    for tp in sources {
        source_topics.insert(tp.topic.clone());
        positions.insert((tp.topic, tp.partition), 0);
    }

    Self {
        subtopology_id,
        graph,
        partition,
        positions,
        source_topics,
        pending: HashMap::new(),
        producer,
        store,
        role,
        changelog_offsets: HashMap::new(),
        guarantee,
    }
}
/// Borrows the state-store registry owned by this task's graph.
pub(crate) fn registry(&self) -> &crate::store::registry::StoreRegistry {
&self.graph.stores
}
/// Initializes the graph's processors.
///
/// # Errors
/// Any failure reported by the graph is stringified and returned as
/// `StreamsClientError::Runtime`.
pub async fn init(&mut self) -> Result<(), StreamsClientError> {
    let outcome = self.graph.init_processors().await;
    outcome.map_err(|err| StreamsClientError::Runtime(err.to_string()))
}
/// Closes the graph's processors by delegating to `Graph::close_processors`.
pub async fn close_processors(&mut self) {
self.graph.close_processors().await;
}
pub async fn restore(&mut self, fetcher: &dyn RecordFetcher) -> Result<(), StreamsClientError> {
let isolation = if self.guarantee == ProcessingGuarantee::ExactlyOnceV2 {
IsolationLevel::ReadCommitted
} else {
IsolationLevel::ReadUncommitted
};
self.graph.set_logging(false);
let names = self.graph.stores.names();
for name in names {
let changelog_topic = {
let store = self.graph.stores.get_mut(&name).expect("store in registry");
store.changelog_topic().to_string()
};
let mut offset = *self
.changelog_offsets
.entry(changelog_topic.clone())
.or_insert(0);
loop {
let batch = fetcher
.fetch(&changelog_topic, self.partition, offset, isolation)
.await?;
if batch.records.is_empty() {
break;
}
let mut advanced = false;
for rec in &batch.records {
self.graph
.restore_apply(
&name,
rec.key.clone().unwrap_or_default(),
rec.value.clone(),
)
.await;
let next = rec.offset + 1;
if next > offset {
offset = next;
advanced = true;
}
}
if !advanced {
break;
}
}
self.changelog_offsets.insert(changelog_topic, offset);
}
self.graph.set_logging(true);
Ok(())
}
pub async fn restore_step(
&mut self,
fetcher: &dyn RecordFetcher,
) -> Result<(), StreamsClientError> {
let isolation = if self.guarantee == ProcessingGuarantee::ExactlyOnceV2 {
IsolationLevel::ReadCommitted
} else {
IsolationLevel::ReadUncommitted
};
self.graph.set_logging(false);
let names = self.graph.stores.names();
for name in names {
let changelog_topic = {
let store = self.graph.stores.get_mut(&name).expect("store in registry");
store.changelog_topic().to_string()
};
let offset = *self
.changelog_offsets
.entry(changelog_topic.clone())
.or_insert(0);
let batch = fetcher
.fetch(&changelog_topic, self.partition, offset, isolation)
.await?;
let mut next_offset = offset;
for rec in &batch.records {
self.graph
.restore_apply(
&name,
rec.key.clone().unwrap_or_default(),
rec.value.clone(),
)
.await;
if rec.offset + 1 > next_offset {
next_offset = rec.offset + 1;
}
}
self.changelog_offsets.insert(changelog_topic, next_offset);
}
self.graph.set_logging(true);
Ok(())
}
/// Sums the current and end changelog offsets over all state stores.
///
/// Returns `(current_sum, end_sum)`. The end offset of each changelog comes
/// from `OffsetStore::latest` on the task partition. An `Active` task counts
/// as caught up (current = end); any other role contributes its tracked
/// restore offset, which is initialised to `0` when missing.
///
/// # Errors
/// Propagates errors from `OffsetStore::latest`.
pub async fn compute_changelog_offsets(&mut self) -> Result<(i64, i64), StreamsClientError> {
    let mut totals = (0, 0);
    let store_names = self.graph.stores.names();
    for store_name in store_names {
        let changelog_topic = {
            let store = self
                .graph
                .stores
                .get_mut(&store_name)
                .expect("store in registry");
            store.changelog_topic().to_string()
        };
        let end_offset = self.store.latest(&changelog_topic, self.partition).await?;
        let current_offset = match self.role {
            TaskRole::Active => end_offset,
            TaskRole::Standby | TaskRole::Warmup => {
                *self.changelog_offsets.entry(changelog_topic).or_insert(0)
            }
        };
        totals.0 += current_offset;
        totals.1 += end_offset;
    }
    Ok(totals)
}
/// Discards uncommitted progress and rebuilds state from the changelogs.
///
/// Steps, in order: drop pending offsets, rewind source positions via
/// `seek_to_start`, clear the graph's stores, forget the tracked changelog
/// offsets, then run a full `restore`.
///
/// # Errors
/// Propagates errors from `seek_to_start` and `restore`.
pub async fn rollback(
    &mut self,
    fetcher: &dyn RecordFetcher,
) -> Result<(), StreamsClientError> {
    self.pending.clear();
    self.seek_to_start().await?;
    self.graph.clear_stores().await;
    // `restore` resumes from the tracked changelog offsets. With the stores
    // just cleared, resuming from a previously reached offset would skip every
    // earlier changelog record, so replay has to start over.
    // NOTE(review): assumes `clear_stores` wipes store contents; if it does
    // not, a full replay is still correct, only slower.
    self.changelog_offsets.clear();
    self.restore(fetcher).await
}
/// Moves every source position to its committed offset, falling back to the
/// earliest available offset when nothing has been committed.
///
/// # Errors
/// Propagates errors from `OffsetStore::committed` / `OffsetStore::earliest`;
/// positions updated before the failure keep their new value.
pub async fn seek_to_start(&mut self) -> Result<(), StreamsClientError> {
    for ((topic, partition), position) in self.positions.iter_mut() {
        let committed = self.store.committed(topic, *partition).await?;
        *position = if let Some(offset) = committed {
            offset
        } else {
            self.store.earliest(topic, *partition).await?
        };
    }
    Ok(())
}
/// Runs one fetch/process round over every source topic-partition.
///
/// For each source with a non-empty batch: ensures the optional transaction
/// gate has begun, pipes each record through the graph and forwards the
/// graph's output to the producer, sends the drained changelog entries pinned
/// to the task partition, fires stream-time punctuation, and finally advances
/// both the fetch position and the pending (to-be-committed) offset.
///
/// NOTE(review): source fetches always use `ReadUncommitted`, regardless of
/// `self.guarantee` — confirm this is intended under exactly-once.
///
/// # Errors
/// Propagates fetch, gate, producer and punctuation errors; graph `pipe`
/// failures are wrapped in `StreamsClientError::Runtime`. On error the
/// position of the source being processed is not advanced.
pub async fn process_once(
    &mut self,
    fetcher: &dyn RecordFetcher,
    mut begin_gate: Option<&mut dyn BeginTxnGate>,
) -> Result<(), StreamsClientError> {
    let sources: Vec<(String, i32)> = self.positions.keys().cloned().collect();
    for source in sources {
        let offset = self.positions[&source];
        let topic = &source.0;
        let partition = source.1;

        let batch = fetcher
            .fetch(topic, partition, offset, IsolationLevel::ReadUncommitted)
            .await?;
        if batch.records.is_empty() {
            continue;
        }

        // Only open a transaction once there is something to process.
        if let Some(gate) = begin_gate.as_deref_mut() {
            gate.ensure_begun().await?;
        }

        for rec in &batch.records {
            let piped = self
                .graph
                .pipe(
                    topic,
                    rec.key.as_deref(),
                    rec.value.as_deref().unwrap_or(&[]),
                    rec.timestamp,
                )
                .await;
            piped.map_err(|e| StreamsClientError::Runtime(e.to_string()))?;

            let outputs = self.graph.take_output();
            for out in outputs {
                self.producer
                    .send(&out.topic, None, out.key, out.value)
                    .await?;
            }
        }

        let changelog_entries = self.graph.drain_changelogs(&self.source_topics);
        for (cl_topic, key, value) in changelog_entries {
            self.producer
                .send(&cl_topic, Some(self.partition), Some(key), value)
                .await?;
        }

        self.punctuate_stream_time().await?;

        let next = batch.next_offset(offset);
        self.positions.insert(source.clone(), next);
        self.pending.insert(source, next);
    }
    Ok(())
}
/// Fires stream-time punctuators at the graph's current stream time and
/// forwards whatever they produced.
///
/// # Errors
/// Graph failures are wrapped in `StreamsClientError::Runtime`; producer
/// errors from draining the output are propagated.
pub async fn punctuate_stream_time(&mut self) -> Result<(), StreamsClientError> {
    let stream_time = self.graph.stream_time;
    let fired = self.graph.punctuate_stream_time(stream_time).await;
    fired.map_err(|e| StreamsClientError::Runtime(e.to_string()))?;
    self.drain_punctuation_output().await
}
/// Fires wall-clock punctuators for time `now_ms` and forwards whatever they
/// produced.
///
/// # Errors
/// Graph failures are wrapped in `StreamsClientError::Runtime`; producer
/// errors from draining the output are propagated.
pub async fn punctuate_wall_clock(&mut self, now_ms: i64) -> Result<(), StreamsClientError> {
    let fired = self.graph.punctuate_wall_clock(now_ms).await;
    fired.map_err(|e| StreamsClientError::Runtime(e.to_string()))?;
    self.drain_punctuation_output().await
}
/// Sends the graph's buffered output records (no explicit partition) followed
/// by its drained changelog entries (pinned to the task partition).
///
/// # Errors
/// Propagates the first producer error.
async fn drain_punctuation_output(&mut self) -> Result<(), StreamsClientError> {
    let outputs = self.graph.take_output();
    for record in outputs {
        self.producer
            .send(&record.topic, None, record.key, record.value)
            .await?;
    }

    let changelog_entries = self.graph.drain_changelogs(&self.source_topics);
    for (changelog_topic, k, v) in changelog_entries {
        self.producer
            .send(&changelog_topic, Some(self.partition), Some(k), v)
            .await?;
    }
    Ok(())
}
/// Snapshot of the not-yet-committed offsets as `(topic, partition, offset)`.
pub fn pending_offsets(&self) -> Vec<(String, i32, i64)> {
    let mut snapshot = Vec::with_capacity(self.pending.len());
    for ((topic, partition), offset) in &self.pending {
        snapshot.push((topic.clone(), *partition, *offset));
    }
    snapshot
}
/// Forgets all pending (not yet committed) offsets without committing them.
pub fn clear_pending(&mut self) {
self.pending.clear();
}
/// Flushes the producer, then commits and clears any pending offsets.
///
/// The flush always happens, even when there is nothing to commit.
///
/// # Errors
/// Propagates producer flush and offset-store commit errors; pending offsets
/// are kept when the commit fails.
pub async fn commit(&mut self) -> Result<(), StreamsClientError> {
    self.producer.flush().await?;
    if !self.pending.is_empty() {
        let offsets = self.pending_offsets();
        self.store.commit(&offsets).await?;
        self.pending.clear();
    }
    Ok(())
}
/// Test helper: reads `key` from the `String -> i64` key-value store `name`.
///
/// Returns `None` when no such store is registered or the key is absent.
#[cfg(test)]
pub(crate) async fn store_get_i64(&mut self, name: &str, key: &String) -> Option<i64> {
    let kv = self.graph.stores.get_kv::<String, i64>(name)?;
    kv.get(key).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::membership::TopicPartition;
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
use crate::processor::serde::{Consumed, I64Serde, Produced, StringSerde};
use crate::runtime::io::{
FetchBatch, FetchedRec, IsolationLevel, OffsetStore, RecordFetcher, RecordProducer,
};
use crate::topology::Topology;
use assert2::check;
use std::collections::HashMap;
use std::sync::Mutex as StdMutex;
/// Test processor that counts occurrences of each record value in the
/// `"counts"` store and forwards the updated count keyed by that value.
struct Counter;
#[async_trait::async_trait]
impl Processor<String, String, String, i64> for Counter {
    async fn process(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, String, i64>,
        r: Record<String, String>,
    ) {
        // Keep the store borrow in its own scope so `ctx` is free for `forward`.
        let updated = {
            let counts = ctx.get_state_store::<String, i64>("counts").unwrap();
            let previous = counts.get(&r.value).await.unwrap_or(0);
            let updated = previous + 1;
            counts.put(r.value.clone(), updated).await;
            updated
        };
        ctx.forward(Record::new(Some(r.value), updated, r.timestamp));
    }
}
/// Topology `in -> Counter (store "counts") -> out`, built for application "app".
fn stateful_built() -> crate::topology::BuiltTopology {
    let mut topology = Topology::new();
    let source = topology.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let counter = topology.add_processor("c", || Counter, [&source]);
    topology.add_state_store("counts", StringSerde, I64Serde, [counter.name()]);
    topology.add_sink(
        "out",
        "out",
        [&counter],
        Produced::with(StringSerde, I64Serde),
    );
    topology.build("app").unwrap()
}
/// Fetcher test double keyed by `(topic, partition, offset)`.
///
/// Each scripted batch is served once; any other request (including a repeat)
/// yields the default batch.
struct ScriptedFetcher {
    scripts: StdMutex<HashMap<(String, i32, i64), FetchBatch>>,
}
impl ScriptedFetcher {
    /// Builds the fetcher from `(request, batch)` pairs.
    fn new(scripts: Vec<((String, i32, i64), FetchBatch)>) -> Self {
        let by_request: HashMap<(String, i32, i64), FetchBatch> = scripts.into_iter().collect();
        Self {
            scripts: StdMutex::new(by_request),
        }
    }
}
#[async_trait::async_trait]
impl RecordFetcher for ScriptedFetcher {
    async fn fetch(
        &self,
        t: &str,
        p: i32,
        o: i64,
        _isolation: IsolationLevel,
    ) -> Result<FetchBatch, crate::StreamsClientError> {
        let request = (t.to_string(), p, o);
        let scripted = self.scripts.lock().unwrap().remove(&request);
        Ok(scripted.unwrap_or_default())
    }
}
/// Test processor that forwards each record with its value upper-cased.
struct Upper;
#[async_trait::async_trait]
impl Processor<String, String, String, String> for Upper {
    async fn process(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, String, String>,
        r: Record<String, String>,
    ) {
        let shouted = r.value.to_uppercase();
        ctx.forward(Record::new(r.key, shouted, r.timestamp));
    }
}
/// Stateless topology `in -> Upper -> out`, built for application "app".
fn built() -> crate::topology::BuiltTopology {
    let mut topology = Topology::new();
    let source = topology.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let upper = topology.add_processor("up", || Upper, [&source]);
    let produced = Produced::with(StringSerde, StringSerde);
    topology.add_sink("out", "out", [&upper], produced);
    topology.build("app").unwrap()
}
/// Fetcher test double that returns its batch on the first call, whatever the
/// request, and the default batch afterwards.
struct OneShot {
    batch: StdMutex<Option<FetchBatch>>,
}
#[async_trait::async_trait]
impl RecordFetcher for OneShot {
    async fn fetch(
        &self,
        _t: &str,
        _p: i32,
        _o: i64,
        _isolation: IsolationLevel,
    ) -> Result<FetchBatch, crate::StreamsClientError> {
        let taken = self.batch.lock().unwrap().take();
        Ok(taken.unwrap_or_default())
    }
}
/// One captured producer send: `(topic, partition, key, value)`.
type SentRecord = (
String,
Option<i32>,
Option<bytes::Bytes>,
Option<bytes::Bytes>,
);
/// Producer test double that records every send and counts flushes.
#[derive(Default)]
struct CollectProducer {
    sent: StdMutex<Vec<SentRecord>>,
    flushes: StdMutex<u32>,
}
#[async_trait::async_trait]
impl RecordProducer for CollectProducer {
    async fn send(
        &self,
        topic: &str,
        partition: Option<i32>,
        k: Option<bytes::Bytes>,
        v: Option<bytes::Bytes>,
    ) -> Result<(), crate::StreamsClientError> {
        let record: SentRecord = (topic.to_string(), partition, k, v);
        self.sent.lock().unwrap().push(record);
        Ok(())
    }
    async fn flush(&self) -> Result<(), crate::StreamsClientError> {
        {
            let mut count = self.flushes.lock().unwrap();
            *count += 1;
        }
        Ok(())
    }
}
/// In-memory offset store test double.
///
/// `earliest` is always 0 and `latest` defaults to 0 for unknown partitions.
#[derive(Default)]
struct MemStore {
    committed: StdMutex<HashMap<(String, i32), i64>>,
    latest: StdMutex<HashMap<(String, i32), i64>>,
}
#[async_trait::async_trait]
impl OffsetStore for MemStore {
    async fn committed(
        &self,
        t: &str,
        p: i32,
    ) -> Result<Option<i64>, crate::StreamsClientError> {
        let key = (t.to_string(), p);
        let found = self.committed.lock().unwrap().get(&key).copied();
        Ok(found)
    }
    async fn earliest(&self, _t: &str, _p: i32) -> Result<i64, crate::StreamsClientError> {
        Ok(0)
    }
    async fn latest(&self, t: &str, p: i32) -> Result<i64, crate::StreamsClientError> {
        let key = (t.to_string(), p);
        let found = self.latest.lock().unwrap().get(&key).copied();
        Ok(found.unwrap_or(0))
    }
    async fn commit(
        &self,
        offs: &[(String, i32, i64)],
    ) -> Result<(), crate::StreamsClientError> {
        self.committed
            .lock()
            .unwrap()
            .extend(offs.iter().map(|(t, p, o)| ((t.clone(), *p), *o)));
        Ok(())
    }
}
/// A single input record is upper-cased to the sink, the producer is flushed
/// and the next offset (1) is committed for `in`/0.
#[tokio::test]
async fn processes_batch_produces_and_commits() {
    let producer = std::sync::Arc::new(CollectProducer::default());
    let store = std::sync::Arc::new(MemStore::default());

    let input = FetchedRec {
        offset: 0,
        key: Some("k".into()),
        value: Some("hi".into()),
        timestamp: -1,
    };
    let fetcher = OneShot {
        batch: StdMutex::new(Some(FetchBatch {
            records: vec![input],
        })),
    };

    let graph = built()
        .instantiate(&crate::store::backend::StoreBackend::InMemory, "app")
        .await
        .unwrap();
    let sources = vec![TopicPartition {
        topic: "in".into(),
        partition: 0,
    }];
    let dyn_producer = std::sync::Arc::clone(&producer) as std::sync::Arc<dyn RecordProducer>;
    let dyn_store = std::sync::Arc::clone(&store) as std::sync::Arc<dyn OffsetStore>;
    let mut task = StreamTask::new(
        "0".into(),
        graph,
        sources,
        dyn_producer,
        dyn_store,
        TaskRole::Active,
        ProcessingGuarantee::AtLeastOnce,
    );

    task.seek_to_start().await.unwrap();
    task.process_once(&fetcher, None).await.unwrap();
    task.commit().await.unwrap();

    check!(
        producer
            .sent
            .lock()
            .unwrap()
            .iter()
            .any(|(t, _p, _k, v)| t == "out" && v.as_deref() == Some(b"HI".as_ref()))
    );
    check!(*producer.flushes.lock().unwrap() >= 1);
    check!(store.committed.lock().unwrap().get(&("in".to_string(), 0)) == Some(&1));
}
#[tokio::test]
async fn stateful_task_produces_changelog_and_restores() {
let producer_a = std::sync::Arc::new(CollectProducer::default());
let store_a = std::sync::Arc::new(MemStore::default());
let fetcher_a = ScriptedFetcher::new(vec![(
("in".to_string(), 0, 0),
FetchBatch {
records: vec![FetchedRec {
offset: 0,
key: None,
value: Some("a".into()),
timestamp: -1,
}],
},
)]);
let mut task_a = StreamTask::new(
"0".into(),
stateful_built()
.instantiate(&crate::store::backend::StoreBackend::InMemory, "app")
.await
.unwrap(),
vec![TopicPartition {
topic: "in".into(),
partition: 0,
}],
std::sync::Arc::clone(&producer_a) as std::sync::Arc<dyn RecordProducer>,
std::sync::Arc::clone(&store_a) as std::sync::Arc<dyn OffsetStore>,
TaskRole::Active,
ProcessingGuarantee::AtLeastOnce,
);
task_a.init().await.unwrap();
task_a.process_once(&fetcher_a, None).await.unwrap();
task_a.commit().await.unwrap();
{
let sent_a = producer_a.sent.lock().unwrap();
let out_topics: Vec<&str> = sent_a.iter().map(|(t, _p, _k, _v)| t.as_str()).collect();
check!(
out_topics.contains(&"out"),
"sink record must be produced to 'out'"
);
check!(
out_topics.contains(&"app-counts-changelog"),
"changelog record must be produced to 'app-counts-changelog'"
);
}
let cl_key = bytes::Bytes::copy_from_slice(b"a");
let cl_val = bytes::Bytes::copy_from_slice(&5i64.to_be_bytes());
let producer_b = std::sync::Arc::new(CollectProducer::default());
let store_b = std::sync::Arc::new(MemStore::default());
let fetcher_b = ScriptedFetcher::new(vec![(
("app-counts-changelog".to_string(), 0, 0),
FetchBatch {
records: vec![FetchedRec {
offset: 0,
key: Some(cl_key),
value: Some(cl_val),
timestamp: -1,
}],
},
)]);
let mut task_b = StreamTask::new(
"0".into(),
stateful_built()
.instantiate(&crate::store::backend::StoreBackend::InMemory, "app")
.await
.unwrap(),
vec![TopicPartition {
topic: "in".into(),
partition: 0,
}],
std::sync::Arc::clone(&producer_b) as std::sync::Arc<dyn RecordProducer>,
std::sync::Arc::clone(&store_b) as std::sync::Arc<dyn OffsetStore>,
TaskRole::Active,
ProcessingGuarantee::AtLeastOnce,
);
task_b.restore(&fetcher_b).await.unwrap();
check!(
task_b.store_get_i64("counts", &"a".to_string()).await == Some(5),
"restore must seed the store with the changelog value"
);
let fetcher_b2 = ScriptedFetcher::new(vec![(
("in".to_string(), 0, 0),
FetchBatch {
records: vec![FetchedRec {
offset: 0,
key: None,
value: Some("a".into()),
timestamp: -1,
}],
},
)]);
task_b.process_once(&fetcher_b2, None).await.unwrap();
let sent_b = producer_b.sent.lock().unwrap();
check!(
sent_b
.iter()
.any(|(t, _p, _k, v)| t == "out"
&& v.as_deref() == Some(6i64.to_be_bytes().as_ref())),
"after restore with N=5, processing 'a' must emit count = 6"
);
}
/// Punctuator that forwards a key-less record whose value and timestamp are
/// both the punctuation timestamp.
struct EmitTs;
#[async_trait::async_trait]
impl crate::processor::punctuation::Punctuator<String, i64> for EmitTs {
    async fn punctuate(&mut self, ctx: &mut ProcessorContext<'_, '_, String, i64>, ts: i64) {
        let tick = Record::new(None, ts, ts);
        ctx.forward(tick);
    }
}
/// Processor that ignores records and, on init, schedules `EmitTs` as a
/// stream-time punctuator with a 10 ms interval.
struct StreamTimeScheduler;
#[async_trait::async_trait]
impl Processor<String, String, String, i64> for StreamTimeScheduler {
    async fn init(&mut self, ctx: &mut ProcessorContext<'_, '_, String, i64>) {
        let interval = std::time::Duration::from_millis(10);
        let kind = crate::processor::punctuation::PunctuationType::StreamTime;
        ctx.schedule(interval, kind, EmitTs);
    }
    async fn process(
        &mut self,
        _ctx: &mut ProcessorContext<'_, '_, String, i64>,
        _r: Record<String, String>,
    ) {
        // Intentionally empty: only the scheduled punctuator produces output.
    }
}
/// Topology `in -> StreamTimeScheduler -> out`, built for application "app".
fn stream_time_punct_built() -> crate::topology::BuiltTopology {
    let mut topology = Topology::new();
    let source = topology.add_source("src", ["in"], Consumed::with(StringSerde, StringSerde));
    let scheduler = topology.add_processor("p", || StreamTimeScheduler, [&source]);
    let produced = Produced::with(StringSerde, I64Serde);
    topology.add_sink("out", "out", [&scheduler], produced);
    topology.build("app").unwrap()
}
/// A record with timestamp 25 must trigger the stream-time punctuator from
/// within `process_once`, emitting the value 25 to `out`.
#[tokio::test]
async fn process_once_fires_stream_time_punctuation() {
    let producer = std::sync::Arc::new(CollectProducer::default());
    let store = std::sync::Arc::new(MemStore::default());

    let input = FetchBatch {
        records: vec![FetchedRec {
            offset: 0,
            key: Some("k".into()),
            value: Some("v".into()),
            timestamp: 25,
        }],
    };
    let fetcher = ScriptedFetcher::new(vec![(("in".to_string(), 0, 0), input)]);

    let graph = stream_time_punct_built()
        .instantiate(&crate::store::backend::StoreBackend::InMemory, "app")
        .await
        .unwrap();
    let sources = vec![TopicPartition {
        topic: "in".into(),
        partition: 0,
    }];
    let mut task = StreamTask::new(
        "0".into(),
        graph,
        sources,
        std::sync::Arc::clone(&producer) as std::sync::Arc<dyn RecordProducer>,
        std::sync::Arc::clone(&store) as std::sync::Arc<dyn OffsetStore>,
        TaskRole::Active,
        crate::runtime::eos::ProcessingGuarantee::AtLeastOnce,
    );

    task.init().await.unwrap();
    task.process_once(&fetcher, None).await.unwrap();

    let sent = producer.sent.lock().unwrap();
    check!(
        sent.iter()
            .any(|(t, _p, _k, v)| t == "out"
                && v.as_deref() == Some(25i64.to_be_bytes().as_ref())),
        "stream-time punctuator must fire from process_once and emit value=25, got {sent:?}"
    );
}
/// Sink sends carry no explicit partition, while changelog sends are pinned to
/// the task's partition.
#[tokio::test]
async fn changelog_sends_pin_task_partition() {
    const TASK_PARTITION: i32 = 2;
    let producer = std::sync::Arc::new(CollectProducer::default());
    let store = std::sync::Arc::new(MemStore::default());

    let input = FetchBatch {
        records: vec![FetchedRec {
            offset: 0,
            key: None,
            value: Some("x".into()),
            timestamp: -1,
        }],
    };
    let fetcher = ScriptedFetcher::new(vec![(("in".to_string(), TASK_PARTITION, 0), input)]);

    let graph = stateful_built()
        .instantiate(&crate::store::backend::StoreBackend::InMemory, "app")
        .await
        .unwrap();
    let sources = vec![TopicPartition {
        topic: "in".into(),
        partition: TASK_PARTITION,
    }];
    let mut task = StreamTask::new(
        "0".into(),
        graph,
        sources,
        std::sync::Arc::clone(&producer) as std::sync::Arc<dyn RecordProducer>,
        std::sync::Arc::clone(&store) as std::sync::Arc<dyn OffsetStore>,
        TaskRole::Active,
        ProcessingGuarantee::AtLeastOnce,
    );
    task.init().await.unwrap();
    task.process_once(&fetcher, None).await.unwrap();

    let sent = producer.sent.lock().unwrap();

    let sink_rec = sent
        .iter()
        .find(|(t, _p, _k, _v)| t == "out")
        .expect("sink record must be produced to 'out'");
    check!(
        sink_rec.1.is_none(),
        "sink send must use key-hash routing (partition None), got {:?}",
        sink_rec.1
    );

    let cl_rec = sent
        .iter()
        .find(|(t, _p, _k, _v)| t == "app-counts-changelog")
        .expect("changelog record must be produced to 'app-counts-changelog'");
    check!(
        cl_rec.1 == Some(TASK_PARTITION),
        "changelog send must be pinned to task partition {TASK_PARTITION}, got {:?}",
        cl_rec.1
    );
}
/// One `restore_step` on a standby task applies the scripted changelog record
/// and advances the tracked changelog offset to 1.
#[tokio::test]
async fn restore_step_replays_increments_and_advances_offsets() {
    let producer = std::sync::Arc::new(CollectProducer::default());
    let store = std::sync::Arc::new(MemStore::default());

    let cl_key = bytes::Bytes::copy_from_slice(b"a");
    let cl_val = bytes::Bytes::copy_from_slice(&12i64.to_be_bytes());
    let changelog = FetchBatch {
        records: vec![FetchedRec {
            offset: 0,
            key: Some(cl_key),
            value: Some(cl_val),
            timestamp: -1,
        }],
    };
    let fetcher = ScriptedFetcher::new(vec![(
        ("app-counts-changelog".to_string(), 0, 0),
        changelog,
    )]);

    let graph = stateful_built()
        .instantiate(&crate::store::backend::StoreBackend::InMemory, "app")
        .await
        .unwrap();
    let sources = vec![TopicPartition {
        topic: "in".into(),
        partition: 0,
    }];
    let mut task = StreamTask::new(
        "0".into(),
        graph,
        sources,
        std::sync::Arc::clone(&producer) as std::sync::Arc<dyn RecordProducer>,
        std::sync::Arc::clone(&store) as std::sync::Arc<dyn OffsetStore>,
        TaskRole::Standby,
        ProcessingGuarantee::AtLeastOnce,
    );

    task.restore_step(&fetcher).await.unwrap();

    check!(
        task.store_get_i64("counts", &"a".to_string()).await == Some(12),
        "restore_step must replay changelog record to store"
    );
    check!(
        task.changelog_offsets.get("app-counts-changelog") == Some(&1),
        "restore_step must advance tracked offset to 1"
    );
}
/// Warmup tasks report their tracked offset as "current"; active tasks report
/// the end offset.
#[tokio::test]
async fn compute_changelog_offsets_calculates_correct_sums() {
    let producer = std::sync::Arc::new(CollectProducer::default());
    let store = std::sync::Arc::new(MemStore::default());
    {
        // The changelog's end offset is 15.
        let mut latest = store.latest.lock().unwrap();
        latest.insert(("app-counts-changelog".to_string(), 0), 15);
    }

    let graph = stateful_built()
        .instantiate(&crate::store::backend::StoreBackend::InMemory, "app")
        .await
        .unwrap();
    let sources = vec![TopicPartition {
        topic: "in".into(),
        partition: 0,
    }];
    let mut task = StreamTask::new(
        "0".into(),
        graph,
        sources,
        std::sync::Arc::clone(&producer) as std::sync::Arc<dyn RecordProducer>,
        std::sync::Arc::clone(&store) as std::sync::Arc<dyn OffsetStore>,
        TaskRole::Warmup,
        ProcessingGuarantee::AtLeastOnce,
    );

    // Nothing restored yet.
    let (curr, end) = task.compute_changelog_offsets().await.unwrap();
    check!(curr == 0);
    check!(end == 15);

    // Partially restored.
    task.changelog_offsets
        .insert("app-counts-changelog".to_string(), 10);
    let (curr, end) = task.compute_changelog_offsets().await.unwrap();
    check!(curr == 10);
    check!(end == 15);

    // Active tasks always count as caught up.
    task.role = TaskRole::Active;
    let (curr, end) = task.compute_changelog_offsets().await.unwrap();
    check!(curr == 15);
    check!(end == 15);
}
/// Fetcher test double whose changelog contents depend on the isolation level.
///
/// For `app-counts-changelog`/0 at offset 0 it returns `a -> 5` under
/// `ReadCommitted` and additionally `b -> 99` under `ReadUncommitted`; every
/// other request yields the default batch.
struct IsolationFetcher;
impl IsolationFetcher {
    /// Encodes `n` as big-endian bytes.
    fn changelog_value(n: i64) -> bytes::Bytes {
        let encoded = n.to_be_bytes();
        bytes::Bytes::copy_from_slice(&encoded)
    }
}
#[async_trait::async_trait]
impl RecordFetcher for IsolationFetcher {
    async fn fetch(
        &self,
        t: &str,
        p: i32,
        o: i64,
        isolation: IsolationLevel,
    ) -> Result<FetchBatch, crate::StreamsClientError> {
        let is_changelog_start = t == "app-counts-changelog" && p == 0 && o == 0;
        if !is_changelog_start {
            return Ok(FetchBatch::default());
        }

        let committed = FetchedRec {
            offset: 0,
            key: Some(bytes::Bytes::copy_from_slice(b"a")),
            value: Some(Self::changelog_value(5)),
            timestamp: -1,
        };
        let aborted = FetchedRec {
            offset: 1,
            key: Some(bytes::Bytes::copy_from_slice(b"b")),
            value: Some(Self::changelog_value(99)),
            timestamp: -1,
        };
        let records = match isolation {
            IsolationLevel::ReadCommitted => vec![committed],
            IsolationLevel::ReadUncommitted => vec![committed, aborted],
        };
        Ok(FetchBatch { records })
    }
}
/// Builds an active counter task with the given guarantee and restores it from
/// `IsolationFetcher`.
async fn restore_counter_task(guarantee: ProcessingGuarantee) -> StreamTask {
    let producer = std::sync::Arc::new(CollectProducer::default());
    let store = std::sync::Arc::new(MemStore::default());
    let graph = stateful_built()
        .instantiate(&crate::store::backend::StoreBackend::InMemory, "app")
        .await
        .unwrap();
    let sources = vec![TopicPartition {
        topic: "in".into(),
        partition: 0,
    }];
    let dyn_producer = std::sync::Arc::clone(&producer) as std::sync::Arc<dyn RecordProducer>;
    let dyn_store = std::sync::Arc::clone(&store) as std::sync::Arc<dyn OffsetStore>;
    let mut restored = StreamTask::new(
        "0".into(),
        graph,
        sources,
        dyn_producer,
        dyn_store,
        TaskRole::Active,
        guarantee,
    );
    restored.restore(&IsolationFetcher).await.unwrap();
    restored
}
/// Under `ExactlyOnceV2`, restore fetches with `ReadCommitted`, so only the
/// committed record (`a -> 5`) served by `IsolationFetcher` reaches the store.
#[tokio::test]
async fn eos_restore_reads_committed_only() {
let mut task = restore_counter_task(ProcessingGuarantee::ExactlyOnceV2).await;
check!(
task.store_get_i64("counts", &"a".to_string()).await == Some(5),
"EOS restore must seed the committed changelog record"
);
// `b` is only served by the fetcher under ReadUncommitted.
check!(
task.store_get_i64("counts", &"b".to_string()).await == None,
"EOS restore (READ_COMMITTED) must exclude the aborted write"
);
}
/// Under `AtLeastOnce`, restore fetches with `ReadUncommitted`, so both
/// records served by `IsolationFetcher` (`a -> 5`, `b -> 99`) reach the store.
#[tokio::test]
async fn alo_restore_reads_uncommitted_sees_both() {
let mut task = restore_counter_task(ProcessingGuarantee::AtLeastOnce).await;
check!(
task.store_get_i64("counts", &"a".to_string()).await == Some(5),
"ALO restore must seed the committed changelog record"
);
check!(
task.store_get_i64("counts", &"b".to_string()).await == Some(99),
"ALO restore (READ_UNCOMMITTED) must see the uncommitted write too"
);
}
}