pub mod config;
pub mod publisher;
pub mod subscriber;
pub mod tracker;
pub use config::KvEventConsolidatorConfig;
pub use publisher::KvEventConsolidatorPublisher;
pub use tracker::{CacheStatusTracker, EventSource, StorageTier};
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use subscriber::start_simple_zmq_listener;
/// Cheaply cloneable handle for feeding cache events into a consolidator's
/// shared [`CacheStatusTracker`] from other components.
///
/// Cloning only bumps the reference count of the shared tracker.
#[derive(Debug, Clone)]
pub struct KvEventConsolidatorHandle {
    /// Tracker shared with the [`KvEventConsolidator`] that produced this handle.
    pub(crate) tracker: Arc<RwLock<CacheStatusTracker>>,
}
impl KvEventConsolidatorHandle {
    /// Forwards a store event to [`CacheStatusTracker::handle_store`] while
    /// holding the tracker's write lock.
    ///
    /// All arguments are passed through unchanged.
    // The argument list mirrors the tracker's `handle_store` one-to-one, which is
    // why the clippy argument-count lint is silenced here.
    #[allow(clippy::too_many_arguments)]
    pub async fn handle_store(
        &self,
        block_hash: String,
        source: EventSource,
        token_ids: Vec<u32>,
        parent_hash: Option<String>,
        block_size: usize,
        lora_name: Option<String>,
        tier: Option<StorageTier>,
        data_parallel_rank: Option<i32>,
    ) {
        self.tracker.write().await.handle_store(
            block_hash,
            source,
            token_ids,
            parent_hash,
            block_size,
            lora_name,
            tier,
            data_parallel_rank,
        );
    }

    /// Forwards a remove event for `block_hash` from `source` to
    /// [`CacheStatusTracker::handle_remove`] under the write lock.
    pub async fn handle_remove(&self, block_hash: &str, source: EventSource) {
        self.tracker.write().await.handle_remove(block_hash, source);
    }

    /// Forwards a clear-all event to [`CacheStatusTracker::handle_clear_all`]
    /// under the write lock.
    pub async fn handle_clear_all(&self) {
        self.tracker.write().await.handle_clear_all();
    }
}
/// Subscribes to engine KV events over ZMQ, consolidates them in a shared
/// [`CacheStatusTracker`], and publishes the result through a
/// [`KvEventConsolidatorPublisher`].
pub struct KvEventConsolidator {
    /// Endpoints and engine source consumed by `start`.
    config: KvEventConsolidatorConfig,
    /// Consolidation state shared with the publisher, the subscriber task and handles.
    tracker: Arc<RwLock<CacheStatusTracker>>,
    /// Background subscriber task; `Some` once `start` has launched it.
    subscriber_handle: Option<JoinHandle<()>>,
    /// Cancelled during `shutdown` to ask the subscriber task to stop.
    cancellation_token: CancellationToken,
    /// Downstream publisher; `Some` once `start` has created it.
    publisher: Option<KvEventConsolidatorPublisher>,
}
impl KvEventConsolidator {
pub fn new(config: KvEventConsolidatorConfig) -> Result<Self> {
let tracker = Arc::new(RwLock::new(CacheStatusTracker::new()));
let cancellation_token = CancellationToken::new();
Ok(Self {
config,
tracker,
subscriber_handle: None,
cancellation_token,
publisher: None,
})
}
pub async fn start(&mut self) -> Result<()> {
tracing::info!(
"Starting KV Event Consolidator: subscribe from {}, publish to ZMQ at {}",
self.config.engine_event_endpoint,
self.config.consolidated_event_endpoint
);
let publisher = KvEventConsolidatorPublisher::new(
&self.config.consolidated_event_endpoint,
self.tracker.clone(),
)?;
self.publisher = Some(publisher);
tracing::info!("Waiting for downstream ZMQ subscribers to connect...");
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let handle = start_simple_zmq_listener(
self.config.engine_event_endpoint.clone(),
self.tracker.clone(),
self.cancellation_token.clone(),
self.config.engine_source,
)
.await?;
self.subscriber_handle = Some(handle);
tracing::info!("KV Event Consolidator fully started and ready");
Ok(())
}
pub async fn shutdown(self) -> Result<()> {
tracing::info!("Shutting down KV Event Consolidator");
self.cancellation_token.cancel();
if let Some(handle) = self.subscriber_handle {
handle.abort();
let _ = handle.await;
}
if let Some(publisher) = self.publisher {
publisher.shutdown().await?;
}
Ok(())
}
pub fn tracker(&self) -> Arc<RwLock<CacheStatusTracker>> {
self.tracker.clone()
}
pub fn get_handle(&self) -> KvEventConsolidatorHandle {
KvEventConsolidatorHandle {
tracker: self.tracker.clone(),
}
}
}