dittolive-ditto 4.9.3

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use super::*;

/// Before a [`Stream`] can be fully open, its receive callback must be set through
/// [`StreamCandidate::open`] or [`StreamCandidate::open_write_only`].
///
/// If a [`StreamCandidate`] is dropped before it is fully open, the underlying [`Stream`] will
/// be closed, notifying the remote peer.
///
/// Moreover, an [`Acceptor`] may wait for its yielded [`StreamCandidate`] to be opened before
/// notifying the connecting peer of its acceptance, preventing the opposing [`ConnectionFuture`]
/// from resolving.
///
/// Note that [`ConnectionBuilder`](ConnectionBuilder::on_receive_factory) and
/// [`AcceptorBuilder`](AcceptorBuilder::on_receive_factory), which are the only sources of
/// [`StreamCandidate`]s, both have a `on_receive_factory` method: this method allows you to set a
/// common policy for [`StreamCandidate::open`]ing the [`Stream`], which is generally more
/// convenient and may spare you some head-scratching with regard to the above note on
/// [`ConnectionFuture`]'s resolution.
#[must_use = "Until it is `open`ed, a `StreamCandidate` won't allow the connection to be used."]
pub struct StreamCandidate {
    pub(crate) inner: bus::StreamCandidate,
}
impl core::fmt::Debug for StreamCandidate {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamCandidate")
            .field("peer_pubkey", &self.peer_pubkey())
            .field("topic", &self.topic())
            .finish()
    }
}
impl StreamCandidate {
    /// The public key of the peer this stream is connected to.
    pub fn peer_pubkey(&self) -> PeerPubkey {
        self.inner.inner.peer_pubkey()
    }

    /// The [`bus::Topic`] of the stream.
    pub fn topic(&self) -> bus::Topic {
        self.inner.inner.topic()
    }
    /// Opens the stream with no intent of receiving data: all received messages on the stream
    /// will be dropped immediately.
    pub fn open_write_only(mut self) -> Stream<()> {
        let stream = self
            .inner
            .inner
            .open(TaggedOption::None)
            .into_rust()
            .unwrap();
        Stream {
            stream: Arc::new(stream),
            receiver: (),
        }
    }
    /// Opens the stream, setting `handler` as its receiver.
    ///
    /// Note that once a stream is open, the callback can never be removed: it will be called
    /// until all instances descending from the returned [`Stream`] have been destroyed.
    pub fn open<OverloadId, Handler>(mut self, handler: Handler) -> Stream<Handler::Receiver>
    where
        Handler: IntoChannel<OverloadId, Inbound>,
        Handler::Sender: Channel<OverloadId, Inbound>,
    {
        let (mut tx, rx) = handler.into_channel();
        let stream = self
            .inner
            .inner
            .open(TaggedOption::Some(
                Box::new(move |payload| tx.send_infallibly_mut(Inbound(payload))).into(),
            ))
            .into_rust()
            .unwrap();
        Stream {
            stream: Arc::new(stream),

            receiver: rx,
        }
    }
}