use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::Stream;
use crate::channels::{QueryResult, ResultDiff};
/// Default number of results an outbox retains.
///
/// Not referenced elsewhere in this file; presumably callers pass it to
/// `QueryOutputState::new` (confirm against call sites).
pub const DEFAULT_OUTBOX_CAPACITY: usize = 1000;
/// Output state of a query: the current result rows, a sequence counter, and a
/// bounded outbox of the most recently pushed results.
#[derive(Debug, Clone)]
pub struct QueryOutputState {
/// Current rows keyed by row signature; maintained by `apply_diffs`.
results: im::HashMap<u64, serde_json::Value>,
/// Sequence number given to the most recently pushed result (0 before any push).
as_of_sequence: u64,
/// Recently pushed results, oldest at the front; the front is evicted when full.
outbox: VecDeque<Arc<QueryResult>>,
/// Maximum number of entries kept in `outbox` (clamped in `new`).
outbox_capacity: usize,
}
impl QueryOutputState {
    /// Upper bound applied to the capacity requested in [`Self::new`].
    const MAX_OUTBOX_CAPACITY: usize = 1_000_000;

    /// Creates an empty state whose sequence counter starts at 0.
    ///
    /// `outbox_capacity` is clamped to `1..=MAX_OUTBOX_CAPACITY`, so a request
    /// of 0 still retains one entry. At most 1024 slots are preallocated; the
    /// deque grows on demand beyond that.
    pub fn new(outbox_capacity: usize) -> Self {
        let effective_capacity = outbox_capacity.clamp(1, Self::MAX_OUTBOX_CAPACITY);
        Self {
            results: im::HashMap::new(),
            as_of_sequence: 0,
            outbox: VecDeque::with_capacity(effective_capacity.min(1024)),
            outbox_capacity: effective_capacity,
        }
    }

    /// Applies `diffs` in order to the stored result rows.
    ///
    /// `Add`, `Update` and `Aggregation` upsert the row under its signature
    /// (using `data` for `Add` and `after` for the other two), `Delete`
    /// removes it, and `Noop` leaves the state untouched.
    pub fn apply_diffs(&mut self, diffs: &[ResultDiff]) {
        for diff in diffs {
            match diff {
                // All three variants end up storing the latest row value.
                ResultDiff::Add {
                    data: row,
                    row_signature,
                }
                | ResultDiff::Update {
                    after: row,
                    row_signature,
                    ..
                }
                | ResultDiff::Aggregation {
                    after: row,
                    row_signature,
                    ..
                } => {
                    self.results.insert(*row_signature, row.clone());
                }
                ResultDiff::Delete { row_signature, .. } => {
                    self.results.remove(row_signature);
                }
                ResultDiff::Noop => {}
            }
        }
    }

    /// Stamps `result` with the next sequence number and appends it to the outbox.
    ///
    /// The counter is advanced with a saturating add and written into
    /// `result.sequence`, overwriting whatever value it carried. When the
    /// outbox is already at capacity the oldest entry is dropped first.
    /// Returns the shared handle that was stored.
    pub fn advance_sequence_and_push(&mut self, mut result: QueryResult) -> Arc<QueryResult> {
        self.as_of_sequence = self.as_of_sequence.saturating_add(1);
        result.sequence = self.as_of_sequence;
        let arc_result = Arc::new(result);
        if self.outbox.len() >= self.outbox_capacity {
            self.outbox.pop_front();
        }
        self.outbox.push_back(Arc::clone(&arc_result));
        arc_result
    }

    /// Returns a copy of every stored row value (in the map's iteration order).
    pub fn get_results_as_vec(&self) -> Vec<serde_json::Value> {
        self.results.values().cloned().collect()
    }

    /// Returns the effective (clamped) outbox capacity.
    pub fn outbox_capacity(&self) -> usize {
        self.outbox_capacity
    }

    /// Returns the sequence number of the most recently pushed result (0 if none).
    pub fn as_of_sequence(&self) -> u64 {
        self.as_of_sequence
    }

    /// Returns how many results the outbox currently holds.
    pub fn outbox_len(&self) -> usize {
        self.outbox.len()
    }

    /// Returns how many rows are currently stored.
    pub fn results_len(&self) -> usize {
        self.results.len()
    }

    /// Looks up the row stored under `row_signature`, if any.
    pub fn get_result(&self, row_signature: &u64) -> Option<&serde_json::Value> {
        self.results.get(row_signature)
    }

    /// Returns a clone of the whole result map.
    pub fn clone_results(&self) -> im::HashMap<u64, serde_json::Value> {
        self.results.clone()
    }

    /// Returns the outbox entries whose sequence is greater than `after_sequence`,
    /// oldest first.
    ///
    /// A caller that is already at or past the latest sequence gets an empty
    /// vector.
    ///
    /// # Errors
    ///
    /// Returns [`OutboxGap`] when entry `after_sequence + 1` is no longer
    /// retained, i.e. it is older than the oldest entry in the outbox. The
    /// error's `config_hash` is set to 0 here; presumably the caller fills in
    /// the real value (confirm against call sites).
    pub fn fetch_outbox_after(
        &self,
        after_sequence: u64,
    ) -> Result<Vec<Arc<QueryResult>>, OutboxGap> {
        if after_sequence >= self.as_of_sequence {
            return Ok(Vec::new());
        }
        // `after_sequence < as_of_sequence` from here on, so this cannot overflow.
        let next_wanted = after_sequence + 1;
        // Computed lazily and saturating: the fallback must not overflow when
        // the counter has reached `u64::MAX`.
        let earliest = self
            .outbox
            .front()
            .map_or_else(|| self.as_of_sequence.saturating_add(1), |r| r.sequence);
        if next_wanted < earliest {
            return Err(OutboxGap {
                requested: after_sequence,
                earliest_available: earliest,
                latest_sequence: self.as_of_sequence,
                config_hash: 0,
            });
        }
        // Entries are appended with non-decreasing sequence numbers, so the
        // first wanted entry can be located by binary search instead of
        // scanning the whole outbox.
        let start = self
            .outbox
            .partition_point(|r| r.sequence <= after_sequence);
        Ok(self.outbox.range(start..).cloned().collect())
    }
}
/// Error returned by `QueryOutputState::fetch_outbox_after` when the entry right
/// after the requested sequence is older than the oldest entry still in the outbox.
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
#[error("Outbox gap: requested after seq {requested}, but earliest available is {earliest_available} (latest: {latest_sequence})")]
pub struct OutboxGap {
/// The `after_sequence` value the caller asked for.
pub requested: u64,
/// Sequence of the oldest entry still held in the outbox.
pub earliest_available: u64,
/// Latest sequence number at the time of the request.
pub latest_sequence: u64,
/// Set to 0 by `fetch_outbox_after`; presumably filled in by a caller — confirm.
pub config_hash: u64,
}
/// Errors for fetch operations. The code that produces these variants is not
/// part of this file.
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum FetchError {
/// The query is not running; carries the status that was observed.
#[error("Query is not running (status: {status:?})")]
NotRunning {
status: crate::channels::ComponentStatus,
},
/// Waiting for the query to finish bootstrapping timed out.
#[error("Timed out waiting for query to finish bootstrapping")]
TimedOut,
/// Wraps an [`OutboxGap`]; `#[from]` lets `?` convert it automatically.
#[error(transparent)]
OutboxGap(#[from] OutboxGap),
}
/// A result map bundled with a sequence number and a config hash.
///
/// The map itself is private; it is read through `stream`, `to_vec`, `len`
/// and `is_empty`.
#[derive(Debug, Clone)]
pub struct SnapshotResponse {
/// Row values keyed by `u64` (presumably the row signature — confirm).
results: im::HashMap<u64, serde_json::Value>,
/// Sequence number supplied at construction; presumably the sequence the
/// rows reflect — confirm.
pub as_of_sequence: u64,
/// Caller-supplied hash; its meaning is not defined in this file.
pub config_hash: u64,
}
impl SnapshotResponse {
    /// Bundles a result map with its sequence number and config hash.
    pub fn new(
        results: im::HashMap<u64, serde_json::Value>,
        as_of_sequence: u64,
        config_hash: u64,
    ) -> Self {
        Self {
            config_hash,
            as_of_sequence,
            results,
        }
    }

    /// Consumes the snapshot and yields each row value as a stream item.
    ///
    /// Keys are discarded; items come out in the map's iteration order.
    pub fn stream(self) -> impl Stream<Item = serde_json::Value> + Send {
        let rows = self.results.into_iter().map(|(_signature, row)| row);
        tokio_stream::iter(rows)
    }

    /// Clones every row value into a freshly allocated vector.
    pub fn to_vec(&self) -> Vec<serde_json::Value> {
        let mut rows = Vec::with_capacity(self.results.len());
        rows.extend(self.results.values().cloned());
        rows
    }

    /// Number of rows in the snapshot.
    pub fn len(&self) -> usize {
        self.results.len()
    }

    /// Whether the snapshot holds no rows.
    pub fn is_empty(&self) -> bool {
        self.results.is_empty()
    }
}
/// A batch of outbox entries bundled with a sequence number and a config hash.
///
/// Consumed by `OutboxStream::from_outbox`; where it is built is not visible
/// in this file.
#[derive(Debug, Clone)]
pub struct OutboxResponse {
/// The shared query results carried by this response.
pub results: Vec<Arc<QueryResult>>,
/// Presumably the producer's latest sequence when the batch was taken — confirm.
pub latest_sequence: u64,
/// Caller-supplied hash; its meaning is not defined in this file.
pub config_hash: u64,
}
/// A boxed stream of JSON values carrying a sequence number and config hash
/// alongside it.
pub struct SnapshotStream {
/// Type-erased source of values; polled by the `Stream` impl.
inner: Pin<Box<dyn Stream<Item = serde_json::Value> + Send>>,
/// Sequence number supplied at construction.
pub as_of_sequence: u64,
/// Caller-supplied hash; its meaning is not defined in this file.
pub config_hash: u64,
}
impl SnapshotStream {
pub fn from_snapshot(snapshot: SnapshotResponse) -> Self {
let as_of_sequence = snapshot.as_of_sequence;
let config_hash = snapshot.config_hash;
Self {
inner: Box::pin(snapshot.stream()),
as_of_sequence,
config_hash,
}
}
pub fn from_stream(
stream: impl Stream<Item = serde_json::Value> + Send + 'static,
as_of_sequence: u64,
config_hash: u64,
) -> Self {
Self {
inner: Box::pin(stream),
as_of_sequence,
config_hash,
}
}
pub async fn collect_vec(self) -> Vec<serde_json::Value> {
use tokio_stream::StreamExt;
self.inner.collect().await
}
}
impl Stream for SnapshotStream {
    type Item = serde_json::Value;

    /// Forwards the poll to the boxed inner stream.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Every field is `Unpin`, so a plain mutable reference is available.
        let this = self.get_mut();
        this.inner.as_mut().poll_next(cx)
    }
}
/// A boxed stream of shared query results carrying a sequence number and
/// config hash alongside it.
pub struct OutboxStream {
/// Type-erased source of results; polled by the `Stream` impl.
inner: Pin<Box<dyn Stream<Item = Arc<QueryResult>> + Send>>,
/// Sequence number supplied at construction.
pub latest_sequence: u64,
/// Caller-supplied hash; its meaning is not defined in this file.
pub config_hash: u64,
}
impl OutboxStream {
pub fn from_outbox(outbox: OutboxResponse) -> Self {
let latest_sequence = outbox.latest_sequence;
let config_hash = outbox.config_hash;
Self {
inner: Box::pin(tokio_stream::iter(outbox.results)),
latest_sequence,
config_hash,
}
}
pub fn from_stream(
stream: impl Stream<Item = Arc<QueryResult>> + Send + 'static,
latest_sequence: u64,
config_hash: u64,
) -> Self {
Self {
inner: Box::pin(stream),
latest_sequence,
config_hash,
}
}
pub async fn collect_vec(self) -> Vec<Arc<QueryResult>> {
use tokio_stream::StreamExt;
self.inner.collect().await
}
}
impl Stream for OutboxStream {
    type Item = Arc<QueryResult>;

    /// Forwards the poll to the boxed inner stream.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Every field is `Unpin`, so a plain mutable reference is available.
        let this = self.get_mut();
        this.inner.as_mut().poll_next(cx)
    }
}
// Unit tests for `QueryOutputState` and `SnapshotResponse`. They live in the
// same module tree, so they read the private `results`/`outbox` fields directly.
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
// Builds a `QueryResult` with sequence 0, the current time, the given diffs
// and an empty map as the last argument.
fn make_query_result(query_id: &str, diffs: Vec<ResultDiff>) -> QueryResult {
QueryResult::new(
query_id.to_string(),
0, chrono::Utc::now(),
diffs,
HashMap::new(),
)
}
// `Add` stores the row's data under its signature.
#[test]
fn test_apply_diffs_add() {
let mut state = QueryOutputState::new(10);
let diffs = vec![ResultDiff::Add {
data: serde_json::json!({"name": "Alice"}),
row_signature: 100,
}];
state.apply_diffs(&diffs);
assert_eq!(state.results.len(), 1);
assert_eq!(
state.results.get(&100),
Some(&serde_json::json!({"name": "Alice"}))
);
}
// `Delete` removes a previously stored row.
#[test]
fn test_apply_diffs_delete() {
let mut state = QueryOutputState::new(10);
state
.results
.insert(100, serde_json::json!({"name": "Alice"}));
let diffs = vec![ResultDiff::Delete {
data: serde_json::json!({"name": "Alice"}),
row_signature: 100,
}];
state.apply_diffs(&diffs);
assert_eq!(state.results.len(), 0);
}
// `Update` replaces the stored row with the `after` value.
#[test]
fn test_apply_diffs_update() {
let mut state = QueryOutputState::new(10);
state
.results
.insert(100, serde_json::json!({"name": "Alice"}));
let diffs = vec![ResultDiff::Update {
data: serde_json::json!({"name": "Bob"}),
before: serde_json::json!({"name": "Alice"}),
after: serde_json::json!({"name": "Bob"}),
grouping_keys: None,
row_signature: 100,
}];
state.apply_diffs(&diffs);
assert_eq!(state.results.len(), 1);
assert_eq!(
state.results.get(&100),
Some(&serde_json::json!({"name": "Bob"}))
);
}
// `Aggregation` inserts on first sight and overwrites on later diffs.
#[test]
fn test_apply_diffs_aggregation() {
let mut state = QueryOutputState::new(10);
let diffs = vec![ResultDiff::Aggregation {
before: None,
after: serde_json::json!({"count": 5}),
row_signature: 200,
}];
state.apply_diffs(&diffs);
assert_eq!(state.results.len(), 1);
assert_eq!(
state.results.get(&200),
Some(&serde_json::json!({"count": 5}))
);
let diffs = vec![ResultDiff::Aggregation {
before: Some(serde_json::json!({"count": 5})),
after: serde_json::json!({"count": 10}),
row_signature: 200,
}];
state.apply_diffs(&diffs);
assert_eq!(state.results.len(), 1);
assert_eq!(
state.results.get(&200),
Some(&serde_json::json!({"count": 10}))
);
}
// `Noop` leaves the stored rows unchanged.
#[test]
fn test_apply_diffs_noop() {
let mut state = QueryOutputState::new(10);
state
.results
.insert(100, serde_json::json!({"name": "Alice"}));
let diffs = vec![ResultDiff::Noop];
state.apply_diffs(&diffs);
assert_eq!(state.results.len(), 1);
}
// Each push bumps the sequence by one and appends to the outbox.
#[test]
fn test_advance_sequence_and_push() {
let mut state = QueryOutputState::new(3);
let result = make_query_result("q1", vec![]);
let arc = state.advance_sequence_and_push(result);
assert_eq!(arc.sequence, 1);
assert_eq!(state.as_of_sequence, 1);
assert_eq!(state.outbox.len(), 1);
assert_eq!(state.outbox.back().unwrap().sequence, 1);
let result = make_query_result("q1", vec![]);
let arc = state.advance_sequence_and_push(result);
assert_eq!(arc.sequence, 2);
assert_eq!(state.outbox.len(), 2);
}
// With capacity 3, five pushes keep only sequences 3..=5.
#[test]
fn test_outbox_capacity_eviction() {
let mut state = QueryOutputState::new(3);
for _ in 0..5 {
let result = make_query_result("q1", vec![]);
state.advance_sequence_and_push(result);
}
assert_eq!(state.outbox.len(), 3);
assert_eq!(state.as_of_sequence, 5);
assert_eq!(state.outbox.front().unwrap().sequence, 3);
assert_eq!(state.outbox.back().unwrap().sequence, 5);
}
// Asking from the latest sequence, or beyond it, yields an empty list.
#[test]
fn test_fetch_outbox_after_caught_up() {
let mut state = QueryOutputState::new(10);
let result = make_query_result("q1", vec![]);
state.advance_sequence_and_push(result);
let entries = state.fetch_outbox_after(1).unwrap();
assert!(entries.is_empty());
let entries = state.fetch_outbox_after(100).unwrap();
assert!(entries.is_empty());
}
// Only entries with a sequence above the requested one are returned, in order.
#[test]
fn test_fetch_outbox_after_returns_entries() {
let mut state = QueryOutputState::new(10);
for _ in 0..5 {
let result = make_query_result("q1", vec![]);
state.advance_sequence_and_push(result);
}
let entries = state.fetch_outbox_after(2).unwrap();
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].sequence, 3);
assert_eq!(entries[1].sequence, 4);
assert_eq!(entries[2].sequence, 5);
}
// Requesting entries that were already evicted reports an `OutboxGap`.
#[test]
fn test_fetch_outbox_after_gap_error() {
let mut state = QueryOutputState::new(3);
for _ in 0..5 {
let result = make_query_result("q1", vec![]);
state.advance_sequence_and_push(result);
}
let err = state.fetch_outbox_after(0).unwrap_err();
assert_eq!(err.requested, 0);
assert_eq!(err.earliest_available, 3);
assert_eq!(err.latest_sequence, 5);
assert_eq!(err.config_hash, 0); }
// `get_results_as_vec` returns every stored value (order is not asserted).
#[test]
fn test_get_results_as_vec() {
let mut state = QueryOutputState::new(10);
state.results.insert(1, serde_json::json!({"a": 1}));
state.results.insert(2, serde_json::json!({"b": 2}));
let vec = state.get_results_as_vec();
assert_eq!(vec.len(), 2);
assert!(vec.contains(&serde_json::json!({"a": 1})));
assert!(vec.contains(&serde_json::json!({"b": 2})));
}
// A cloned result map is unaffected by later writes to the original.
#[test]
fn test_snapshot_clone_is_independent() {
let mut state = QueryOutputState::new(10);
state
.results
.insert(1, serde_json::json!({"name": "Alice"}));
let snapshot = state.results.clone();
state.results.insert(1, serde_json::json!({"name": "Bob"}));
assert_eq!(
snapshot.get(&1),
Some(&serde_json::json!({"name": "Alice"}))
);
assert_eq!(
state.results.get(&1),
Some(&serde_json::json!({"name": "Bob"}))
);
}
// A requested capacity of 0 is raised to 1, so the outbox keeps the newest entry.
#[test]
fn test_outbox_capacity_zero_clamped_to_one() {
let mut state = QueryOutputState::new(0);
assert_eq!(state.outbox_capacity, 1);
let result = make_query_result("q1", vec![]);
state.advance_sequence_and_push(result);
assert_eq!(state.outbox.len(), 1);
let result = make_query_result("q1", vec![]);
state.advance_sequence_and_push(result);
assert_eq!(state.outbox.len(), 1);
assert_eq!(state.outbox.front().unwrap().sequence, 2);
}
// `SnapshotResponse::stream` yields every value; sorted here because the
// map's iteration order is not relied upon.
#[tokio::test]
async fn snapshot_stream_yields_all_values() {
use tokio_stream::StreamExt;
let mut map = im::HashMap::new();
map.insert(1, serde_json::json!({"id": 1}));
map.insert(2, serde_json::json!({"id": 2}));
map.insert(3, serde_json::json!({"id": 3}));
let snap = SnapshotResponse::new(map, 10, 42);
assert_eq!(snap.len(), 3);
assert!(!snap.is_empty());
let mut collected: Vec<serde_json::Value> = snap.stream().collect().await;
collected.sort_by_key(|v| v["id"].as_u64().unwrap());
assert_eq!(collected.len(), 3);
assert_eq!(collected[0]["id"], 1);
assert_eq!(collected[1]["id"], 2);
assert_eq!(collected[2]["id"], 3);
}
}