use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::SequenceHash;
/// Identifier of an instance, represented as an unsigned 128-bit integer.
///
/// This is the type of the `instance_id` field of [`KvbmCacheEvents`].
pub type InstanceId = u128;
/// A single cache event that carries exactly one [`SequenceHash`].
///
/// The type is serializable with serde and comparable for equality.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum KvCacheEvent {
/// Creation event for the carried sequence hash.
Create(SequenceHash),
/// Removal event for the carried sequence hash.
///
/// [`EventReleaseHandle`] sends this variant from its `Drop` implementation.
Remove(SequenceHash),
}
/// Cache events whose payload is a list of sequence hashes, plus a payload-free
/// `Shutdown` marker.
///
/// The type is serializable with serde and comparable for equality.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum KvCacheEvents {
/// Creation event covering every hash in the vector.
Create(Vec<SequenceHash>),
/// Removal event covering every hash in the vector.
Remove(Vec<SequenceHash>),
/// Marker variant without payload.
// NOTE(review): who emits `Shutdown` and how consumers react to it is not
// visible in this file — confirm against the producers/consumers.
Shutdown,
}
/// A [`KvCacheEvents`] payload paired with an [`InstanceId`].
///
/// The type is serializable with serde and comparable for equality.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KvbmCacheEvents {
/// The events carried by this message.
pub events: KvCacheEvents,
/// Instance the events are associated with.
// NOTE(review): presumably the instance that produced the events — confirm.
pub instance_id: InstanceId,
}
/// Guard that broadcasts [`KvCacheEvent::Remove`] for its sequence hash when it
/// is dropped.
///
/// The event is sent from the `Drop` implementation, so the handle must be kept
/// alive for as long as the removal should be deferred. Discarding the handle
/// right after construction sends the `Remove` event immediately, which is why
/// the type is marked `#[must_use]`.
#[derive(Debug)]
#[must_use = "dropping the handle immediately broadcasts `KvCacheEvent::Remove`"]
pub struct EventReleaseHandle {
    /// Hash reported in the `Remove` event sent on drop.
    seq_hash: SequenceHash,
    /// Broadcast sender the `Remove` event is published on.
    event_tx: broadcast::Sender<KvCacheEvent>,
}
impl EventReleaseHandle {
pub fn new(seq_hash: SequenceHash, event_tx: broadcast::Sender<KvCacheEvent>) -> Self {
Self { seq_hash, event_tx }
}
}
impl Drop for EventReleaseHandle {
    /// Publishes `KvCacheEvent::Remove` for the stored sequence hash.
    fn drop(&mut self) {
        // Best-effort notification: the send result is deliberately discarded.
        // Per the tokio docs a broadcast `send` fails only when there is no
        // active receiver, in which case nobody needs to be told.
        self.event_tx
            .send(KvCacheEvent::Remove(self.seq_hash))
            .ok();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::KvbmSequenceHashProvider;
    use dynamo_tokens::TokenBlockSequence;
    use tokio_stream::StreamExt;
    use tokio_stream::wrappers::BroadcastStream;

    /// Returns the hash of the first block of a four-token sequence whose block
    /// size equals its length.
    fn single_block_hash() -> SequenceHash {
        let tokens = vec![1u32, 2, 3, 4];
        let seq = TokenBlockSequence::from_slice(&tokens, tokens.len() as u32, Some(1337));
        seq.blocks()[0].kvbm_sequence_hash()
    }

    /// Serializes `value` to JSON, parses it back and asserts both are equal.
    fn assert_json_round_trip<T>(value: &T)
    where
        T: Serialize + serde::de::DeserializeOwned + PartialEq + std::fmt::Debug,
    {
        let json = serde_json::to_string(value).unwrap();
        let decoded: T = serde_json::from_str(&json).unwrap();
        assert_eq!(value, &decoded);
    }

    #[test]
    fn test_event_serialization() {
        let seq_hash = single_block_hash();

        assert_json_round_trip(&KvCacheEvent::Create(seq_hash));
        assert_json_round_trip(&KvCacheEvent::Remove(seq_hash));
    }

    #[test]
    fn test_batch_events_serialization() {
        // Eight tokens with a block size of four.
        let tokens = vec![1u32, 2, 3, 4, 5, 6, 7, 8];
        let seq = TokenBlockSequence::from_slice(&tokens, 4, Some(1337));
        let hashes: Vec<_> = seq
            .blocks()
            .iter()
            .map(|block| block.kvbm_sequence_hash())
            .collect();

        let batch = KvbmCacheEvents {
            events: KvCacheEvents::Create(hashes),
            instance_id: 12345,
        };
        assert_json_round_trip(&batch);
    }

    #[tokio::test]
    async fn test_release_handle_drop() {
        let seq_hash = single_block_hash();
        let (event_tx, event_rx) = broadcast::channel(16);
        let mut events = BroadcastStream::new(event_rx);

        // Dropping the handle must publish the removal for its hash.
        drop(EventReleaseHandle::new(seq_hash, event_tx));

        let received = events.next().await.unwrap().unwrap();
        assert_eq!(received, KvCacheEvent::Remove(seq_hash));
    }
}