use std::time::Duration;
use crate::helpers::{TENANT_A, TENANT_B};
use nodedb::control::change_stream::{ChangeEvent, ChangeOperation, ChangeStream};
use nodedb::types::{Lsn, TenantId};
#[tokio::test]
async fn cdc_tenant_b_subscription_rejects_tenant_a_events() {
let stream = ChangeStream::new(1024);
let mut sub_b = stream.subscribe(Some("orders".into()), Some(TenantId::new(TENANT_B)));
for i in 0..5u64 {
stream.publish(ChangeEvent {
collection: "orders".into(),
document_id: format!("a_order_{i}"),
operation: ChangeOperation::Insert,
timestamp_ms: (i + 1) * 1000,
tenant_id: TenantId::new(TENANT_A),
lsn: Lsn::new(i + 1),
after: None,
});
}
stream.publish(ChangeEvent {
collection: "orders".into(),
document_id: "b_order_1".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 10_000,
tenant_id: TenantId::new(TENANT_B),
lsn: Lsn::new(100),
after: None,
});
let received = tokio::time::timeout(Duration::from_millis(500), sub_b.recv_filtered())
.await
.expect("timed out waiting for Tenant B's event")
.expect("channel error");
assert_eq!(
received.tenant_id,
TenantId::new(TENANT_B),
"First event delivered to Tenant B subscription must belong to Tenant B"
);
assert_eq!(
received.document_id, "b_order_1",
"Received wrong document_id: expected b_order_1, got {}",
received.document_id
);
let second = tokio::time::timeout(Duration::from_millis(50), sub_b.recv_filtered()).await;
assert!(
second.is_err(),
"No further events expected for Tenant B subscription; Tenant A's events must have been filtered"
);
}
#[tokio::test]
async fn cdc_unfiltered_subscription_receives_all_tenants() {
let stream = ChangeStream::new(1024);
let mut sub_all = stream.subscribe(None, None);
stream.publish(ChangeEvent {
collection: "events".into(),
document_id: "e_a".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 1000,
tenant_id: TenantId::new(TENANT_A),
lsn: Lsn::new(1),
after: None,
});
stream.publish(ChangeEvent {
collection: "events".into(),
document_id: "e_b".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 2000,
tenant_id: TenantId::new(TENANT_B),
lsn: Lsn::new(2),
after: None,
});
let ev1 = tokio::time::timeout(Duration::from_millis(200), sub_all.recv_filtered())
.await
.expect("timed out on first event")
.expect("channel error");
let ev2 = tokio::time::timeout(Duration::from_millis(200), sub_all.recv_filtered())
.await
.expect("timed out on second event")
.expect("channel error");
let tenant_ids: Vec<_> = [&ev1, &ev2].iter().map(|e| e.tenant_id).collect();
assert!(
tenant_ids.contains(&TenantId::new(TENANT_A)),
"Unfiltered subscription must receive Tenant A events"
);
assert!(
tenant_ids.contains(&TenantId::new(TENANT_B)),
"Unfiltered subscription must receive Tenant B events"
);
}
#[test]
fn cdc_query_changes_is_unfiltered_callers_must_filter() {
let stream = ChangeStream::new(1024);
stream.publish(ChangeEvent {
collection: "logs".into(),
document_id: "l_a".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 1000,
tenant_id: TenantId::new(TENANT_A),
lsn: Lsn::new(1),
after: None,
});
stream.publish(ChangeEvent {
collection: "logs".into(),
document_id: "l_b".into(),
operation: ChangeOperation::Insert,
timestamp_ms: 2000,
tenant_id: TenantId::new(TENANT_B),
lsn: Lsn::new(2),
after: None,
});
let all = stream.query_changes(Some("logs"), 0, 100);
assert!(
all.len() >= 2,
"query_changes must return all events across tenants"
);
let a_events: Vec<_> = all
.iter()
.filter(|e| e.tenant_id == TenantId::new(TENANT_A))
.collect();
let b_events: Vec<_> = all
.iter()
.filter(|e| e.tenant_id == TenantId::new(TENANT_B))
.collect();
assert_eq!(a_events.len(), 1, "should have exactly 1 Tenant A event");
assert_eq!(b_events.len(), 1, "should have exactly 1 Tenant B event");
assert_eq!(a_events[0].document_id, "l_a");
assert_eq!(b_events[0].document_id, "l_b");
}