mod channel;
use std::{
pin::Pin,
task::{Context, Poll},
};
use crate::*;
use self::channel::ObserverStreamState;
pub enum FileObserverEvent<
B: BlockDef + Send + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
> {
Update { total: usize, added: usize },
Packet(PacketDef<B, P, Inner>),
Error(String),
Stopped(Option<Error>),
Aborted,
}
pub struct FileObserverStreamDef<
B: BlockDef + Send + 'static,
BR: BlockReferredDef<B> + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
O: Send + Sync + 'static,
> {
inner: ObserverStreamState<B, BR, P, Inner, O>,
}
impl<
B: BlockDef + Send + 'static,
BR: BlockReferredDef<B> + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
O: Send + Sync + 'static,
> Unpin for FileObserverStreamDef<B, BR, P, Inner, O>
where
for<'a> Inner: ProtocolSchema<Context<'a> = O>,
{
}
impl<
B: BlockDef + Send + 'static,
BR: BlockReferredDef<B> + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
O: Send + Sync + 'static,
> FileObserverStreamDef<B, BR, P, Inner, O>
where
for<'a> Inner: ProtocolSchema<Context<'a> = O>,
{
pub fn with_opt(path: impl AsRef<std::path::Path>, opt: O) -> Result<Self, Error> {
Ok(Self {
inner: ObserverStreamState::new(path, opt)?,
})
}
pub fn new(path: impl AsRef<std::path::Path>) -> Result<Self, Error>
where
O: Default,
{
Self::with_opt(path, O::default())
}
pub async fn shutdown(&mut self) {
self.inner.shutdown().await;
}
}
impl<
B: BlockDef + Send + 'static,
BR: BlockReferredDef<B> + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
O: Send + Sync + 'static,
> tokio_stream::Stream for FileObserverStreamDef<B, BR, P, Inner, O>
where
for<'a> Inner: ProtocolSchema<Context<'a> = O>,
{
type Item = FileObserverEvent<B, P, Inner>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().inner.rx).poll_recv(cx)
}
}