dittolive-ditto 4.9.3

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use super::*;

/// A builder that lets you prepare a connection before [`Self::finish`]ing it to attempt to
/// open the connection.
#[must_use = "A ConnectionBuilder must be `finish`ed to actually establish a connection"]
pub struct ConnectionBuilder<'a, OnReceiveFactory = (), OnReceiveOverload = ()> {
    pub(crate) bus: &'a Bus,
    pub(crate) peer: PeerPubkey,
    pub(crate) topic: Option<bus::Topic>,
    pub(crate) reliability: bus::Reliability,
    pub(crate) on_receive: OnReceiveFactory,
    pub(crate) marker: core::marker::PhantomData<OnReceiveOverload>,
}

impl<'a> ConnectionBuilder<'a> {
    /// Allows setting [`StreamOpenOnce`] factory that will be used to handle the incoming
    /// [`StreamCandidate`] once it is constructed.
    ///
    /// You may still
    /// finish<sup>[(0)](ConnectionBuilder::finish)[(1)](ConnectionBuilder::finish_with)[(2)](ConnectionBuilder::finish_async)</
    /// sup> your [`ConnectionBuilder`] without setting such a factory. If you do so, your handler
    /// will receive a [`StreamCandidate`] instead of the already open [`Stream`] you'd be provided
    /// by if a factory has been set.
    pub fn on_receive_factory<OnReceiveOverload, OnReceiveFactory>(
        self,
        on_recv_factory: OnReceiveFactory,
    ) -> ConnectionBuilder<'a, OnReceiveFactory, OnReceiveOverload> {
        let Self {
            bus,
            peer,
            topic,
            reliability,
            on_receive: _,
            marker: _,
        } = self;
        ConnectionBuilder {
            bus,
            peer,
            topic,
            reliability,
            on_receive: on_recv_factory,
            marker: core::marker::PhantomData,
        }
    }
}

impl<'a, OnReceiveFactory: SingleCandidateHandler<OnReceiveOverload>, OnReceiveOverload>
    ConnectionBuilder<'a, OnReceiveFactory, OnReceiveOverload>
{
    /// Sets the reliability mode for the connection.
    ///
    /// The default is [`ReliabilityMode::Reliable`].
    pub fn reliability(mut self, mode: bus::Reliability) -> Self {
        self.reliability = mode;
        self
    }
    /// Attempts to open the connection, calling the `handler`'s continuation once opening has
    /// either succeeded of failed.
    ///
    /// # [`IntoChannel`]
    /// The signature of this method is built to let you pass various handlers for the events
    /// that will follow:
    /// - You may pass closures directly.
    /// - You may also pass channel tuples, such as `std::sync::mpsc::channel()`. The returned value
    ///   will wrap your channel's receiver end into the handle that keeps your binding alive.
    pub fn finish<OverloadId, Handler>(self, handler: Handler) -> Handler::Receiver
    where
        Handler: IntoChannel<OverloadId, Result<OnReceiveFactory::Output, ConnectionError>>,
        Handler::Sender:
            OneShotChannel<OverloadId, Result<OnReceiveFactory::Output, ConnectionError>>,
    {
        let Self {
            bus,
            peer,
            topic,
            reliability,
            on_receive: on_recv,
            marker: _,
        } = self;
        let (channel, receiver) = handler.into_channel();
        let Some(topic) = topic else {
            channel.send_infallibly_once(Err(ConnectionError::ConnectionRejected));
            return receiver;
        };

        let mut channel = Some((channel, on_recv));
        let closure = Box::new(move |value: bus::ConnectionResult| {
            let Some((channel, on_recv)) = channel.take() else {
                unreachable!("Connections only resolve once.")
            };
            channel.send_infallibly_once(match value {
                bus::ConnectionResult::Accepted(candidate) => {
                    Ok(on_recv.open_once(StreamCandidate { inner: candidate }))
                }
                bus::ConnectionResult::Rejected(err) => Err(err),
            })
        })
        .into();
        bus.bus
            .inner
            .connect(peer, (&*topic).into(), reliability, closure);
        receiver
    }

    /// Attempts to open the connection, calling the `on_connect` once opening has
    /// either succeeded of failed.
    ///
    /// # Relation  to [`ConnectionBuilder::finish`]
    /// This method compensates for a weakness in Rust's type inference: when you pass a closure to
    /// [`ConnectionBuilder::finish`], Rust will generally require that you explicitly type its
    /// arguments, despite being able to compute the only valid type for it's argument (as
    /// highlighted if you purposedly mis-type the argument).
    ///
    /// Hence, it essentially exists to highlight this papercut and to let you _still_ not worry
    /// about typing your closures.
    pub fn finish_with<
        F: FnOnce(Result<OnReceiveFactory::Output, ConnectionError>) + Send + Sync + 'static,
    >(
        self,
        on_connect: F,
    ) {
        self.finish(on_connect)
    }

    /// Returns a future that resolves once the connection has been established or determined to
    /// fail.
    pub fn finish_async(self) -> ConnectionFuture<OnReceiveFactory::Output>
    where
        OnReceiveFactory::Output: Send + 'static,
    {
        ConnectionFuture {
            rx: self.finish(tokio::sync::oneshot::channel()).into_future(),
        }
    }
}

/// A future that resolves once a connection is established or has failed.
///
/// # Cancel-safety
/// This future is cancel-safe, however constructing in a loop would be ill-advised.
pub struct ConnectionFuture<T> {
    rx: <tokio::sync::oneshot::Receiver<Result<T, ConnectionError>> as core::future::IntoFuture>::IntoFuture
}
impl<T> core::future::Future for ConnectionFuture<T> {
    type Output = Result<T, ConnectionError>;
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        match core::future::Future::poll(std::pin::Pin::new(&mut self.get_mut().rx), cx) {
            std::task::Poll::Ready(r) => {
                std::task::Poll::Ready(r.unwrap_or(Err(ConnectionError::PeerNotFound)))
            }
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn connection_future() {
        for value in [
            Ok(()),
            Err(ConnectionError::HandshakeFailed),
            Err(ConnectionError::HandshakeTimedOut),
            Err(ConnectionError::ConnectionRejected),
            Err(ConnectionError::PeerNotFound),
        ] {
            let (tx, rx) = tokio::sync::oneshot::channel();
            _ = tx.send(value);
            assert_eq!(value, ConnectionFuture { rx }.await)
        }
        let (_, rx) = tokio::sync::oneshot::channel();
        assert_eq!(
            Err::<(), _>(ConnectionError::PeerNotFound),
            ConnectionFuture { rx }.await
        );
    }
}