dittolive-ditto 4.9.2

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
use super::*;
/// The builder for an [`Acceptor`].
///
/// A builder borrows the [`Bus`] it was created from and accumulates the settings of the
/// acceptor-to-be: the flavor-specific state (`Flavor`), the requested reliability, and the
/// candidate handler (`Opener`). Nothing is bound until the builder is consumed by one of the
/// finishing methods of its flavor.
///
/// `Opener` and `OpenerOverloadId` both default to `()`, which is the state of a builder on which
/// [`AcceptorBuilder::on_receive_factory`] has not been called.
#[must_use = "An AcceptorBuilder must be `finish`ed to be actually enabled."]
pub struct AcceptorBuilder<'a, Flavor, Opener = (), OpenerOverloadId = ()> {
    /// The bus on which the acceptor will be bound.
    pub(crate) bus: &'a Bus,
    /// Flavor-specific state, e.g. an [`ExactMatch`] wrapping the topic to bind.
    pub(crate) inner: Flavor,
    /// The reliability that will be requested when binding.
    pub(crate) reliability: bus::Reliability,
    /// The handler applied to each incoming candidate before it is forwarded to the user's
    /// handler.
    pub(crate) opener: Opener,
    /// Zero-sized marker that only exists to carry the `OpenerOverloadId` type parameter.
    pub(crate) marker: core::marker::PhantomData<OpenerOverloadId>,
}

impl<'a, Flavor> AcceptorBuilder<'a, Flavor> {
    /// Installs the [`CandidateHandler`] factory that every incoming [`StreamCandidate`] will be
    /// passed through as soon as it is constructed.
    ///
    /// Setting a factory is optional: you may
    /// finish<sup>[(0)](AcceptorBuilder::finish)[(1)](AcceptorBuilder::finish_with)</sup>
    /// your [`AcceptorBuilder`] without one. In that case your handler is handed the raw
    /// [`StreamCandidate`]s, rather than the already opened [`Stream`]s it would receive had a
    /// factory been set.
    ///
    /// The bus, flavor and reliability configured so far are carried over unchanged.
    pub fn on_receive_factory<Opener: CandidateHandler<OverloadId>, OverloadId>(
        self,
        opener: Opener,
    ) -> AcceptorBuilder<'a, Flavor, Opener, OverloadId> {
        // The placeholder `()` opener and its marker are simply left behind: only the three
        // remaining fields are moved into the re-typed builder.
        AcceptorBuilder {
            bus: self.bus,
            inner: self.inner,
            reliability: self.reliability,
            opener,
            marker: core::marker::PhantomData,
        }
    }
}
impl<Flavor, Opener, OverloadId> AcceptorBuilder<'_, Flavor, Opener, OverloadId> {
    /// Selects the reliability mode of the acceptor being built. Connection attempts only
    /// succeed when they are made with that same reliability.
    ///
    /// The default is [`ReliabilityMode::Reliable`].
    pub fn reliability(self, mode: bus::Reliability) -> Self {
        // Rebuild the builder with the new mode, keeping every other field as it was.
        Self {
            reliability: mode,
            ..self
        }
    }
}

/// The [`AcceptorBuilder`] flavor that only allows connections on one exact [`Topic`].
///
/// The wrapped option holds the topic to bind. When it is `None`, [`AcceptorBuilder::finish`]
/// binds nothing and returns [`BindError::TopicAlreadyBound`].
pub struct ExactMatch(pub(crate) Option<bus::Topic>);

impl<Opener: CandidateHandler<OverloadId>, OverloadId>
    AcceptorBuilder<'_, ExactMatch, Opener, OverloadId>
{
    /// Binds the [`Topic`] for real, sending every newly established stream through
    /// `on_connect`.
    ///
    /// Each incoming candidate is first passed to the builder's opener; the opener's output is
    /// what `on_connect` receives.
    ///
    /// # Errors
    /// If an acceptor is already bound to that topic, `on_connect` is destroyed and an error is
    /// returned. [`BindError::TopicAlreadyBound`] is also returned when the builder holds no
    /// topic at all. Any other error reported by the bus while binding is returned as is.
    ///
    /// # [`IntoChannel`]
    /// The signature of this method is designed to accept several kinds of handlers for the
    /// events that will follow:
    /// - Closures can be passed directly.
    /// - Channel tuples, such as `std::sync::mpsc::channel()`, can be passed too. The receiver
    ///   end of your channel is then wrapped into the returned handle, which is also what keeps
    ///   your binding alive.
    pub fn finish<OverloadId2, Handler>(
        self,
        on_connect: Handler,
    ) -> Result<Acceptor<Handler::Receiver>, BindError>
    where
        Handler: IntoChannel<OverloadId2, Opener::Output>,
        Handler::Sender: ConcurrentChannel<OverloadId2, Opener::Output>,
    {
        // The handler is split before the topic check, so it is consumed (and dropped) even when
        // the early return below is taken.
        let (sender, receiver) = on_connect.into_channel();
        let Self {
            bus,
            inner: ExactMatch(Some(topic)),
            opener,
            reliability,
            marker: _,
        } = self
        else {
            return Err(BindError::TopicAlreadyBound);
        };
        let bound = bus.bus.inner.bind_topic(
            (&*topic).into(),
            reliability,
            Arc::new(move |candidate| {
                // Run the candidate through the opener, then forward whatever it produced.
                let opened = opener.open(StreamCandidate { inner: candidate });
                sender.send_infallibly(opened)
            })
            .into(),
        );
        match bound {
            bus::BindResult::Err(e) => Err(e),
            bus::BindResult::Ok(acceptor) => Ok(Acceptor { receiver, acceptor }),
        }
    }

    /// Binds the [`Topic`] for real, calling `on_connect` with every newly established stream.
    ///
    /// # Errors
    /// If an acceptor is already bound to that topic, `on_connect` is destroyed and an error is
    /// returned.
    ///
    /// # Relation to [`AcceptorBuilder::finish`]
    /// This method works around a weakness in Rust's type inference: when a closure is passed to
    /// [`AcceptorBuilder::finish`], Rust generally requires its arguments to be typed
    /// explicitly, even though it is able to compute the only valid type for its argument (as
    /// it shows if you purposely mis-type the argument).
    ///
    /// It therefore mostly exists to highlight this papercut, and to let you _still_ not worry
    /// about typing your closures.
    pub fn finish_with<F: Fn(Opener::Output) + Send + Sync + 'static>(
        self,
        on_connect: F,
    ) -> Result<Acceptor<()>, BindError> {
        // Spelling out the closure bound here is what lets callers leave the argument untyped.
        self.finish(on_connect)
    }
}

pub use bus::BindError;

/// A handle that will remove its associated acceptor from the [`Bus`] upon dropping.
///
/// Keeping the handle alive is thus key to ensuring your acceptor keeps running.
///
/// It notably [derefs](core::ops::Deref) to your handler's [`IntoChannel::Receiver`], allowing
/// you to access it through the handle.
#[must_use = "An Acceptor must be kept alive to keep the acceptor running"]
pub struct Acceptor<T> {
    /// The receiver end produced by the handler's [`IntoChannel`] conversion; this is what the
    /// handle dereferences to.
    receiver: T,
    /// The underlying bus-level acceptor, from which the bound topic and reliability are read.
    acceptor: bus::Acceptor,
}
impl<T> Acceptor<T> {
    /// Returns the topic this acceptor is bound on.
    pub fn topic(&self) -> bus::Topic {
        let bound = &self.acceptor.inner;
        bound.topic()
    }
    /// Returns the reliability mode this acceptor accepts: connection attempts only succeed when
    /// they are made with that same reliability.
    pub fn reliability(&self) -> bus::Reliability {
        let bound = &self.acceptor.inner;
        bound.reliability()
    }
}

impl<T> Deref for Acceptor<T> {
    type Target = T;
    /// Gives shared access to the receiver held by this handle.
    fn deref(&self) -> &Self::Target {
        let Self { receiver, .. } = self;
        receiver
    }
}

impl<T> DerefMut for Acceptor<T> {
    /// Gives exclusive access to the receiver held by this handle.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { receiver, .. } = self;
        receiver
    }
}