use std::{marker::PhantomData, path::Path};
use tokio::sync::mpsc;
use crate::*;
use super::FileObserverEvent;
pub(super) struct StreamSubscription<
B: BlockDef + Send + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
O: Send + Sync + 'static,
> {
tx: mpsc::UnboundedSender<FileObserverEvent<B, P, Inner>>,
_phantom: PhantomData<O>,
}
impl<
B: BlockDef + Send + 'static,
BR: BlockReferredDef<B> + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
O: Send + Sync + 'static,
> SubscriptionDef<B, BR, P, Inner, O> for StreamSubscription<B, P, Inner, O>
{
fn on_update(&mut self, total: usize, added: usize) -> SubscriptionUpdate {
let _ = self.tx.send(FileObserverEvent::Update { total, added });
SubscriptionUpdate::Read
}
fn on_packet(&mut self, packet: PacketDef<B, P, Inner>) {
let _ = self.tx.send(FileObserverEvent::Packet(packet));
}
fn on_error(&mut self, err: &Error) -> SubscriptionErrorAction {
let _ = self.tx.send(FileObserverEvent::Error(err.to_string()));
SubscriptionErrorAction::Continue
}
fn on_stopped(&mut self, reason: Option<Error>) {
let _ = self.tx.send(FileObserverEvent::Stopped(reason));
}
fn on_aborted(&mut self) {
let _ = self.tx.send(FileObserverEvent::Aborted);
}
}
pub(super) struct ObserverStreamState<
B: BlockDef + Send + 'static,
BR: BlockReferredDef<B> + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
O: Send + Sync + 'static,
> {
observer: FileObserverDef<B, BR, P, Inner, O>,
pub(super) rx: mpsc::UnboundedReceiver<FileObserverEvent<B, P, Inner>>,
_phantom: PhantomData<BR>,
}
impl<
B: BlockDef + Send + 'static,
BR: BlockReferredDef<B> + 'static,
P: PayloadDef<Inner> + Send + 'static,
Inner: PayloadInnerDef + Send + 'static,
O: Send + Sync + 'static,
> ObserverStreamState<B, BR, P, Inner, O>
where
for<'a> Inner: PayloadSchema<Context<'a> = O>,
{
pub(super) fn new(path: impl AsRef<Path>, opt: O) -> Result<Self, Error> {
let (tx, rx) = mpsc::unbounded_channel();
let subscription = StreamSubscription::<B, P, Inner, O> {
tx,
_phantom: PhantomData,
};
let observer =
FileObserverDef::with_opt(FileObserverOptions::new(path).subscribe(subscription), opt)?;
Ok(Self {
observer,
rx,
_phantom: PhantomData,
})
}
pub(super) async fn shutdown(&mut self) {
self.observer.shutdown().await;
}
}