dittolive-ditto 4.5.3

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use_prelude!();

use std::{
    hash::{self, Hash},
    sync::{Arc, Weak},
};

pub use ::ffi_sdk::{BoxedDitto, ChangeHandlerWithDocsDiff, LiveQueryAvailability};

pub use super::*;
use crate::{ditto::DittoFields, error::DittoError};

/// A change observer invokes a change handler whenever results for its query
/// change.
///
/// Create an observer by calling [`Store::register_observer`].
pub struct StoreObserver {
    /// Ditto
    ditto: Weak<DittoFields>,
    /// LiveQueryID
    pub(crate) live_query_id: i64,
    /// The DQL query this listener is reacting too.
    pub(crate) _query: Query,
    /// The argumements associated to this listener's query.
    _query_args: Option<QueryArguments>,
}

trait_alias! {
    /// A closure which is called on each event
    pub trait ChangeHandler =
        FnMut(QueryResult)
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

impl StoreObserver {
    pub(crate) fn new<F>(
        ditto: &Ditto,
        query: Query,
        query_args: Option<QueryArguments>,
        on_change: F,
    ) -> Result<Self, DittoError>
    where
        F: ChangeHandler,
    {
        let change_handler = Arc::new(on_change);
        let ctx = Arc::into_raw(change_handler) as *mut c_void;

        let retain_ctx = {
            unsafe extern "C" fn retain<F>(ctx: *mut c_void) {
                Arc::<F>::increment_strong_count(ctx.cast());
            }

            retain::<F>
        };

        let release_ctx = {
            unsafe extern "C" fn release<F>(ctx: *mut c_void) {
                drop(Arc::<F>::from_raw(ctx.cast()))
            }
            release::<F>
        };

        #[allow(improper_ctypes_definitions)] // false positive
        unsafe extern "C" fn c_cb<F: ChangeHandler>(
            ctx: *mut c_void,
            c_params: ffi_sdk::ChangeHandlerWithQueryResult,
        ) {
            let ctx: *mut F = ctx.cast();
            let change_handler: &mut F = ctx.as_mut().expect("Got NULL");
            change_handler(QueryResult::from(c_params.query_result));
        }

        let c_query = query.prepare_ffi();
        let c_query_cbor = query_args.as_ref().map(|qa| qa.cbor());

        let live_query_id = unsafe {
            ffi_sdk::dittoffi_try_experimental_register_change_observer_str(
                &ditto.ditto,
                c_query,
                c_query_cbor.map(|qa| qa.into()),
                // TODO(Ronan) Add next signal
                LiveQueryAvailability::Always,
                ctx,
                Some(retain_ctx),
                Some(release_ctx),
                c_cb::<F>,
            )
        }
        .into_rust_result()?;

        let this = StoreObserver {
            _query: query,
            _query_args: query_args,
            ditto: Arc::downgrade(&ditto.fields),
            live_query_id,
        };

        ffi_sdk::ditto_live_query_start(&ditto.ditto, this.live_query_id);
        Ok(this)
    }

    /// Cancels the observation. The handler that was passed in when registering
    /// this store observer will no longer be called. No-op if this store
    /// observer is already cancelled or the owning [`Ditto`] object goes out of
    /// scope.
    pub fn cancel(&self) {
        if let Ok(ditto) = Ditto::upgrade(&self.ditto) {
            ditto.store.unregister_observer(self);
        }
    }

    pub fn is_cancelled(&self) -> bool {
        Ditto::upgrade(&self.ditto).map_or(true, |ditto| ditto.store.observers().contains(self))
    }
}

impl Drop for StoreObserver {
    fn drop(&mut self) {
        self.cancel()
    }
}

impl StoreObserver {
    fn comparable_parts(&self) -> impl '_ + Eq + Hash {
        (self.live_query_id, &self._query, &self._query_args)
    }
}

impl Eq for StoreObserver {}
impl PartialEq for StoreObserver {
    fn eq(self: &Self, other: &Self) -> bool {
        self.comparable_parts() == other.comparable_parts()
    }
}

impl Hash for StoreObserver {
    fn hash<H: hash::Hasher>(&self, h: &mut H) {
        self.comparable_parts().hash(h)
    }
}