use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::stream::SelectAll;
use futures_util::stream::{BoxStream, once};
use futures_util::{FutureExt, Stream, StreamExt};
use tokio::sync::oneshot;
use crate::operation::Operation;
pub type SessionId = u64;
#[derive(Default)]
pub(crate) struct ExternalStream {
streams: SelectAll<BoxStream<'static, ExternalStreamEvent>>,
next_session_id: SessionId,
}
impl ExternalStream {
pub fn insert(&mut self, stream: BoxStream<'static, Operation>) -> ExternalStreamFuture {
let session_id = self.next_session_id();
let (complete_tx, complete_rx) = oneshot::channel::<()>();
let future = ExternalStreamFuture {
session_id,
complete_rx,
};
let event_stream = once(async move { ExternalStreamEvent::Start { session_id } })
.chain(stream.map(move |operation| ExternalStreamEvent::Operation {
session_id,
operation: Box::new(operation),
}))
.chain(once(async move {
let _ = complete_tx.send(());
ExternalStreamEvent::End { session_id }
}));
self.streams.push(Box::pin(event_stream));
future
}
fn next_session_id(&mut self) -> SessionId {
let next = self.next_session_id;
self.next_session_id += 1;
next
}
}
impl Stream for ExternalStream {
type Item = ExternalStreamEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.streams.poll_next_unpin(cx)
}
}
pub(crate) enum ExternalStreamEvent {
Start {
session_id: SessionId,
},
Operation {
session_id: SessionId,
operation: Box<Operation>,
},
End {
session_id: SessionId,
},
}
#[derive(Debug)]
pub struct ExternalStreamFuture {
session_id: SessionId,
complete_rx: oneshot::Receiver<()>,
}
impl ExternalStreamFuture {
pub fn session_id(&self) -> SessionId {
self.session_id
}
}
impl Future for ExternalStreamFuture {
type Output = Result<(), oneshot::error::RecvError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.complete_rx.poll_unpin(cx)
}
}