dittolive-ditto 5.0.0

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use_prelude!();

use std::hash::{self, Hash};

use ffi_sdk::{
    ffi_utils::repr_c, FfiQueryResult, FfiSignalNext, FfiStoreObservationHandlerWithSignalNext,
    FfiStoreObserver,
};
use uuid::Uuid;

pub use super::*;
use crate::{error::DittoError, utils::zstr::zstr};

/// Use [`ditto.store().register_observer(...)`] to create an observer with a callback.
///
/// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
pub struct StoreObserver {
    pub(crate) handle: repr_c::Box<FfiStoreObserver>,
}

trait_alias! {
    /// A change handler is called whenever an active store observer receives new results.
    pub trait ChangeHandler =
        FnMut(QueryResult)
        + Send // can be dropped in another thread
        + Sync
        + 'static // cannot dangle
}

trait_alias! {
    /// A callback used to signal that the observer is ready to handle new events.
    pub trait SignalNext =
        FnOnce()
        + 'static // cannot dangle
        + Send // can be dropped in another thread
}

trait_alias! {
    /// A change handler is called whenever an active store observer receives new results.
    ///
    /// Call the provided [`SignalNext`] function to signal that the handler is ready to
    /// receive the next callback from the store observer.
    pub trait ChangeHandlerWithSignalNext =
        FnMut(QueryResult, Box<dyn SignalNext>)
        + 'static // cannot dangle
        + Send // can be dropped in another thread
        + Sync
}

fn dittoffi_store_observer_register_safe<F>(
    ditto: &'_ Ditto,
    query: &zstr,
    query_args: Option<&[u8]>,
    mut observer: F,
) -> Result<StoreObserver>
where
    F: ChangeHandlerWithSignalNext,
{
    let ffi_callback: FfiStoreObservationHandlerWithSignalNext = {
        fn make_callback<F>(f: F) -> FfiStoreObservationHandlerWithSignalNext
        where
            F: FnMut(repr_c::Box<FfiQueryResult>, repr_c::Arc<FfiSignalNext>) + 'static + Send,
        {
            FfiStoreObservationHandlerWithSignalNext(Box::new(f).into())
        }

        make_callback(
            move |ffi_query_result: repr_c::Box<FfiQueryResult>,
                  signal_next: repr_c::Arc<FfiSignalNext>| {
                let signal_next = Box::new(move || {
                    signal_next.call();
                });

                observer(QueryResult::from(ffi_query_result), signal_next);
            },
        )
    };

    let query = query.into();
    let query_args_cbor = query_args.map(|qa| qa.into());

    let handle = ffi_sdk::dittoffi_store_register_observer_throws(
        &ditto.ditto,
        query,
        query_args_cbor,
        ffi_callback,
    )
    .into_rust_result()?;

    Ok(StoreObserver { handle })
}

impl StoreObserver {
    pub(crate) fn new<F>(
        ditto: &Ditto,
        query: &zstr,
        query_args: Option<&[u8]>,
        mut on_change: F,
    ) -> Result<Self, DittoError>
    where
        F: ChangeHandler,
    {
        Self::with_signal_next(ditto, query, query_args, move |args, signal_next| {
            on_change(args);
            signal_next();
        })
    }

    pub(crate) fn with_signal_next<F>(
        ditto: &Ditto,
        query: &zstr,
        query_args: Option<&[u8]>,
        on_change: F,
    ) -> Result<Self, DittoError>
    where
        F: ChangeHandlerWithSignalNext,
    {
        let on_change = {
            let on_change = ::std::sync::Mutex::new(on_change);
            move |arg: QueryResult, signal_next: Box<dyn SignalNext>| {
                let mut on_change = on_change
                    .lock()
                    .expect("`on_change` observer not to be poisoned");
                on_change(arg, signal_next)
            }
        };
        dittoffi_store_observer_register_safe(ditto, query, query_args, on_change)
    }

    /// Return the query string used to create this `StoreObserver`.
    pub fn query_string(&self) -> String {
        let char_p = ffi_sdk::dittoffi_store_observer_query_string(&self.handle);
        char_p.into_string()
    }

    /// Return the query arguments used to create this `StoreObserver`, if any.
    pub fn query_arguments(&self) -> Option<serde_cbor::Value> {
        let buffer: c_slice::Box<u8> =
            ffi_sdk::dittoffi_store_observer_query_arguments_cbor(&self.handle)?;
        let cbor = serde_cbor::from_slice(buffer.as_slice())
            .unwrap_or_else(|error| panic!("bug: failed to deserialize CBOR from FFI: {error}"));

        Some(cbor)
    }

    /// Cancels this [`StoreObserver`] and its callback.
    ///
    /// The callback registered with this [`StoreObserver`] will no longer be called
    /// after calling [`.cancel()`].
    ///
    /// This call is a no-op if this [`StoreObserver`] has already been cancelled
    /// or if the owning [`Ditto`] object has gone out of scope.
    ///
    /// [`.cancel()`]: Self::cancel
    pub fn cancel(&self) {
        ffi_sdk::dittoffi_store_observer_cancel(&self.handle);
    }

    /// Returns true if this [`StoreObserver`] has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        ffi_sdk::dittoffi_store_observer_is_cancelled(&self.handle)
    }

    /// Returns the unique identifier for this [`StoreObserver`].
    fn id(&self) -> Uuid {
        let buffer = ffi_sdk::dittoffi_store_observer_id(&self.handle);
        Uuid::from_slice(buffer.as_slice()).expect("bug: expected valid UUID")
    }
}

impl StoreObserver {
    fn comparable_parts(&self) -> impl '_ + Eq + Hash {
        self.id()
    }
}

impl Eq for StoreObserver {}
impl PartialEq for StoreObserver {
    fn eq(&self, other: &Self) -> bool {
        self.comparable_parts() == other.comparable_parts()
    }
}

impl Hash for StoreObserver {
    fn hash<H: hash::Hasher>(&self, h: &mut H) {
        self.comparable_parts().hash(h)
    }
}