use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use nodedb_array::sync::hlc::Hlc;
use nodedb_array::sync::op::ArrayOp;
use tracing::debug;
use crate::control::server::sync::shape::registry::ShapeRegistry;
use super::cursor;
use super::delivery::ArrayDeliveryRegistry;
use super::merge::MergerRegistry;
use super::snapshot_trigger;
use super::subscriber_state::SubscriberMap;
pub trait ArrayApplyObserver: Send + Sync {
fn on_op_applied(&self, op: &ArrayOp);
}
pub struct ArrayFanout {
shapes: Arc<ShapeRegistry>,
delivery: Arc<ArrayDeliveryRegistry>,
cursors: Arc<SubscriberMap>,
snapshot_hlcs: Arc<RwLock<HashMap<String, Hlc>>>,
mergers: Arc<MergerRegistry>,
shard_id: u16,
tenant_id: u64,
}
impl ArrayFanout {
pub fn new(
shapes: Arc<ShapeRegistry>,
delivery: Arc<ArrayDeliveryRegistry>,
cursors: Arc<SubscriberMap>,
snapshot_hlcs: Arc<RwLock<HashMap<String, Hlc>>>,
mergers: Arc<MergerRegistry>,
shard_id: u16,
tenant_id: u64,
) -> Self {
Self {
shapes,
delivery,
cursors,
snapshot_hlcs,
mergers,
shard_id,
tenant_id,
}
}
pub fn remove_session(&self, session_id: &str) {
self.cursors.remove_session(session_id);
self.delivery.unregister(session_id);
self.mergers.remove_session(session_id);
}
fn fan_out_op(&self, op: &ArrayOp) {
let coord_u64 = coord_to_u64(&op.coord);
let matches =
self.shapes
.evaluate_array_mutation(self.tenant_id, &op.header.array, &coord_u64);
if matches.is_empty() {
return;
}
for (session_id, _shape_id) in matches {
self.deliver_to_session(&session_id, op, op.header.hlc, &[]);
}
}
fn deliver_to_session(&self, session_id: &str, op: &ArrayOp, op_hlc: Hlc, _op_payload: &[u8]) {
let cursor = match self.cursors.get(session_id, &op.header.array) {
Some(c) => c,
None => {
return;
}
};
if !cursor::should_send(op_hlc, cursor.last_pushed_hlc) {
debug!(
session = %session_id,
array = %op.header.array,
op_hlc = ?op_hlc,
"array_fanout: op already delivered, skipping"
);
return;
}
let snapshot_hlc = self
.snapshot_hlcs
.read()
.ok()
.and_then(|m| m.get(&op.header.array).copied())
.unwrap_or(Hlc::ZERO);
if snapshot_trigger::check_and_trigger(
session_id,
&op.header.array,
cursor.last_pushed_hlc,
snapshot_hlc,
&self.delivery,
) {
return;
}
let merger = self.mergers.get_or_create(session_id, &op.header.array);
merger.push_op(self.shard_id, op.clone(), &self.delivery);
cursor::mark_sent(&self.cursors, session_id, &op.header.array, op_hlc);
}
}
impl ArrayApplyObserver for ArrayFanout {
fn on_op_applied(&self, op: &ArrayOp) {
self.fan_out_op(op);
}
}
fn coord_to_u64(coord: &[nodedb_array::types::coord::value::CoordValue]) -> Vec<u64> {
use nodedb_array::types::coord::value::CoordValue;
coord
.iter()
.map(|c| match c {
CoordValue::Int64(v) | CoordValue::TimestampMs(v) => *v as u64,
CoordValue::Float64(v) => v.to_bits(),
CoordValue::String(_) => u64::MAX,
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use nodedb_array::sync::op::{ArrayOpHeader, ArrayOpKind};
use nodedb_array::sync::replica_id::ReplicaId;
use nodedb_array::types::coord::value::CoordValue;
use nodedb_types::sync::shape::{ArrayCoordRange, ShapeDefinition, ShapeType};
use std::sync::Arc;
use crate::control::server::sync::shape::registry::ShapeRegistry;
fn replica() -> ReplicaId {
ReplicaId::new(1)
}
fn hlc(ms: u64) -> Hlc {
Hlc::new(ms, 0, replica()).unwrap()
}
fn make_op(array: &str, ms: u64) -> ArrayOp {
ArrayOp {
header: ArrayOpHeader {
array: array.into(),
hlc: hlc(ms),
schema_hlc: hlc(1),
valid_from_ms: 0,
valid_until_ms: -1,
system_from_ms: ms as i64,
},
kind: ArrayOpKind::Put,
coord: vec![CoordValue::Int64(ms as i64)],
attrs: None,
}
}
fn make_fanout() -> (
ArrayFanout,
Arc<ShapeRegistry>,
Arc<ArrayDeliveryRegistry>,
Arc<SubscriberMap>,
) {
use super::super::merge::MergerRegistry;
use super::super::subscriber_state::SubscriberStore;
let shapes = Arc::new(ShapeRegistry::new());
let delivery = Arc::new(ArrayDeliveryRegistry::new());
let store = SubscriberStore::in_memory().unwrap();
let cursors = Arc::new(SubscriberMap::new(store));
let snapshot_hlcs = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
let mergers = Arc::new(MergerRegistry::new());
let fanout = ArrayFanout::new(
Arc::clone(&shapes),
Arc::clone(&delivery),
Arc::clone(&cursors),
snapshot_hlcs,
mergers,
0,
1,
);
(fanout, shapes, delivery, cursors)
}
#[tokio::test]
async fn op_delivered_to_matching_subscriber() {
let (fanout, shapes, delivery, cursors) = make_fanout();
cursors.register("s1", "prices", None);
let mut rx = delivery.register("s1".into());
shapes.subscribe(
"s1",
1,
ShapeDefinition {
shape_id: "sh1".into(),
tenant_id: 1,
shape_type: ShapeType::Array {
array_name: "prices".into(),
coord_range: None,
},
description: "all prices".into(),
field_filter: vec![],
},
);
let op = make_op("prices", 100);
fanout.on_op_applied(&op);
let frame = rx.try_recv().expect("frame should be delivered");
assert!(!frame.is_empty());
}
#[tokio::test]
async fn op_not_delivered_to_wrong_array() {
let (fanout, shapes, delivery, cursors) = make_fanout();
cursors.register("s1", "prices", None);
let mut rx = delivery.register("s1".into());
shapes.subscribe(
"s1",
1,
ShapeDefinition {
shape_id: "sh1".into(),
tenant_id: 1,
shape_type: ShapeType::Array {
array_name: "other".into(),
coord_range: None,
},
description: "other".into(),
field_filter: vec![],
},
);
let op = make_op("prices", 100);
fanout.on_op_applied(&op);
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn op_not_delivered_when_coord_outside_range() {
let (fanout, shapes, delivery, cursors) = make_fanout();
cursors.register("s1", "mat", None);
let mut rx = delivery.register("s1".into());
shapes.subscribe(
"s1",
1,
ShapeDefinition {
shape_id: "sh1".into(),
tenant_id: 1,
shape_type: ShapeType::Array {
array_name: "mat".into(),
coord_range: Some(ArrayCoordRange {
start: vec![0],
end: Some(vec![9]),
}),
},
description: "narrow".into(),
field_filter: vec![],
},
);
let op = ArrayOp {
header: ArrayOpHeader {
array: "mat".into(),
hlc: hlc(200),
schema_hlc: hlc(1),
valid_from_ms: 0,
valid_until_ms: -1,
system_from_ms: 200,
},
kind: ArrayOpKind::Put,
coord: vec![CoordValue::Int64(50)],
attrs: None,
};
fanout.on_op_applied(&op);
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn duplicate_op_not_redelivered() {
let (fanout, shapes, delivery, cursors) = make_fanout();
cursors.register("s1", "prices", None);
let mut rx = delivery.register("s1".into());
shapes.subscribe(
"s1",
1,
ShapeDefinition {
shape_id: "sh1".into(),
tenant_id: 1,
shape_type: ShapeType::Array {
array_name: "prices".into(),
coord_range: None,
},
description: "all".into(),
field_filter: vec![],
},
);
let op = make_op("prices", 100);
fanout.on_op_applied(&op);
let _first = rx.try_recv().expect("first delivery");
fanout.on_op_applied(&op);
assert!(rx.try_recv().is_err());
}
}