use crate::helpers::{TENANT_A, TENANT_B};
use nodedb::control::change_stream::{ChangeEvent, ChangeOperation, ChangeStream};
use nodedb::types::{Lsn, TenantId};
#[test]
fn cdc_stream_isolated_between_tenants() {
let stream = ChangeStream::new(1024);
let _sub_b = stream.subscribe(Some("orders".into()), Some(TenantId::new(TENANT_B)));
stream.publish(ChangeEvent {
collection: "orders".into(),
document_id: "order_1".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 1000,
tenant_id: TenantId::new(TENANT_A),
lsn: Lsn::new(1),
after: None,
});
stream.publish(ChangeEvent {
collection: "orders".into(),
document_id: "order_2".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 2000,
tenant_id: TenantId::new(TENANT_B),
lsn: Lsn::new(2),
after: None,
});
let all_events = stream.query_changes(Some("orders"), 0, 100);
assert!(
all_events.len() >= 2,
"Should have at least 2 events, got {}",
all_events.len()
);
let a_events: Vec<_> = all_events
.iter()
.filter(|e| e.tenant_id == TenantId::new(TENANT_A))
.collect();
assert_eq!(a_events.len(), 1);
assert_eq!(a_events[0].document_id, "order_1");
let b_events: Vec<_> = all_events
.iter()
.filter(|e| e.tenant_id == TenantId::new(TENANT_B))
.collect();
assert_eq!(b_events.len(), 1);
assert_eq!(b_events[0].document_id, "order_2");
}
#[test]
fn cdc_different_collections_isolated() {
let stream = ChangeStream::new(1024);
stream.publish(ChangeEvent {
collection: "orders".into(),
document_id: "o1".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 1000,
tenant_id: TenantId::new(TENANT_A),
lsn: Lsn::new(1),
after: None,
});
stream.publish(ChangeEvent {
collection: "users".into(),
document_id: "u1".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 2000,
tenant_id: TenantId::new(TENANT_A),
lsn: Lsn::new(2),
after: None,
});
let order_events = stream.query_changes(Some("orders"), 0, 100);
for event in &order_events {
assert_eq!(event.collection, "orders");
}
}