dittolive-ditto 5.0.0

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use_prelude!();

use core::ffi::c_void;
use std::sync::{
    atomic::{self, AtomicU64},
    Arc, Weak,
};

use ffi_sdk::BoxedAttachmentHandle;

use crate::{
    error,
    error::DittoError,
    store::{
        attachment::{DittoAttachment, DittoAttachmentFetchEvent, DittoAttachmentToken},
        DittoFields,
    },
};

/// Type-level `enum` to distinguish between a [`DittoAttachmentFetcher`] that
/// can and must be cancelled, and the legacy behavior where it was auto-cancelled on drop.
///
/// ```rust ,ignore
/// // It represents the following, but at the type-level ("const generic" sort to speak).
/// enum FetcherVersion /* : Sealed */ {
///     /// The deprecated `.collection.fetch_attachment(...)` API.
///     V1,
///
///     /// The proper `.store.fetch_attachment(...)` API.
///     V2,
/// }
/// ```
#[allow(nonstandard_style)]
pub mod FetcherVersion {
    /// The deprecated V1 API (removed).
    pub enum V1 {}
    /// The proper [`store.fetch_attachment(...)`][crate::store::Store::fetch_attachment()] API.
    pub enum V2 {}

    mod seal {
        pub trait Sealed: 'static {}
    }
    pub(crate) use seal::Sealed;

    impl Sealed for V1 {}
    impl Sealed for V2 {}
}

/// The output of [`store.fetch_attachment()`](crate::store::Store::fetch_attachment).
///
///   - In the deprecated [`FetcherVersion::V1`] case, they must be kept alive for the fetching of
///     the attachment to proceed and for you to be notified once the status of the fetch request
///     has changed.
///
///   - In the proper [`FetcherVersion::V2`] case, they are only
///     [`.cancel()`][DittoAttachmentFetcher::cancel]-ed *explicitly*, that is, they are safe to
///     discard / not to safe-keep.
pub struct DittoAttachmentFetcher<'a, Version: FetcherVersion::Sealed = FetcherVersion::V1> {
    pub(crate) context: Arc<DittoAttachmentFetcherCtx<'a>>,
    _phantom: ::core::marker::PhantomData<fn() -> Version>,
}

impl Clone for DittoAttachmentFetcher<'static, FetcherVersion::V2> {
    fn clone(&self) -> Self {
        Self {
            context: self.context.retain(),
            ..*self
        }
    }
}

impl<'a, Version: FetcherVersion::Sealed> DittoAttachmentFetcher<'a, Version> {
    pub(crate) fn new(
        token: DittoAttachmentToken,
        ditto: Option<&Arc<DittoFields>>,
        raw_ditto: &Arc<BoxedDitto>,
        on_fetch_event: impl 'a + Send + Sync + Fn(DittoAttachmentFetchEvent, &AtomicU64),
    ) -> Result<Self, DittoError> {
        let context = Arc::new(DittoAttachmentFetcherCtx {
            token: token.clone(),
            ditto: ditto.map_or_else(::std::sync::Weak::new, Arc::downgrade),
            raw_ditto: Arc::downgrade(raw_ditto),
            on_fetch_event: Box::new(on_fetch_event),
            cancel_token: 0.into(),
        });
        let raw_context = Arc::as_ptr(&context) as *mut c_void;

        let cancel_token = unsafe {
            ffi_sdk::ditto_resolve_attachment(
                raw_ditto,
                token.id.as_ref().into(),
                raw_context,
                Some(DittoAttachmentFetcherCtx::retain),
                Some(DittoAttachmentFetcherCtx::release),
                DittoAttachmentFetcherCtx::on_complete_cb,
                DittoAttachmentFetcherCtx::on_progress_cb,
                DittoAttachmentFetcherCtx::on_deleted_cb,
            )
            .ok()?
        };
        // HACK: until `dittoffi` exposes the `cancel_token` / `resolve_id`
        // to the callback itself, we manually back-smuggle it using the shared
        // context.
        //
        // This value is later accessed in the callback registered by
        // `Store::fetch_attachment()`, in the `on_complete` case, after having
        // acquired a lock which happens to be held during this whole `new()` call,
        // hence why `Relaxed` suffices.
        context
            .cancel_token
            .store(cancel_token as _, atomic::Ordering::Relaxed);

        Ok(Self {
            context,
            _phantom: ::core::marker::PhantomData,
        })
    }
}

impl<Version: FetcherVersion::Sealed> DittoAttachmentFetcher<'_, Version> {
    pub(crate) fn cancel_token(&self) -> u64 {
        self.context.cancel_token.load(atomic::Ordering::SeqCst)
    }
}

impl DittoAttachmentFetcher<'static, FetcherVersion::V2> {
    /// Yields `true` if the original `cancel_token` was `0`.
    pub(crate) fn cancel_token_ensure_unique(&self) -> (u64, bool) {
        let mut cancel_token = self.cancel_token();
        let was_zero = cancel_token == 0;
        if was_zero {
            // Currently, FFI `.resolve_attachment()` will yield `0` for fetchers
            // it believes to have invoked synchronously (fast path); even
            // though our callback is —outside of Wasm— dispatching these
            // invocations onto the `attachments_signal_sender` queue.
            //
            // So, to keep the property of unicity of these tokens across
            // the lifetime of a ditto instance (TODO: process-wide?), we
            // currently hack a fallback unicity mechanism. The `core`
            // auto-increments off `1..`, so let us do `(..-2).rev()`.
            static FALLBACK_UNIQUE_CANCEL_TOKEN: AtomicU64 = AtomicU64::new(u64::MAX - 1);
            cancel_token =
                FALLBACK_UNIQUE_CANCEL_TOKEN.fetch_sub(1, ::std::sync::atomic::Ordering::Relaxed);
            self.context
                .cancel_token
                .store(cancel_token, atomic::Ordering::SeqCst);
        };
        (cancel_token, was_zero)
    }
}

impl DittoAttachmentFetcher<'static, FetcherVersion::V2> {
    /// Stops fetching the fetcher's associated attachment and cleans up any
    /// associated resources.
    ///
    /// Note that you are not required to call it once your attachment
    /// fetch operation has finished. The method primarily exists to allow you
    /// to cancel an attachment fetch request while it is ongoing if you no
    /// longer wish for the attachment to be made available locally to the
    /// device nor for its evolution to be observed.
    pub fn cancel(&self) {
        if let Some(ditto) = self.context.ditto.upgrade() {
            ditto.store.unregister_fetcher(self.cancel_token(), None);
        }
    }
}

// Does nothing for V2.
impl<V1: FetcherVersion::Sealed> Drop for DittoAttachmentFetcher<'_, V1> {
    fn drop(&mut self) {
        /// Poorman's specialization.
        use core::any::TypeId;

        if TypeId::of::<V1>() == TypeId::of::<FetcherVersion::V1>() {
            if let Some(ditto) = self.context.raw_ditto.upgrade() {
                let status = ffi_sdk::ditto_cancel_resolve_attachment(
                    &ditto,
                    self.context.token.id.as_ref().into(),
                    self.cancel_token(),
                );
                if status != 0 {
                    #[allow(deprecated)] // Workaround for patched tracing
                    {
                        error!("failed to clean up attachment fetcher");
                    }
                }
            }
        }
    }
}

pub(crate) struct DittoAttachmentFetcherCtx<'a> {
    pub(crate) token: DittoAttachmentToken,
    ditto: Weak<DittoFields>,
    raw_ditto: Weak<BoxedDitto>,
    #[allow(clippy::type_complexity)]
    on_fetch_event: Box<dyn Fn(DittoAttachmentFetchEvent, &AtomicU64) + Send + Sync + 'a>,
    cancel_token: atomic::AtomicU64,
}

impl DittoAttachmentFetcherCtx<'_> {
    pub(crate) unsafe extern "C" fn retain(ctx: *mut c_void) {
        let ptr = ctx.cast::<DittoAttachmentFetcherCtx<'_>>();
        Arc::increment_strong_count(ptr);
    }

    pub(crate) unsafe extern "C" fn release(ctx: *mut c_void) {
        let ptr = ctx.cast::<DittoAttachmentFetcherCtx<'_>>();
        Arc::decrement_strong_count(ptr);
    }

    pub(crate) unsafe extern "C" fn on_complete_cb(
        ctx: *mut c_void,
        attachment_handle: BoxedAttachmentHandle,
    ) {
        let ctx_ref = ctx
            .cast::<DittoAttachmentFetcherCtx<'_>>()
            .as_ref()
            .expect("got null");

        let ditto_attachment = DittoAttachment::new_with_token(
            ctx_ref.token.clone(),
            ctx_ref.raw_ditto.clone(),
            attachment_handle,
        );
        let event = DittoAttachmentFetchEvent::Completed {
            attachment: ditto_attachment,
        };
        (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
    }

    pub(crate) unsafe extern "C" fn on_progress_cb(
        ctx: *mut c_void,
        downloaded_bytes: u64,
        total_bytes: u64,
    ) {
        let ctx_ref = ctx
            .cast::<DittoAttachmentFetcherCtx<'_>>()
            .as_ref()
            .expect("got null");

        let event = DittoAttachmentFetchEvent::Progress {
            downloaded_bytes,
            total_bytes,
        };
        (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
    }

    pub(crate) unsafe extern "C" fn on_deleted_cb(ctx: *mut c_void) {
        let ctx_ref = ctx
            .cast::<DittoAttachmentFetcherCtx<'_>>()
            .as_ref()
            .expect("got null");

        let event = DittoAttachmentFetchEvent::Deleted;
        (ctx_ref.on_fetch_event)(event, &ctx_ref.cancel_token);
    }
}