dittolive-ditto 4.9.3

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use_prelude!();

use std::{
    hash::{self, Hash},
    sync::{Arc, Weak},
};

pub use ffi_sdk::LiveQueryAvailability;

pub use super::*;
use crate::{ditto::DittoFields, error::DittoError};

/// Use [`ditto.store().register_observer(...)`] to create an observer with a callback.
///
/// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
pub struct StoreObserver {
    /// Ditto
    ditto: Weak<DittoFields>,
    /// LiveQueryID
    pub(crate) live_query_id: i64,
    /// The DQL query this listener is reacting too.
    pub(crate) _query: Query,
    /// The argumements associated to this listener's query.
    _query_args: Option<QueryArguments>,
}

trait_alias! {
    /// A change handler is called whenever an active store observer receives new results.
    pub trait ChangeHandler =
        FnMut(QueryResult)
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

trait_alias! {
    /// A callback used to signal that the observer is ready to handle new events.
    pub trait SignalNext =
        FnOnce()
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

trait_alias! {
    /// A change handler is called whenever an active store observer receives new results.
    ///
    /// Call the provided [`SignalNext`] function to signal that the handler is ready to
    /// receive the next callback from the store observer.
    pub trait ChangeHandlerWithSignalNext =
        FnMut(QueryResult, Box<dyn Send + SignalNext>)
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

fn dittoffi_try_experimental_register_change_observer_str_safe<F>(
    ditto: &'_ Ditto,
    query: Query,
    query_args: Option<QueryArguments>,
    observer: F,
) -> Result<StoreObserver>
where
    F: 'static + Send + Sync + Fn(QueryResult, Box<dyn Send + SignalNext>),
{
    struct CbContext<F> {
        cb: F,
        ditto_weak: Weak<DittoFields>,
        live_query_id: ::std::sync::Mutex<Option<i64>>,
    }

    let cb_context = CbContext {
        cb: observer,
        ditto_weak: Arc::downgrade(&ditto.fields),
        live_query_id: ::std::sync::Mutex::new(None),
    };

    let arc_cb_context = Arc::new(cb_context);

    let retain_ctx = ::extern_c::extern_c(|ctx: *mut c_void| unsafe {
        Arc::<CbContext<F>>::increment_strong_count(ctx.cast());
    });

    let release_ctx = ::extern_c::extern_c(|ctx: *mut c_void| unsafe {
        drop(Arc::<CbContext<F>>::from_raw(ctx.cast()))
    });

    let c_cb = ::extern_c::extern_c(
        |ctx: *mut c_void, c_params: ffi_sdk::ChangeHandlerWithQueryResult| unsafe {
            let ctx: *const CbContext<F> = ctx.cast();
            let callback_context: &CbContext<F> = ctx.as_ref().expect("Got NULL");

            // Unpack resources from context
            let observer = &callback_context.cb;
            let ditto_weak = &callback_context.ditto_weak;
            let live_query_id = callback_context
                .live_query_id
                .lock()
                .expect("mutex poisoned")
                .expect("live_query_id should be set");
            let signal_next = move || {
                let Some(ditto) = Weak::upgrade(ditto_weak) else {
                    tracing::debug!("Live Query signal_next fired after Ditto has closed");
                    return;
                };

                ffi_sdk::ditto_live_query_signal_available_next(&ditto.ditto, live_query_id);
            };

            let signal_next = Box::new(signal_next);
            observer(QueryResult::from(c_params.query_result), signal_next);
        },
    );

    let c_query = query.prepare_ffi();
    let c_query_cbor = query_args.as_ref().map(|qa| qa.cbor());

    let mut live_query_id_lock = arc_cb_context.live_query_id.lock().expect("mutex poisoned");
    let live_query_id = unsafe {
        ffi_sdk::dittoffi_try_experimental_register_change_observer_str(
            &ditto.ditto,
            c_query,
            c_query_cbor.map(|qa| qa.into()),
            LiveQueryAvailability::WhenSignalled,
            Arc::as_ptr(&arc_cb_context) as *mut c_void,
            Some(retain_ctx),
            Some(release_ctx),
            c_cb,
        )
    }
    .into_rust_result()?;

    *live_query_id_lock = Some(live_query_id);
    drop(live_query_id_lock);

    let this = StoreObserver {
        _query: query,
        _query_args: query_args,
        ditto: Arc::downgrade(&ditto.fields),
        live_query_id,
    };

    ffi_sdk::ditto_live_query_start(&ditto.ditto, this.live_query_id);
    Ok(this)
}

impl StoreObserver {
    pub(crate) fn new<F>(
        ditto: &Ditto,
        query: Query,
        query_args: Option<QueryArguments>,
        mut on_change: F,
    ) -> Result<Self, DittoError>
    where
        F: ChangeHandler,
    {
        Self::with_signal_next(ditto, query, query_args, move |args, signal_next| {
            on_change(args);
            signal_next();
        })
    }

    pub(crate) fn with_signal_next<F>(
        ditto: &Ditto,
        query: Query,
        query_args: Option<QueryArguments>,
        on_change: F,
    ) -> Result<Self, DittoError>
    where
        F: ChangeHandlerWithSignalNext,
    {
        let on_change = {
            let on_change = ::std::sync::Mutex::new(on_change);
            move |arg: QueryResult, signal_next: Box<dyn Send + SignalNext>| {
                let mut on_change = on_change
                    .lock()
                    .expect("`on_change` observer not to be poisoned");
                on_change(arg, signal_next)
            }
        };
        dittoffi_try_experimental_register_change_observer_str_safe(
            ditto, query, query_args, on_change,
        )
    }

    /// Cancels the observation. The handler that was passed in when registering
    /// this store observer will no longer be called. No-op if this store
    /// observer is already cancelled or the owning [`Ditto`] object goes out of
    /// scope.

    /// Cancels this [`StoreObserver`] and its callback.
    ///
    /// The callback registered with this [`StoreObserver`] will no longer be called
    /// after calling [`.cancel()`].
    ///
    /// This call is a no-op if this [`StoreObserver`] has already been cancelled
    /// or if the owning [`Ditto`] object has gone out of scope.
    ///
    /// [`.cancel()`]: Self::cancel
    pub fn cancel(&self) {
        if let Ok(ditto) = Ditto::upgrade(&self.ditto) {
            ditto.store.unregister_observer(self);
        }
    }

    /// Returns true if this [`StoreObserver`] has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        Ditto::upgrade(&self.ditto)
            .map_or(true, |ditto| ditto.store.observers().contains(self).not())
    }
}

impl Drop for StoreObserver {
    fn drop(&mut self) {
        self.cancel()
    }
}

impl StoreObserver {
    fn comparable_parts(&self) -> impl '_ + Eq + Hash {
        (self.live_query_id, &self._query, &self._query_args)
    }
}

impl Eq for StoreObserver {}
impl PartialEq for StoreObserver {
    fn eq(&self, other: &Self) -> bool {
        self.comparable_parts() == other.comparable_parts()
    }
}

impl Hash for StoreObserver {
    fn hash<H: hash::Hasher>(&self, h: &mut H) {
        self.comparable_parts().hash(h)
    }
}