use futures::Stream;
use intrepid_core::Frame;
use crate::EventRepo;
use super::Subscription;
/// A stream of [`Frame`]s fed by a background task that polls a subscription.
///
/// The two fields are the stream-side ends of a pair of Tokio channels set up
/// in `SubscriptionStream::new`: one carries frames towards the stream, the
/// other carries "please poll" signals towards the background task.
pub struct SubscriptionStream {
/// Receiving end of the frame channel; yields every event the background
/// task obtained from the subscription, converted into a [`Frame`].
pub inbox: tokio::sync::mpsc::Receiver<Frame>,
/// Sending end of the wake-up channel; each `()` received by the background
/// task makes it poll the subscription for events once.
pub outbox: tokio::sync::mpsc::Sender<()>,
}
impl SubscriptionStream {
    /// Creates a stream backed by `subscriber` and immediately requests the
    /// first poll.
    ///
    /// A background task is spawned that waits for wake-up signals; for every
    /// signal it calls `subscriber.poll_events()` once and forwards each
    /// returned event, converted into a [`Frame`], to the stream's inbox.
    /// The task ends when every wake-up sender has been dropped or when the
    /// frame receiver has been closed.
    ///
    /// # Panics
    ///
    /// Panics when called outside of a Tokio runtime, because the background
    /// task is started with `tokio::spawn`.
    pub fn new<Repo>(subscriber: Subscription<Repo>) -> Self
    where
        Repo: EventRepo + Clone + Send + Sync + 'static,
    {
        // Capacity 1 is enough for the wake-up channel: a single buffered
        // signal already guarantees one more poll after the current one.
        let (waker_outbox, mut waker_inbox) = tokio::sync::mpsc::channel(1);
        let (frame_outbox, frame_inbox) = tokio::sync::mpsc::channel(16);

        tokio::spawn(async move {
            while let Some(()) = waker_inbox.recv().await {
                // A failed poll is skipped on purpose (best effort); the next
                // wake-up signal simply tries again.
                if let Ok(events) = subscriber.poll_events().await {
                    for event in events {
                        // `send` only fails once the receiver is closed, and
                        // that is permanent: nobody can consume frames any
                        // more, so stop polling instead of discarding events.
                        if frame_outbox.send(event.into()).await.is_err() {
                            return;
                        }
                    }
                }
            }
        });

        let this = Self {
            inbox: frame_inbox,
            outbox: waker_outbox,
        };
        this.trigger_next_poll();
        this
    }

    /// Asks the background task to poll the subscription once more.
    ///
    /// The request never blocks and never spawns a task. When a request is
    /// already queued the new one is dropped, because the queued signal
    /// already guarantees a poll that starts after this call.
    pub fn trigger_next_poll(&self) {
        // `Full`   -> a poll is already pending; nothing to add.
        // `Closed` -> the background task has exited; nothing left to wake.
        // Both outcomes are benign, so the result is intentionally ignored.
        let _ = self.outbox.try_send(());
    }
}
impl Stream for SubscriptionStream {
    type Item = Frame;

    /// Yields the next buffered [`Frame`], or requests another poll of the
    /// subscription when the inbox has nothing ready.
    ///
    /// Returns `Poll::Ready(None)` once the inbox channel reports that it is
    /// closed and drained.
    ///
    /// NOTE(review): the waker is registered with the inbox only, so if a
    /// requested poll produces no events nothing appears to wake this stream
    /// again — confirm that `poll_events` waits until events are available.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        let received = self.inbox.poll_recv(cx);
        match received {
            Poll::Pending => {
                // Nothing buffered: nudge the background task so that frames
                // show up for a later poll.
                self.trigger_next_poll();
                Poll::Pending
            }
            ready @ Poll::Ready(_) => ready,
        }
    }
}