use futures::Stream;
use intrepid_core::Frame;
use tower::Service;
use crate::EventRepo;
use super::Subscription;
pub struct SubscriptionWorker {
completions: tokio::sync::mpsc::Receiver<Frame>,
waker: tokio::sync::mpsc::Sender<()>,
}
impl SubscriptionWorker {
pub fn new<Repo, FrameService>(subscriber: Subscription<Repo>, service: FrameService) -> Self
where
Repo: EventRepo + Clone + Send + Sync + 'static,
FrameService: Service<Frame> + Send + Sync + 'static,
FrameService::Future: Send,
<FrameService as Service<Frame>>::Response: Into<Frame>,
<FrameService as Service<Frame>>::Error: Into<Frame>,
{
let (waker, mut wake) = tokio::sync::mpsc::channel(1);
let (work_complete, completions) = tokio::sync::mpsc::channel(16);
tokio::spawn(async move {
let mut service = service;
while let Some(()) = wake.recv().await {
if let Ok(events) = subscriber.poll_events().await {
for event in events {
let frame = match service.call(event.into()).await {
Ok(frame) => frame.into(),
Err(error) => error.into(),
};
if let Err(error) = work_complete.send(frame).await {
tracing::error!("Failed to send frame: {:?}", error);
break;
}
}
}
}
});
let this = Self { waker, completions };
this.wake();
this
}
pub fn wake(&self) {
let outbox = self.waker.clone();
tokio::spawn(async move {
let _ = outbox.send(()).await;
});
}
}
impl Stream for SubscriptionWorker {
type Item = Frame;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let poll = self.completions.poll_recv(cx);
if poll.is_pending() {
self.wake();
}
poll
}
}