use actr_protocol::{ActorResult, ActrId, DataStream};
use dashmap::DashMap;
use futures_util::future::BoxFuture;
use std::sync::Arc;
pub(crate) type DataStreamCallback =
Arc<dyn Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync>;
pub(crate) struct DataStreamRegistry {
callbacks: DashMap<String, DataStreamCallback>,
}
impl Default for DataStreamRegistry {
fn default() -> Self {
Self::new()
}
}
impl DataStreamRegistry {
pub(crate) fn new() -> Self {
Self {
callbacks: DashMap::new(),
}
}
pub(crate) fn register(&self, stream_id: String, callback: DataStreamCallback) {
self.callbacks.insert(stream_id.clone(), callback);
tracing::info!("ðĄ Registered data stream handler: {}", stream_id);
}
pub(crate) fn unregister(&self, stream_id: &str) {
self.callbacks.remove(stream_id);
tracing::info!("ðŦ Unregistered data stream handler: {}", stream_id);
}
pub(crate) async fn dispatch(&self, chunk: DataStream, sender_id: ActrId) {
let start = std::time::Instant::now();
if let Some(callback) = self.callbacks.get(&chunk.stream_id) {
let callback = callback.clone();
tokio::spawn(async move {
if let Err(e) = callback(chunk, sender_id).await {
tracing::error!("â Stream chunk callback error: {:?}", e);
}
});
tracing::debug!("ð Dispatched data stream in {:?}", start.elapsed());
} else {
tracing::warn!("â ïļ No callback registered for stream: {}", chunk.stream_id);
}
}
}