Skip to main content

fast_pull/base/
pusher.rs

1use crate::ProgressEntry;
2use bytes::Bytes;
3use core::fmt::Debug;
4
5pub trait Pusher: Send + 'static {
6    type Error: Send + Unpin + 'static;
7    #[allow(clippy::missing_errors_doc)]
8    fn push(&mut self, range: &ProgressEntry, content: Bytes) -> Result<(), (Self::Error, Bytes)>;
9    #[allow(clippy::missing_errors_doc)]
10    fn flush(&mut self) -> Result<(), Self::Error> {
11        Ok(())
12    }
13}
14
15pub trait AnyError: Debug + Send + Unpin + 'static {}
16impl<T: Debug + Send + Unpin + 'static> AnyError for T {}
17
18#[allow(missing_debug_implementations)]
19pub struct BoxPusher {
20    pub pusher: Box<dyn Pusher<Error = Box<dyn AnyError>>>,
21}
22impl Pusher for BoxPusher {
23    type Error = Box<dyn AnyError>;
24    fn push(&mut self, range: &ProgressEntry, content: Bytes) -> Result<(), (Self::Error, Bytes)> {
25        self.pusher.push(range, content)
26    }
27    fn flush(&mut self) -> Result<(), Self::Error> {
28        self.pusher.flush()
29    }
30}
31
32struct PusherAdapter<P: Pusher> {
33    inner: P,
34}
35impl<P: Pusher> Pusher for PusherAdapter<P>
36where
37    P::Error: Debug,
38{
39    type Error = Box<dyn AnyError>;
40    fn push(&mut self, range: &ProgressEntry, content: Bytes) -> Result<(), (Self::Error, Bytes)> {
41        self.inner
42            .push(range, content)
43            .map_err(|(e, b)| (BoxPusher::upcast(e), b))
44    }
45    fn flush(&mut self) -> Result<(), Self::Error> {
46        self.inner.flush().map_err(|e| BoxPusher::upcast(e))
47    }
48}
49
50impl BoxPusher {
51    pub fn new<P: Pusher>(pusher: P) -> Self
52    where
53        P::Error: Debug,
54    {
55        Self {
56            pusher: Box::new(PusherAdapter { inner: pusher }),
57        }
58    }
59    pub fn upcast<E: AnyError>(e: E) -> Box<dyn AnyError> {
60        Box::new(e)
61    }
62}