use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use super::EventSink;
use crate::Result;
use crate::events::MarketEvent;
pub struct ChannelSink {
tx: mpsc::Sender<Vec<MarketEvent>>,
name: String,
}
impl ChannelSink {
pub fn new(bound: usize) -> (Self, mpsc::Receiver<Vec<MarketEvent>>) {
let (tx, rx) = mpsc::channel(bound);
(
Self {
tx,
name: "channel".to_string(),
},
rx,
)
}
pub fn from_sender(tx: mpsc::Sender<Vec<MarketEvent>>) -> Self {
Self {
tx,
name: "channel".to_string(),
}
}
}
#[async_trait]
impl EventSink for ChannelSink {
async fn accept(&self, events: &[MarketEvent]) -> Result<()> {
self.tx
.send(events.to_vec())
.await
.map_err(|_| crate::Error::Internal(ustr::ustr("channel sink receiver dropped")))?;
Ok(())
}
async fn shutdown(&self, _token: CancellationToken) -> Result<()> {
Ok(())
}
fn name(&self) -> &str {
&self.name
}
}