use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio::time::timeout;
use super::*;
/// Builds the watch pipeline used by the tests in this file and starts its dispatcher.
///
/// `buffer_size` is forwarded to `WatchRegistry::new`. The broadcast channel feeding the
/// dispatcher is created with a fixed capacity (`BROADCAST_CAPACITY`).
///
/// Returns, in order:
/// * the broadcast sender used to publish `WatchEvent`s,
/// * the shared `WatchRegistry`,
/// * the join handle of the spawned task that awaits `WatchDispatcher::run`.
fn setup_watch_system(
    buffer_size: usize,
) -> (
    broadcast::Sender<WatchEvent>,
    Arc<WatchRegistry>,
    tokio::task::JoinHandle<()>,
) {
    /// Capacity of the broadcast channel that carries events to the dispatcher.
    const BROADCAST_CAPACITY: usize = 1000;

    let (broadcast_tx, broadcast_rx) = broadcast::channel(BROADCAST_CAPACITY);
    // The registry holds the sending half of the unregister channel and the
    // dispatcher holds the receiving half.
    let (unregister_tx, unregister_rx) = mpsc::unbounded_channel();
    let registry = Arc::new(WatchRegistry::new(buffer_size, unregister_tx));
    let dispatcher = WatchDispatcher::new(Arc::clone(&registry), broadcast_rx, unregister_rx);
    let handle = tokio::spawn(async move {
        dispatcher.run().await;
    });
    (broadcast_tx, registry, handle)
}
/// Registering one watcher yields one watcher on that key and one watched key overall.
#[tokio::test]
async fn test_register_single_watcher() {
    let (_, registry, _dispatcher) = setup_watch_system(10);
    let watched = Bytes::from("test_key");

    // The handle must stay bound for the duration of the assertions.
    let _watcher = registry.register(watched.clone());

    let watchers_on_key = registry.watcher_count(&watched);
    let distinct_keys = registry.watched_key_count();
    assert_eq!(watchers_on_key, 1);
    assert_eq!(distinct_keys, 1);
}
/// Three registrations on the same key count as three watchers but only one watched key.
#[tokio::test]
async fn test_register_multiple_watchers_same_key() {
    let (_, registry, _dispatcher) = setup_watch_system(10);
    let shared = Bytes::from("shared_key");

    // Keep all three handles bound until the end of the test.
    let _first = registry.register(shared.clone());
    let _second = registry.register(shared.clone());
    let _third = registry.register(shared.clone());

    let watchers_on_key = registry.watcher_count(&shared);
    let distinct_keys = registry.watched_key_count();
    assert_eq!(watchers_on_key, 3);
    assert_eq!(distinct_keys, 1);
}
/// Dropping a watcher handle eventually removes the watcher and its key from the registry.
#[tokio::test]
async fn test_watcher_auto_cleanup_on_drop() {
    // Bind the broadcast sender instead of discarding it with `_`: a bare `_` drops the
    // sender on this very line, closing the broadcast channel the dispatcher reads from.
    // The cleanup under test should be observed with the pipeline still running.
    let (_broadcast_tx, registry, _dispatcher_handle) = setup_watch_system(10);
    let key = Bytes::from("cleanup_key");

    {
        let _handle = registry.register(key.clone());
        assert_eq!(registry.watcher_count(&key), 1);
        // `_handle` is dropped at the end of this scope.
    }

    // Cleanup is asynchronous (presumably routed through the unregister channel created
    // in `setup_watch_system` -- confirm against `WatchRegistry`), so give it time to run.
    tokio::time::sleep(Duration::from_millis(200)).await;

    assert_eq!(registry.watcher_count(&key), 0);
    assert_eq!(registry.watched_key_count(), 0);
}
/// An event published for a watched key is delivered unchanged to that key's watcher.
#[tokio::test]
async fn test_dispatcher_dispatch_to_matching_watcher() {
    let (publisher, registry, _dispatcher) = setup_watch_system(10);
    let key = Bytes::from("test_key");
    let value = Bytes::from("test_value");
    let put = WatchEventType::Put as i32;

    let mut watcher = registry.register(key.clone());
    // Pause before publishing so the spawned dispatcher task gets a chance to run.
    tokio::time::sleep(Duration::from_millis(50)).await;

    publisher
        .send(WatchEvent {
            key: key.clone(),
            value: value.clone(),
            event_type: put,
            error: 0,
        })
        .unwrap();

    let delivery = timeout(Duration::from_millis(100), watcher.receiver_mut().recv()).await;
    let received = delivery
        .expect("Timeout waiting for event")
        .expect("Channel closed");

    assert_eq!(received.key, key);
    assert_eq!(received.value, value);
    assert_eq!(received.event_type, put);
}
/// A watcher on one key receives nothing when an event is published for a different key.
#[tokio::test]
async fn test_dispatcher_ignores_non_matching_key() {
    let (publisher, registry, _dispatcher) = setup_watch_system(10);
    let watched_key = Bytes::from("key1");
    let other_key = Bytes::from("key2");

    let mut watcher = registry.register(watched_key.clone());
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Publish for a key nobody is watching.
    publisher
        .send(WatchEvent {
            key: other_key,
            value: Bytes::from("value"),
            event_type: WatchEventType::Put as i32,
            error: 0,
        })
        .unwrap();

    // The receive must time out: an `Err` here means no event arrived within the window.
    let outcome = timeout(Duration::from_millis(100), watcher.receiver_mut().recv()).await;
    assert!(
        outcome.is_err(),
        "Should not receive event for different key"
    );
}
/// Every watcher registered on a key receives a copy of an event published for that key.
#[tokio::test]
async fn test_multiple_watchers_all_receive_event() {
    let (publisher, registry, _dispatcher) = setup_watch_system(10);
    let key = Bytes::from("shared_key");
    let value = Bytes::from("shared_value");

    let mut first = registry.register(key.clone());
    let mut second = registry.register(key.clone());
    let mut third = registry.register(key.clone());
    assert_eq!(registry.watcher_count(&key), 3);

    tokio::time::sleep(Duration::from_millis(50)).await;

    publisher
        .send(WatchEvent {
            key: key.clone(),
            value: value.clone(),
            event_type: WatchEventType::Put as i32,
            error: 0,
        })
        .unwrap();

    // Check the watchers one after another, in registration order.
    for watcher in [&mut first, &mut second, &mut third] {
        let delivery = timeout(Duration::from_millis(100), watcher.receiver_mut().recv()).await;
        let received = delivery.expect("Timeout").expect("Channel closed");
        assert_eq!(received.key, key);
        assert_eq!(received.value, value);
    }
}
/// A delete event reaches the watcher with the `Delete` type and an empty value.
#[tokio::test]
async fn test_watcher_delete_event() {
    let (publisher, registry, _dispatcher) = setup_watch_system(10);
    let key = Bytes::from("test_key");
    let delete = WatchEventType::Delete as i32;

    let mut watcher = registry.register(key.clone());
    tokio::time::sleep(Duration::from_millis(50)).await;

    // The delete event is published with an empty payload.
    publisher
        .send(WatchEvent {
            key: key.clone(),
            value: Bytes::new(),
            event_type: delete,
            error: 0,
        })
        .unwrap();

    let delivery = timeout(Duration::from_millis(100), watcher.receiver_mut().recv()).await;
    let received = delivery.expect("Timeout").expect("Channel closed");

    assert_eq!(received.event_type, delete);
    assert_eq!(received.value, Bytes::new());
}
#[tokio::test]
async fn test_multiple_events_sequential() {
let (broadcast_tx, registry, _dispatcher_handle) = setup_watch_system(10);
let key = Bytes::from("test_key");
let mut handle = registry.register(key.clone());
tokio::time::sleep(Duration::from_millis(50)).await;
broadcast_tx
.send(WatchEvent {
key: key.clone(),
value: Bytes::from("value1"),
event_type: WatchEventType::Put as i32,
error: 0,
})
.unwrap();
broadcast_tx
.send(WatchEvent {
key: key.clone(),
value: Bytes::from("value2"),
event_type: WatchEventType::Put as i32,
error: 0,
})
.unwrap();
broadcast_tx
.send(WatchEvent {
key: key.clone(),
value: Bytes::new(),
event_type: WatchEventType::Delete as i32,
error: 0,
})
.unwrap();
let event1 = handle.receiver_mut().recv().await.unwrap();
assert_eq!(event1.value, Bytes::from("value1"));
let event2 = handle.receiver_mut().recv().await.unwrap();
assert_eq!(event2.value, Bytes::from("value2"));
let event3 = handle.receiver_mut().recv().await.unwrap();
assert_eq!(event3.event_type, WatchEventType::Delete as i32);
}
/// Dropping one of three watchers on a key leaves the other two registered.
#[tokio::test]
async fn test_watcher_count_after_partial_cleanup() {
    // Bind the broadcast sender instead of discarding it with `_`: a bare `_` drops the
    // sender on this very line, closing the broadcast channel the dispatcher reads from.
    // The cleanup under test should be observed with the pipeline still running.
    let (_broadcast_tx, registry, _dispatcher_handle) = setup_watch_system(10);
    let key = Bytes::from("count_key");

    let handle1 = registry.register(key.clone());
    let _handle2 = registry.register(key.clone());
    let _handle3 = registry.register(key.clone());
    assert_eq!(registry.watcher_count(&key), 3);

    drop(handle1);
    // Cleanup after a drop is asynchronous, so wait before checking the count.
    tokio::time::sleep(Duration::from_millis(200)).await;

    assert_eq!(registry.watcher_count(&key), 2);
}
/// An event for `key1` reaches the `key1` watcher only; the `key2` watcher sees nothing.
#[tokio::test]
async fn test_different_keys_isolated() {
    let (publisher, registry, _dispatcher) = setup_watch_system(10);
    let key1 = Bytes::from("key1");
    let key2 = Bytes::from("key2");

    let mut watcher1 = registry.register(key1.clone());
    let mut watcher2 = registry.register(key2.clone());
    tokio::time::sleep(Duration::from_millis(50)).await;

    let put_for_key1 = WatchEvent {
        key: key1.clone(),
        value: Bytes::from("value1"),
        event_type: WatchEventType::Put as i32,
        error: 0,
    };
    publisher.send(put_for_key1).unwrap();

    // The watcher on `key1` gets the event...
    let delivery = timeout(Duration::from_millis(100), watcher1.receiver_mut().recv()).await;
    let event = delivery.expect("Timeout").expect("Channel closed");
    assert_eq!(event.key, key1);

    // ...while the watcher on `key2` times out waiting.
    let outcome = timeout(Duration::from_millis(50), watcher2.receiver_mut().recv()).await;
    assert!(outcome.is_err(), "handle2 should not receive event");
}
/// With a per-watcher buffer size of 2, publishing ten events before reading any must
/// leave no more than two events for the watcher to receive.
#[tokio::test]
async fn test_watcher_buffer_overflow() {
    let (publisher, registry, _dispatcher) = setup_watch_system(2);
    let key = Bytes::from("overflow_key");
    let mut watcher = registry.register(key.clone());
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Publish ten events without reading any of them.
    for i in 0..10 {
        let event = WatchEvent {
            key: key.clone(),
            value: Bytes::from(format!("value{i}")),
            event_type: WatchEventType::Put as i32,
            error: 0,
        };
        publisher.send(event).unwrap();
    }
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Drain until a receive times out or the channel reports closure.
    let mut received_count = 0;
    loop {
        match timeout(Duration::from_millis(50), watcher.receiver_mut().recv()).await {
            Ok(Some(_)) => received_count += 1,
            _ => break,
        }
    }

    assert!(
        received_count <= 2,
        "Should receive at most buffer_size events, got {received_count}"
    );
}
/// After `into_receiver()` consumes the handle, the watcher stays registered.
#[tokio::test]
async fn test_into_receiver_disables_cleanup() {
    // Bind the broadcast sender instead of discarding it with `_`: a bare `_` drops the
    // sender on this very line and closes the broadcast channel. If the dispatcher stops
    // when that channel closes, the "count is still 1" assertion below could pass
    // without proving that cleanup was actually disabled.
    let (_broadcast_tx, registry, _dispatcher_handle) = setup_watch_system(10);
    let key = Bytes::from("test_key");

    let handle = registry.register(key.clone());
    assert_eq!(registry.watcher_count(&key), 1);

    // Consume the handle; the returned parts stay bound until the end of the test.
    let (_id, _key, _receiver) = handle.into_receiver();

    // Leave time for any (unexpected) asynchronous cleanup to run before checking.
    tokio::time::sleep(Duration::from_millis(50)).await;
    assert_eq!(registry.watcher_count(&key), 1);
}
/// Ten tasks each register a watcher on the same key, hold it briefly and drop it;
/// once all tasks finish and cleanup has had time to run, no watcher remains on the key.
#[tokio::test]
async fn test_concurrent_register_unregister() {
    // The sender is kept alive until the end of the test and dropped explicitly below.
    let (broadcast_tx, registry, dispatcher_handle) = setup_watch_system(10);
    let key = Bytes::from("concurrent_key");

    let mut handles = vec![];
    for _ in 0..10 {
        let reg = Arc::clone(&registry);
        let k = key.clone();
        let handle = tokio::spawn(async move {
            let watcher = reg.register(k);
            tokio::time::sleep(Duration::from_millis(10)).await;
            drop(watcher);
            // Yield once after the drop so other tasks can make progress.
            tokio::task::yield_now().await;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    // Cleanup after a drop is asynchronous, so wait before checking the count.
    tokio::time::sleep(Duration::from_millis(200)).await;
    assert_eq!(registry.watcher_count(&key), 0);

    // Best-effort shutdown: close the broadcast channel and wait up to one second for
    // the dispatcher task. The outcome is deliberately ignored here.
    drop(broadcast_tx);
    let _ = timeout(Duration::from_secs(1), dispatcher_handle).await;
}
/// Dropping the broadcast sender must make the dispatcher task finish within two seconds.
#[tokio::test]
async fn test_dispatcher_shutdown_on_broadcast_close() {
    let (publisher, registry, dispatcher_task) = setup_watch_system(10);
    let key = Bytes::from("test_key");

    // One watcher is registered and stays bound while the dispatcher is shut down.
    let _watcher = registry.register(key.clone());
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Closing the broadcast channel is the shutdown signal under test.
    drop(publisher);

    let joined = timeout(Duration::from_secs(2), dispatcher_task).await;
    assert!(
        joined.is_ok(),
        "Dispatcher should exit within 2s on channel close"
    );
}