use async_trait::async_trait;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use super::EventSink;
use crate::Result;
use crate::events::MarketEvent;
pub struct CallbackSink<F> {
callback: Arc<F>,
name: String,
}
impl<F> CallbackSink<F> {
pub fn new(name: impl Into<String>, callback: F) -> Self {
Self {
callback: Arc::new(callback),
name: name.into(),
}
}
}
#[async_trait]
impl<F, Fut> EventSink for CallbackSink<F>
where
F: Fn(Vec<MarketEvent>) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<()>> + Send + 'static,
{
async fn accept(&self, events: &[MarketEvent]) -> Result<()> {
(self.callback)(events.to_vec()).await
}
fn name(&self) -> &str {
&self.name
}
async fn shutdown(&self, _token: CancellationToken) -> Result<()> {
Ok(())
}
}
pub struct BlockingCallbackSink<F> {
callback: Arc<std::sync::Mutex<F>>,
name: String,
}
impl<F> BlockingCallbackSink<F>
where
F: FnMut(&[MarketEvent]) -> Result<()> + Send + 'static,
{
pub fn new(name: impl Into<String>, callback: F) -> Self {
Self {
callback: Arc::new(std::sync::Mutex::new(callback)),
name: name.into(),
}
}
}
#[async_trait]
impl<F> EventSink for BlockingCallbackSink<F>
where
F: FnMut(&[MarketEvent]) -> Result<()> + Send + 'static,
{
async fn accept(&self, events: &[MarketEvent]) -> Result<()> {
let owned = events.to_vec();
let cb = Arc::clone(&self.callback);
tokio::task::spawn_blocking(move || {
let mut guard = cb.lock().map_err(|e| {
crate::Error::Internal(ustr::ustr(&format!("callback lock poisoned: {e}")))
})?;
guard(&owned)
})
.await
.map_err(|e| crate::Error::Internal(ustr::ustr(&format!("callback join: {e}"))))?
}
fn name(&self) -> &str {
&self.name
}
async fn shutdown(&self, _token: CancellationToken) -> Result<()> {
Ok(())
}
}