dittolive-ditto 4.9.3

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use std::sync::Mutex;

use bus::StreamStatus;

use super::*;
/// A handle to an open stream, bundled with the receiving end of its message handler.
///
/// It derefs to `Receiver`, allowing you to access the [`IntoChannel::Receiver`] end of the
/// handler you passed to [`StreamCandidate::open`] or as a [`CandidateHandler`].
///
/// Cloning a `Stream` clones the `Receiver` and creates one more handle onto the same
/// underlying stream.
#[must_use = "A Stream must be kept alive to keep the subscription running"]
#[derive(Clone)]
pub struct Stream<Receiver> {
    /// Shared handle to the underlying stream; every clone of this `Arc` counts as one live
    /// handle when closing.
    pub(crate) stream: Arc<bus::Stream>,
    /// The value exposed through `Deref`/`DerefMut`.
    pub(crate) receiver: Receiver,
}
/// Gives shared access to the receiver carried by the handle.
impl<R> core::ops::Deref for Stream<R> {
    type Target = R;

    fn deref(&self) -> &R {
        let Self { receiver, .. } = self;
        receiver
    }
}
/// Gives exclusive access to the receiver carried by the handle.
impl<R> core::ops::DerefMut for Stream<R> {
    fn deref_mut(&mut self) -> &mut R {
        let Self { receiver, .. } = self;
        receiver
    }
}

impl<Receiver> Stream<Receiver> {
    /// Prepares a message to be sent over the stream.
    ///
    /// This only builds a [`MessageBuilder`] borrowing the stream: for the message to actually
    /// be sent, [`MessageBuilder::try_send`] must be called. [`MessageBuilder::send`] may be
    /// called instead if `Message` implements [`Into<Payload>`].
    pub fn message<Message: IntoPayload>(
        &self,
        payload: Message,
    ) -> MessageBuilder<Message, &'_ bus::Stream> {
        MessageBuilder {
            sender: &self.stream,
            message: payload,
        }
    }

    /// Returns a new handle to `self` which can only be used to send data, as its potential
    /// `Receiver` will not be accessible from the returned instance.
    ///
    /// Note that any handle to a [`Stream`] existing will keep its reception callback alive,
    /// regardless of whether or not its `Receiver` is still around. This doesn't preclude said
    /// callback's behaviour from changing upon dropping all [`Stream<Receiver>`]: all
    /// implementations of [`Channel`] by channels will drop packets once there no longer exists
    /// an associated `Receiver` to them.
    pub fn tx(&self) -> Stream<()> {
        Stream {
            stream: Arc::clone(&self.stream),
            receiver: (),
        }
    }

    /// The public key of the peer this stream is connected to.
    pub fn peer_pubkey(&self) -> PeerPubkey {
        self.stream.inner.peer_pubkey()
    }

    /// The [`bus::Topic`] of the stream.
    pub fn topic(&self) -> bus::Topic {
        self.stream.inner.topic()
    }

    /// Returns a future that will resolve once the stream is closed.
    ///
    /// Since this doesn't actually attempt to close the stream, this can be useful to detect
    /// when the opposite peer decides to close it.
    ///
    /// # Deadlock risk
    /// Awaiting the returned future before destroying `self` and every other handle on the
    /// underlying stream will block until the stream is closed by the remote peer.
    ///
    /// This method exists as a convenient way to specifically wait for the remote peer to close
    /// the stream, but you may want to race it against a timeout to avoid potential
    /// deadlocks.
    pub fn closed(&self) -> StreamClosedFuture {
        // `tx()` adds a handle while `self` is still alive, so `close` always counts at least
        // one remaining handle and therefore always takes its `Err` branch.
        match self.tx().close(()) {
            Err((future, _)) => future,
            Ok(_) => unreachable!(),
        }
    }

    /// Returns a future that will resolve once the stream is closed, while dropping every
    /// passed end of the stream, closing it immediately if no more handles to the
    /// stream are live once this function returns.
    ///
    /// Depending on how many instances have descended from a `stream`, you may call this function
    /// with any of the following: `stream.close(())`, `stream.close(stream_clone)`,
    /// `stream.close((stream_tx1, stream_tx2))`...
    ///
    /// # Subscriber liveness
    /// The sender end of the handler passed when constructing `self` MAY still be called with
    /// every message received onto the channel until the returned future resolves.
    ///
    /// # Errors
    /// If there are still remaining strong stream handles in your program, this will return an
    /// error containing the future that you can still choose to await, as well as the remaining
    /// number of instances that must be destroyed before closing on your end can succeed.
    ///
    /// Awaiting the future in the error case can still make sense if you expect the stream to
    /// get closed soon either by the remote, or by handles owned by other tasks being
    /// destroyed soon.
    ///
    /// # Panics
    /// The registered on-close callback panics if the future's internal mutex is poisoned.
    pub fn close<Others>(
        self,
        others: Others,
    ) -> Result<StreamClosedFuture, (StreamClosedFuture, NonZeroUsize)> {
        // Drop the extra handles first so that they are not counted below.
        core::mem::drop(others);
        // `self` is still alive here and is dropped when this function returns: discount it.
        let remaining_handles = Arc::strong_count(&self.stream) - 1;
        let state = Arc::new(Mutex::new(StreamClosedFutureState {
            status: self.stream.inner.current_status(),
            waker: None,
        }));
        let future = StreamClosedFuture {
            inner: Arc::clone(&state),
        };
        self.stream.inner.add_on_close(
            Box::new(move |status| {
                let mut guard = state.lock().unwrap();
                guard.status = status;
                let pending_waker = guard.waker.take();
                // Release the lock before waking: a waker that polls inline would otherwise
                // try to lock `state` again from `poll` while this callback still holds it.
                drop(guard);
                if let Some(waker) = pending_waker {
                    waker.wake()
                }
            })
            .into(),
        );
        match NonZeroUsize::new(remaining_handles) {
            Some(remaining_handles) => Err((future, remaining_handles)),
            None => Ok(future),
        }
    }

    /// Splits the stream from the receiver tied to its callback.
    ///
    /// Keep in mind that at least one instance of [`Stream<()>`] descending from `self` must stay
    /// alive for the callback tied to the stream to stay live.
    ///
    /// You can take advantage of that property to keep a `Receiver` around after closing the
    /// [`Stream`] that may have provided it with packets you still want to process after closing.
    ///
    /// ```
    /// # use dittolive_ditto::experimental::bus::{Inbound, StreamCandidate};
    /// # struct Processed { disconnect: bool }
    /// # fn process(payload: Inbound) -> Processed { Processed {disconnect: true } }
    /// # async fn usage(stream_candidate: StreamCandidate) {
    /// let mut stream = stream_candidate.open(tokio::sync::mpsc::unbounded_channel());
    /// while let Some(payload) = stream.recv().await {
    ///     let result = process(payload); // Process payloads while connected
    ///     if result.disconnect {
    ///         break;
    ///     }
    /// }
    /// let (_, mut rx) = stream.split(); // From that point on, `stream` is disconnected
    /// while let Some(payload) = rx.recv().await {
    ///     process(payload); // We can still process the remaining payloads
    /// }
    /// # }
    /// ```
    pub fn split(self) -> (Stream<()>, Receiver) {
        let Self { stream, receiver } = self;
        let sender = Stream {
            stream,
            receiver: (),
        };
        (sender, receiver)
    }

    /// Detaches the [`Stream`]'s receive callback, ensuring it keeps on running regardless of
    /// whether other instances of the same [`Stream`] still exist.
    ///
    /// This implies that the stream will only be closed if/when the remote closes the stream.
    ///
    /// Unlike [`core::mem::forget`], this will ensure the [`Stream`]'s internal allocations get
    /// cleaned up once the remote has closed the stream, preventing memory leaks.
    pub fn detach(self) -> Receiver {
        let Self { stream, receiver } = self;
        // The on-close callback owns an extra handle to the stream and releases it the first
        // time it is invoked.
        let mut keep_alive = Some(Arc::clone(&stream));
        stream
            .inner
            .add_on_close(Box::new(move |_| core::mem::drop(keep_alive.take())).into());
        receiver
    }
}

/// Debug output lists the peer public key and topic; the receiver is not printed.
impl<R> core::fmt::Debug for Stream<R> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut builder = f.debug_struct("Stream");
        builder.field("peer_pubkey", &self.peer_pubkey());
        builder.field("topic", &self.topic());
        builder.finish()
    }
}

/// A future that resolves once the underlying [`Stream`] is closed.
///
/// It resolves to the stream's [`StreamStatus`] as soon as that status is no longer
/// [`StreamStatus::Open`].
///
/// This future is cancel-safe, but:
/// - Dropping it after having polled it doesn't remove its waker from the wakelist. Therefore,
///   while the following pattern A is valid, pattern B is advised instead:
/// ```ignore
/// // Pattern A
/// tokio::select!{ _ = tx.closed() => {println!("Stream closed by remote!")}, ... };
/// // Pattern B
/// let mut closed = tx.closed();
/// tokio::select!{ _ = &mut closed => {println!("Stream closed by remote!")}, ... };
/// ```
/// - Attempting to use Pattern A with `tx.clone().close(())` instead will result in your stream
///   staying alive until closed by the remote peer.
#[must_use = "Futures do nothing unless polled."]
pub struct StreamClosedFuture {
    /// State shared with the on-close callback registered by [`Stream::close`].
    inner: Arc<Mutex<StreamClosedFutureState>>,
}
/// Opaque debug representation: only the type name is printed.
impl core::fmt::Debug for StreamClosedFuture {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("StreamClosedFuture").finish()
    }
}
/// State shared between a [`StreamClosedFuture`] and the on-close callback that completes it.
struct StreamClosedFutureState {
    /// Last known status of the stream: initialized from the stream's current status when the
    /// future is created, then overwritten by the on-close callback.
    status: StreamStatus,
    /// Waker registered by a pending poll; the on-close callback takes it and wakes it.
    waker: Option<core::task::Waker>,
}
impl core::future::Future for StreamClosedFuture {
    type Output = StreamStatus;

    /// Resolves with the recorded status as soon as it is no longer [`StreamStatus::Open`].
    ///
    /// While the stream is still open, the current task's waker is registered in the shared
    /// state so that the on-close callback can wake it.
    ///
    /// # Panics
    /// Panics if the shared state's mutex is poisoned.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut guard = self.inner.lock().unwrap();
        if guard.status != StreamStatus::Open {
            return core::task::Poll::Ready(guard.status);
        }
        // Re-polls from the same task are common: only clone the waker when the registered
        // one would not already wake the current task.
        let already_registered =
            matches!(&guard.waker, Some(registered) if registered.will_wake(cx.waker()));
        if !already_registered {
            guard.waker = Some(cx.waker().clone());
        }
        core::task::Poll::Pending
    }
}