dittolive-ditto 4.9.3

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
use super::*;

/// A handle to a send operation, letting you check in on its status, or attempt to cancel it.
///
/// Keeping this handle around is entirely optional, and does come at a resource cost. You may:
/// - Drop it immediately to save on resources: this will not cancel the send operation.
/// - Keep it around in order to cancel an operation if you suspect the need for it may expire
///   before the message is [`SendStatus::Sent`].
/// - Await its [`SendHandle::changed`] transition to receive backpressure.
///
/// The handle is [`Clone`]; [`SendHandle::changed`] relies on this to give each future its own
/// copy of the handle.
#[derive(Clone)]
pub struct SendHandle {
    /// The wrapped bus-level handle. Every method of [`SendHandle`] forwards to its inner
    /// value (`.0`).
    pub(crate) handle: bus::SendHandle,
}

impl SendHandle {
    /// Requests cancellation of the send operation, consuming the handle.
    ///
    /// Cancellation is best-effort: there generally isn't a way to guarantee that the
    /// associated payload hasn't reached, or will not reach, the remote peer.
    ///
    /// Treat cancelling as a way to save bandwidth when something indicates that sending the
    /// message is no longer necessary; it is not a way to prevent the consequences of its
    /// reception.
    ///
    /// # Errors
    /// If cancelling was reported to have failed, the [`SendStatus`] observed right before the
    /// attempt is returned.
    pub fn cancel(self) -> Result<(), SendStatus> {
        // Snapshot the status *before* asking for cancellation: this is the value reported
        // to the caller should the cancellation not succeed.
        let last_known = self.handle.0.poll();
        if matches!(self.handle.0.cancel(), bus::CancellationResult::Ok) {
            Ok(())
        } else {
            Err(last_known)
        }
    }

    /// Reports the status of the operation as it is right now.
    pub fn current_status(&self) -> SendStatus {
        self.handle.0.poll()
    }

    /// Builds a future that resolves once the operation's status is no longer
    /// [`SendStatus::Pending`].
    ///
    /// If the status has already left [`SendStatus::Pending`], the future resolves the first
    /// time it is polled.
    ///
    /// Note that most status changes don't actually provide you with much information, so
    /// awaiting them is not generally useful.
    ///
    /// If you wish to react to your peer receiving a given message, establishing your own
    /// in-band acknowledgement mechanism is advised.
    ///
    /// # Cancel safety
    /// The returned future is cancel-safe: dropping it will neither have side-effects, nor
    /// prevent other futures based on clones of its parent [`SendHandle`] from
    /// resolving properly. However, constructing and dropping it in a loop may lead to
    /// performance loss.
    pub fn changed(&self) -> SendHandleFuture {
        // The future owns its own clone of the handle, so it does not borrow from `self`.
        let inner = self.clone();
        SendHandleFuture { inner }
    }

    /// Registers an additional listener that is invoked when the operation's [`SendStatus`]
    /// changes.
    ///
    /// Listeners that were registered earlier will still be called upon status changes, which
    /// makes this the safe choice when other listeners may already have been set (possibly by
    /// `await`ing a clone of this handle).
    ///
    /// NOTE(review): the `true` passed to the underlying `set_on_change` presumably selects
    /// this "keep existing listeners" behaviour; confirm against the `bus` API.
    fn add_on_change(&self, callback: impl Into<Callback<bus::SendStatus>>) {
        self.handle.0.set_on_change(callback.into(), true)
    }
}

/// Future returned by [`SendHandle::changed`].
///
/// Resolves with the operation's status once that status is anything other than
/// [`SendStatus::Pending`]. The status it yields is whichever one was observed, which is not
/// necessarily [`SendStatus::Sent`].
///
/// This future is cancel-safe: dropping it will neither have side-effects, nor prevent other
/// futures based on clones of its parent [`SendHandle`] from resolving properly.
/// However, constructing and dropping it in a loop may lead to performance loss.
pub struct SendHandleFuture {
    /// Clone of the [`SendHandle`] this future was created from; used to read the status and
    /// to register the wake-up listener when polled.
    inner: SendHandle,
}

impl core::future::Future for SendHandleFuture {
    type Output = bus::SendStatus;

    /// Resolves with the operation's status as soon as it is no longer
    /// [`SendStatus::Pending`].
    ///
    /// While the operation is still pending, a listener that wakes the current task is
    /// registered on the underlying handle and `Poll::Pending` is returned. A new listener is
    /// registered on every pending poll.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // Fast path: the operation already left the pending state.
        let state = self.inner.handle.0.poll();
        if state != SendStatus::Pending {
            return std::task::Poll::Ready(state);
        }

        // Arrange for the task to be woken on the next status change.
        let waker = cx.waker().clone();
        self.inner
            .add_on_change(Box::new(move |_| waker.wake_by_ref()));

        // Re-check after registering: if the status changed between the first check and the
        // registration above, the listener may never fire for that change and the task would
        // sleep forever. A redundant wake-up from the listener is harmless.
        let state = self.inner.handle.0.poll();
        if state != SendStatus::Pending {
            std::task::Poll::Ready(state)
        } else {
            std::task::Poll::Pending
        }
    }
}