dittolive-ditto 4.9.3

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
//! Use [`ditto.sync()`] to access the [`Sync`] API of Ditto.
//!
//! The [`Sync`] API can be used to request documents to be synchronized with
//! other peers. Use [`ditto.sync().register_subscription(...)`] and provide
//! a DQL query to let Ditto know which documents you're interested in syncing.
//!
//! The returned [`SyncSubscription`] handle can be used to cancel the
//! subscription with [`.cancel()`], at which point Ditto will stop syncing
//! data for that subscription.
//!
//! # Example
//!
//! ```
//! use dittolive_ditto::prelude::*;
//! # fn example(ditto: &Ditto) -> anyhow::Result<()> {
//!
//! let sync_subscription = ditto
//!     .sync()
//!     .register_subscription("SELECT * FROM cars WHERE color = 'blue'", None)?;
//!
//! // To cancel the sync subscription, use .cancel()
//! sync_subscription.cancel();
//! # Ok(())
//! # }
//! ```
//!
//! [`ditto.sync()`]: Ditto::sync
//! [`ditto.sync().register_subscription(...)`]: Sync::register_subscription
//! [`.cancel()`]: SyncSubscription::cancel

use std::{
    ops::Deref,
    sync::{Arc, RwLock, Weak},
};

use tracing::{debug, error};

use crate::{
    ditto::{Ditto, DittoFields},
    dql::*,
    error::DittoError,
    utils::{extension_traits::FfiResultIntoRustResult, SetArc},
};

mod sync_subscription;
pub use self::sync_subscription::SyncSubscription;

/// Ditto's `Sync` API, obtained via [`ditto.sync()`].
///
/// [See the `sync` module documentation for more details][0].
///
/// [`ditto.sync()`]: crate::prelude::Ditto::sync
/// [0]: crate::sync
pub struct Sync {
    ditto: Weak<DittoFields>,
    subscriptions: RwLock<SetArc<SyncSubscription>>,
}

impl Sync {
    pub(crate) fn new(ditto: Weak<DittoFields>) -> Self {
        Self {
            ditto,
            subscriptions: RwLock::new(SetArc::default()),
        }
    }

    /// Gets temporary access to the set of currently registered subscriptions.
    ///
    /// A (read) lock is held until the return value is dropped: this means
    /// that neither [`Self::register_subscription()`] nor
    /// [`SyncSubscription::cancel()`] can make progress until this read
    /// lock is released.
    pub fn subscriptions(&self) -> impl '_ + Deref<Target = SetArc<SyncSubscription>> {
        self.subscriptions.read().unwrap()
    }

    /// Use a DQL query to subscribe to data on other Ditto peers.
    ///
    /// While the subscription is active, data matching this query on other
    /// peers will be synced to the local peer's data store.
    ///
    /// Note that dropping the `SyncSubscription` won't cancel it, to do that
    /// be sure to use [`sync_subscription.cancel()`].
    ///
    /// # Example
    ///
    /// ```
    /// use dittolive_ditto::prelude::*;
    /// # fn example(ditto: &Ditto) -> anyhow::Result<()> {
    /// let sync_subscription = ditto.sync().register_subscription(
    ///     "SELECT * FROM cars WHERE color = :color",
    ///     Some(serde_json::json!({
    ///         "color": "blue"
    ///     }).into()),
    /// )?;
    ///
    /// // Cancel the subscription with `.cancel()`
    /// sync_subscription.cancel();
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`sync_subscription.cancel()`]: crate::sync::SyncSubscription::cancel
    pub fn register_subscription<Q>(
        &self,
        query: Q,
        query_args: Option<QueryArguments>,
    ) -> Result<Arc<SyncSubscription>, DittoError>
    where
        Q: TryInto<Query, Error = DittoError>,
    {
        let ditto = Ditto::upgrade(&self.ditto)?;
        let new_sub = Arc::new(SyncSubscription::new(
            &ditto,
            query.try_into()?,
            query_args,
        )?);
        self.subscriptions.write().unwrap().insert(new_sub.clone());
        Ok(new_sub)
    }

    pub(crate) fn unregister_subscription(&self, subscription: &SyncSubscription) -> bool {
        let subscriptions = &mut *self.subscriptions.write().unwrap();
        let removed = subscriptions.remove(subscription);
        if removed {
            debug!(%subscription, "unregistering sync subscription");
            if let Ok(ditto) = Ditto::upgrade(&self.ditto) {
                if let Err(error) = ffi_sdk::dittoffi_try_remove_sync_subscription(
                    &ditto.ditto,
                    subscription.query.prepare_ffi(),
                    subscription.query_args.as_ref().map(|qa| qa.cbor().into()),
                )
                .into_rust_result()
                {
                    let ditto_error = DittoError::from(error);
                    error!(
                        error = %ditto_error,
                        %subscription,
                        "failed to unregister sync subscription",
                    );
                }
                return true;
            }
        }
        false
    }
}