use std::sync::Arc;
use anyhow::Result;
use derive_builder::Builder;
use futures::Stream;
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use super::policy::EventEmissionPolicy;
use super::protocol::{EventReleaseHandle, KvCacheEvent};
use crate::registry::BlockRegistrationHandle;
#[derive(Builder, Clone)]
#[builder(setter(into, strip_option), build_fn(error = "anyhow::Error"))]
pub struct EventsManagerSettings {
#[builder(default, setter(strip_option = false))]
policy: Option<Arc<dyn EventEmissionPolicy>>,
#[builder(default = "1024")]
channel_capacity: usize,
}
impl EventsManagerSettings {
pub fn builder() -> EventsManagerSettingsBuilder {
EventsManagerSettingsBuilder::default()
}
pub fn into_manager(self) -> EventsManager {
let policy = self
.policy
.unwrap_or_else(|| Arc::new(super::policy::AllEventsPolicy::new()));
let (event_tx, _) = broadcast::channel(self.channel_capacity);
EventsManager { policy, event_tx }
}
}
pub struct EventsManager {
policy: Arc<dyn EventEmissionPolicy>,
event_tx: broadcast::Sender<KvCacheEvent>,
}
#[derive(Default)]
pub struct EventsManagerBuilder(EventsManagerSettingsBuilder);
impl EventsManagerBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn policy(mut self, policy: Arc<dyn EventEmissionPolicy>) -> Self {
self.0.policy = Some(Some(policy));
self
}
pub fn channel_capacity(mut self, capacity: usize) -> Self {
self.0.channel_capacity = Some(capacity);
self
}
pub fn build(self) -> EventsManager {
self.0
.build()
.expect("EventsManagerSettings has all defaults")
.into_manager()
}
}
impl EventsManager {
pub fn builder() -> EventsManagerBuilder {
EventsManagerBuilder::new()
}
pub fn subscribe(&self) -> impl Stream<Item = KvCacheEvent> + Send + 'static {
let rx = self.event_tx.subscribe();
BroadcastStream::new(rx).filter_map(|result| result.ok())
}
pub fn on_block_registered(&self, handle: &BlockRegistrationHandle) -> Result<()> {
let seq_hash = handle.seq_hash();
if !self.policy.should_emit(seq_hash) {
return Ok(());
}
let create_event = KvCacheEvent::Create(seq_hash);
let _ = self.event_tx.send(create_event);
let release_handle = EventReleaseHandle::new(seq_hash, self.event_tx.clone());
handle.attach_unique(Arc::new(release_handle))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::super::policy::PowerOfTwoPolicy;
use super::*;
use crate::registry::BlockRegistry;
use crate::{KvbmSequenceHashProvider, SequenceHash};
use dynamo_tokens::TokenBlockSequence;
use futures::StreamExt;
fn create_seq_hash_at_position(position: usize) -> SequenceHash {
let tokens_per_block = 4;
let total_tokens = (position + 1) * tokens_per_block;
let tokens: Vec<u32> = (0..total_tokens as u32).collect();
let seq = TokenBlockSequence::from_slice(&tokens, tokens_per_block as u32, Some(1337));
seq.blocks()[position].kvbm_sequence_hash()
}
#[tokio::test]
async fn test_events_manager_emits_create_for_power_of_two() {
let manager = EventsManager::builder()
.policy(Arc::new(PowerOfTwoPolicy::new()))
.build();
let mut stream = Box::pin(manager.subscribe());
let registry = BlockRegistry::new();
let seq_hash = create_seq_hash_at_position(16); let handle = registry.register_sequence_hash(seq_hash);
manager.on_block_registered(&handle).unwrap();
let event = tokio::time::timeout(std::time::Duration::from_millis(100), stream.next())
.await
.unwrap()
.unwrap();
assert_eq!(event, KvCacheEvent::Create(seq_hash));
}
#[tokio::test]
async fn test_events_manager_skips_non_power_of_two() {
let manager = EventsManager::builder()
.policy(Arc::new(PowerOfTwoPolicy::new()))
.build();
let mut stream = Box::pin(manager.subscribe());
let registry = BlockRegistry::new();
let seq_hash = create_seq_hash_at_position(17); let handle = registry.register_sequence_hash(seq_hash);
manager.on_block_registered(&handle).unwrap();
let result =
tokio::time::timeout(std::time::Duration::from_millis(50), stream.next()).await;
assert!(result.is_err());
drop(handle);
}
#[tokio::test]
async fn test_events_manager_emits_remove_on_drop() {
let manager = EventsManager::builder()
.policy(Arc::new(PowerOfTwoPolicy::new()))
.build();
let mut stream = Box::pin(manager.subscribe());
let registry = BlockRegistry::new();
let seq_hash = create_seq_hash_at_position(32);
{
let handle = registry.register_sequence_hash(seq_hash);
manager.on_block_registered(&handle).unwrap();
let event = tokio::time::timeout(std::time::Duration::from_millis(100), stream.next())
.await
.unwrap()
.unwrap();
assert_eq!(event, KvCacheEvent::Create(seq_hash));
}
let event = tokio::time::timeout(std::time::Duration::from_millis(100), stream.next())
.await
.unwrap()
.unwrap();
assert_eq!(event, KvCacheEvent::Remove(seq_hash));
}
#[tokio::test]
async fn test_events_manager_multiple_subscribers() {
let manager = EventsManager::builder()
.policy(Arc::new(PowerOfTwoPolicy::new()))
.build();
let mut stream1 = Box::pin(manager.subscribe());
let mut stream2 = Box::pin(manager.subscribe());
let registry = BlockRegistry::new();
let seq_hash = create_seq_hash_at_position(64); let handle = registry.register_sequence_hash(seq_hash);
manager.on_block_registered(&handle).unwrap();
let event1 = tokio::time::timeout(std::time::Duration::from_millis(100), stream1.next())
.await
.unwrap()
.unwrap();
let event2 = tokio::time::timeout(std::time::Duration::from_millis(100), stream2.next())
.await
.unwrap()
.unwrap();
assert_eq!(event1, KvCacheEvent::Create(seq_hash));
assert_eq!(event2, KvCacheEvent::Create(seq_hash));
}
#[test]
fn test_events_manager_default_policy() {
let manager = EventsManager::builder().build();
let registry = BlockRegistry::new();
let seq_hash = create_seq_hash_at_position(17);
let _subscription = manager.subscribe();
let handle = registry.register_sequence_hash(seq_hash);
manager.on_block_registered(&handle).unwrap();
}
}