#[cfg(any(feature = "tokio-runtime", test))]
use std::collections::VecDeque;
#[cfg(any(feature = "tokio-runtime", test))]
use crate::models::BatchStatus;
#[cfg(any(feature = "tokio-runtime", feature = "wasm", test))]
use crate::{models::ChangeEvent, seq_tracking, SeqId};
#[cfg(any(feature = "tokio-runtime", test))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct EventProgress {
pub(crate) seq_id: SeqId,
pub(crate) advance_resume: bool,
}
#[cfg(any(feature = "tokio-runtime", test))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BatchEnvelope {
pub(crate) status: BatchStatus,
pub(crate) has_more: bool,
pub(crate) last_seq_id: Option<SeqId>,
}
#[cfg(any(feature = "tokio-runtime", test))]
pub(crate) fn batch_envelope(event: &ChangeEvent) -> Option<BatchEnvelope> {
match event {
ChangeEvent::Ack { batch_control, .. }
| ChangeEvent::InitialDataBatch { batch_control, .. } => Some(BatchEnvelope {
status: batch_control.status,
has_more: batch_control.has_more,
last_seq_id: batch_control.last_seq_id,
}),
_ => None,
}
}
#[cfg(any(feature = "tokio-runtime", test))]
pub(crate) fn subscription_start_ready(event: &ChangeEvent) -> bool {
batch_envelope(event).is_some_and(|batch| batch.status == BatchStatus::Ready)
}
#[cfg(any(feature = "tokio-runtime", test))]
pub(crate) fn event_progress(event: &ChangeEvent) -> Option<EventProgress> {
match event {
ChangeEvent::InitialDataBatch { rows, .. } => {
let batch = batch_envelope(event);
let seq_id = seq_tracking::extract_max_seq(rows)
.or_else(|| batch.and_then(|batch| batch.last_seq_id))?;
Some(EventProgress {
seq_id,
advance_resume: false,
})
},
ChangeEvent::Insert { rows, .. } | ChangeEvent::Update { rows, .. } => {
seq_tracking::extract_max_seq(rows).map(|seq_id| EventProgress {
seq_id,
advance_resume: true,
})
},
ChangeEvent::Delete { old_rows, .. } => {
seq_tracking::extract_max_seq(old_rows).map(|seq_id| EventProgress {
seq_id,
advance_resume: true,
})
},
_ => None,
}
}
#[cfg(any(feature = "tokio-runtime", test))]
pub(crate) fn final_resume_seq(
requested_from: Option<SeqId>,
consumed_seq_id: Option<SeqId>,
) -> Option<SeqId> {
match (requested_from, consumed_seq_id) {
(Some(requested), Some(consumed)) => Some(requested.max(consumed)),
(requested, consumed) => requested.or(consumed),
}
}
#[cfg(any(feature = "tokio-runtime", feature = "wasm", test))]
pub(crate) fn filter_replayed_event(
event: ChangeEvent,
resume_from: Option<SeqId>,
) -> Option<ChangeEvent> {
let Some(from) = resume_from else {
return Some(event);
};
match event {
ChangeEvent::InitialDataBatch {
subscription_id,
mut rows,
batch_control,
} => {
let removed = seq_tracking::retain_rows_after(&mut rows, from);
if removed > 0 {
log::debug!(
"[kalam-sdk] [{}] Filtered {} stale initial row(s) at from={}",
subscription_id,
removed,
from
);
}
Some(ChangeEvent::InitialDataBatch {
subscription_id,
rows,
batch_control,
})
},
ChangeEvent::Insert {
subscription_id,
mut rows,
} => {
let removed = seq_tracking::retain_rows_after(&mut rows, from);
if removed > 0 {
log::debug!(
"[kalam-sdk] [{}] Filtered {} stale insert row(s) at from={}",
subscription_id,
removed,
from
);
}
if rows.is_empty() {
None
} else {
Some(ChangeEvent::Insert {
subscription_id,
rows,
})
}
},
ChangeEvent::Update {
subscription_id,
mut rows,
mut old_rows,
} => {
let removed_new = seq_tracking::retain_rows_after(&mut rows, from);
let removed_old = seq_tracking::retain_rows_after(&mut old_rows, from);
let removed = removed_new.max(removed_old);
if removed > 0 {
log::debug!(
"[kalam-sdk] [{}] Filtered {} stale update row(s) at from={}",
subscription_id,
removed,
from
);
}
if rows.is_empty() && old_rows.is_empty() {
None
} else {
Some(ChangeEvent::Update {
subscription_id,
rows,
old_rows,
})
}
},
ChangeEvent::Delete {
subscription_id,
mut old_rows,
} => {
let removed = seq_tracking::retain_rows_after(&mut old_rows, from);
if removed > 0 {
log::debug!(
"[kalam-sdk] [{}] Filtered {} stale delete row(s) at from={}",
subscription_id,
removed,
from
);
}
if old_rows.is_empty() {
None
} else {
Some(ChangeEvent::Delete {
subscription_id,
old_rows,
})
}
},
_ => Some(event),
}
}
#[cfg(any(feature = "tokio-runtime", test))]
pub(crate) fn buffer_event(
event_queue: &mut VecDeque<ChangeEvent>,
buffered_changes: &mut Vec<ChangeEvent>,
is_loading: &mut bool,
resume_from: Option<SeqId>,
event: ChangeEvent,
) {
let Some(event) = filter_replayed_event(event, resume_from) else {
return;
};
if let Some(batch) = batch_envelope(&event) {
*is_loading = batch.status != BatchStatus::Ready;
event_queue.push_back(event);
if !*is_loading {
for buffered in buffered_changes.drain(..) {
event_queue.push_back(buffered);
}
}
return;
}
match event {
ChangeEvent::Insert { .. } | ChangeEvent::Update { .. } | ChangeEvent::Delete { .. } => {
if *is_loading {
buffered_changes.push(event);
} else {
event_queue.push_back(event);
}
},
_ => event_queue.push_back(event),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::{BatchControl, KalamCellValue, RowData};
fn batch_control(status: BatchStatus) -> BatchControl {
BatchControl {
batch_num: 1,
has_more: false,
status,
last_seq_id: None,
}
}
fn row(id: &str, seq: i64) -> RowData {
let mut row = RowData::new();
row.insert("id".to_string(), KalamCellValue::text(id));
row.insert("_seq".to_string(), KalamCellValue::text(seq.to_string()));
row
}
#[test]
fn filters_resumed_delete_with_empty_result() {
let event = ChangeEvent::Delete {
subscription_id: "sub-1".to_string(),
old_rows: vec![row("1", 10)],
};
assert!(filter_replayed_event(event, Some(SeqId::from_i64(10))).is_none());
}
#[test]
fn progress_marks_ready_initial_batch_without_advancing_resume() {
let event = ChangeEvent::InitialDataBatch {
subscription_id: "sub-1".to_string(),
rows: vec![row("1", 11)],
batch_control: batch_control(BatchStatus::Ready),
};
assert_eq!(
event_progress(&event),
Some(EventProgress {
seq_id: SeqId::from_i64(11),
advance_resume: false,
})
);
}
#[test]
fn progress_marks_loading_initial_batch_without_advancing_resume() {
let mut control = batch_control(BatchStatus::LoadingBatch);
control.last_seq_id = Some(SeqId::from_i64(21));
let event = ChangeEvent::InitialDataBatch {
subscription_id: "sub-1".to_string(),
rows: Vec::new(),
batch_control: control,
};
assert_eq!(
event_progress(&event),
Some(EventProgress {
seq_id: SeqId::from_i64(21),
advance_resume: false,
})
);
}
#[test]
fn buffering_flushes_live_changes_after_ready_snapshot() {
let mut event_queue = VecDeque::new();
let mut buffered = Vec::new();
let mut is_loading = true;
buffer_event(
&mut event_queue,
&mut buffered,
&mut is_loading,
None,
ChangeEvent::Insert {
subscription_id: "sub-1".to_string(),
rows: vec![row("live", 12)],
},
);
assert!(event_queue.is_empty());
assert_eq!(buffered.len(), 1);
buffer_event(
&mut event_queue,
&mut buffered,
&mut is_loading,
None,
ChangeEvent::InitialDataBatch {
subscription_id: "sub-1".to_string(),
rows: vec![row("snap", 11)],
batch_control: batch_control(BatchStatus::Ready),
},
);
assert_eq!(event_queue.len(), 2);
assert!(buffered.is_empty());
assert!(!is_loading);
}
}