use std::sync::Arc;
use super::block::registry::RegistrationHandle;
use crate::block_manager::kv_consolidator::EventSource;
use crate::block_manager::kv_consolidator::KvEventConsolidator;
pub trait EventManager: EventPublisher + EventReleaseManager + Send + Sync {
}
pub trait EventPublisher: Send + Sync {
fn publish(&self, handles: Vec<Arc<RegistrationHandle>>);
}
pub trait EventReleaseManager: Send + Sync {
fn block_release(&self, registration_handle: &RegistrationHandle);
}
pub struct PublishHandle {
handle: Arc<RegistrationHandle>,
publisher: Option<Arc<dyn EventPublisher>>,
}
impl PublishHandle {
pub fn new(handle: RegistrationHandle, publisher: Arc<dyn EventPublisher>) -> Self {
let handle = Arc::new(handle);
let publisher = Some(publisher);
Self { handle, publisher }
}
pub fn remove_handle(&self) -> Arc<RegistrationHandle> {
self.handle.clone()
}
fn disarm(&mut self) {
self.publisher = None;
}
}
impl Drop for PublishHandle {
fn drop(&mut self) {
if let Some(publisher) = self.publisher.take() {
publisher.publish(vec![self.handle.clone()]);
}
}
}
#[derive(Clone)]
pub struct Publisher {
handles: Vec<Arc<RegistrationHandle>>,
publisher: Arc<dyn EventPublisher>,
}
impl Publisher {
pub fn new(publisher: Arc<dyn EventPublisher>) -> Self {
Self {
handles: Vec::new(),
publisher,
}
}
pub fn take_handle(&mut self, publish_handle: PublishHandle) -> Arc<RegistrationHandle> {
let handle = publish_handle.remove_handle();
self.handles.push(handle.clone());
let mut publish_handle = publish_handle;
publish_handle.disarm();
handle
}
pub fn publish(&mut self) {
let handles = std::mem::take(&mut self.handles);
if !handles.is_empty() {
self.publisher.publish(handles);
}
}
}
impl Drop for Publisher {
fn drop(&mut self) {
self.publish();
}
}
pub struct NullEventManager;
impl NullEventManager {
pub fn new() -> Arc<Self> {
Arc::new(Self {})
}
}
impl EventManager for NullEventManager {}
impl EventPublisher for NullEventManager {
fn publish(&self, _handles: Vec<Arc<RegistrationHandle>>) {}
}
impl EventReleaseManager for NullEventManager {
fn block_release(&self, _registration_handle: &RegistrationHandle) {}
}
pub struct DynamoEventManager {
consolidator_handle: Arc<crate::block_manager::kv_consolidator::KvEventConsolidatorHandle>,
#[allow(dead_code)]
_consolidator: Option<Arc<crate::block_manager::kv_consolidator::KvEventConsolidator>>,
}
impl DynamoEventManager {
pub fn new(
consolidator_handle: Arc<crate::block_manager::kv_consolidator::KvEventConsolidatorHandle>,
) -> Arc<Self> {
Arc::new(Self {
consolidator_handle,
_consolidator: None,
})
}
pub async fn new_with_config(
config: crate::block_manager::kv_consolidator::KvEventConsolidatorConfig,
) -> anyhow::Result<Arc<Self>> {
let mut kv_event_consolidator = KvEventConsolidator::new(config)?;
kv_event_consolidator.start().await?;
let handle = kv_event_consolidator.get_handle();
Ok(Arc::new(Self {
consolidator_handle: Arc::new(handle),
_consolidator: Some(Arc::new(kv_event_consolidator)),
}))
}
fn publish_store_events(&self, handles: Vec<Arc<RegistrationHandle>>) {
if handles.is_empty() {
return;
}
tracing::debug!(
"DynamoEventManager::publish_store_events called with {} blocks",
handles.len()
);
let kv_event_consolidator = self.consolidator_handle.clone();
if let Ok(rt) = tokio::runtime::Handle::try_current() {
rt.spawn(async move {
for handle in handles {
let block_hash = handle.sequence_hash().to_string();
let parent_hash = handle.parent_sequence_hash().map(|h| h.to_string());
let block_size = handle.block_size(); let tokens: Vec<u32> = handle.tokens().iter().copied().collect();
tracing::debug!(
"DynamoEventManager sending store event to kv event consolidator: block_hash={}, block_size={}, tokens={}",
block_hash,
block_size,
tokens.len()
);
kv_event_consolidator
.handle_store(
block_hash,
EventSource::Kvbm,
tokens,
parent_hash,
block_size,
None, None, None, )
.await;
}
});
} else {
tracing::error!(
"No Tokio runtime in context; dropping store events for {} blocks",
handles.len()
);
}
}
fn publish_remove_event(&self, registration_handle: &RegistrationHandle) {
let block_hash = registration_handle.sequence_hash().to_string();
tracing::debug!(
"DynamoEventManager::publish_remove_event called: block_hash={}",
block_hash
);
let kv_event_consolidator = self.consolidator_handle.clone();
if let Ok(rt) = tokio::runtime::Handle::try_current() {
rt.spawn(async move {
kv_event_consolidator
.handle_remove(&block_hash, EventSource::Kvbm)
.await;
});
} else {
tracing::error!(
"No Tokio runtime in context; dropping remove event for block {}",
block_hash
);
}
}
}
impl std::fmt::Debug for DynamoEventManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "DynamoEventManager(kv_event_consolidator)")
}
}
impl EventManager for DynamoEventManager {}
impl EventPublisher for DynamoEventManager {
fn publish(&self, handles: Vec<Arc<RegistrationHandle>>) {
self.publish_store_events(handles);
}
}
impl EventReleaseManager for DynamoEventManager {
fn block_release(&self, registration_handle: &RegistrationHandle) {
self.publish_remove_event(registration_handle);
}
}
#[cfg(test)]
pub mod tests {
use crate::tokens::SequenceHash;
use super::*;
#[derive(Debug, PartialEq, Eq)]
pub enum EventType {
Register(SequenceHash),
Remove(SequenceHash),
}
pub struct MockEventManager {
tx: tokio::sync::mpsc::UnboundedSender<Vec<EventType>>,
}
impl MockEventManager {
pub fn new() -> (
Arc<Self>,
tokio::sync::mpsc::UnboundedReceiver<Vec<EventType>>,
) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
(Arc::new(Self { tx }), rx)
}
pub fn publisher(self: &Arc<Self>) -> Publisher {
Publisher::new(self.clone())
}
}
impl EventManager for MockEventManager {}
impl EventPublisher for MockEventManager {
fn publish(&self, handles: Vec<Arc<RegistrationHandle>>) {
let events = handles
.iter()
.map(|handle| EventType::Register(handle.sequence_hash()))
.collect::<Vec<_>>();
self.tx.send(events).unwrap();
}
}
impl EventReleaseManager for MockEventManager {
fn block_release(&self, registration_handle: &RegistrationHandle) {
let events = vec![EventType::Remove(registration_handle.sequence_hash())];
self.tx.send(events).unwrap();
}
}
}