dittolive-ditto 3.0.9

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
use_prelude!();
use ::ffi_sdk::{self, c_cb_params, COrderByParam, LiveQueryAvailability};
pub use event::LiveQueryEvent;
pub use single_document_event::SingleDocumentLiveQueryEvent;

use crate::{
    ditto::{TryUpgrade, WeakDittoHandleWrapper},
    error::{DittoError, ErrorKind},
    subscription::Subscription,
};
mod event;
mod single_document_event;

pub use move_module::LiveQueryMove;
#[path = "move.rs"]
mod move_module;

trait_alias! {
    /// A closure which is called on each event for a single document live query
    /// (`find_by_id(...).observe_local(...)`).
    ///
    /// It is invoked with an `Option<BoxedDocument>` and the accompanying
    /// `SingleDocumentLiveQueryEvent`.
    pub
    trait SingleDocumentEventHandler =
        FnMut(Option<BoxedDocument>, SingleDocumentLiveQueryEvent)
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

trait_alias! {
    /// A closure which is called on each event of a live query.
    ///
    /// It is invoked with the documents as a `Vec<BoxedDocument>` and the
    /// accompanying `LiveQueryEvent`.
    pub
    trait EventHandler =
        FnMut(Vec<BoxedDocument>, LiveQueryEvent)
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

/// Handle to a live query registered with the Ditto FFI layer.
///
/// When the handle is dropped, the FFI is asked to stop the live query
/// identified by `id`, provided the Ditto instance can still be upgraded.
pub struct LiveQuery {
    /// Weak handle to the Ditto instance this live query was registered on.
    ditto: WeakDittoHandleWrapper, // consider making Weak<BoxedDitto>
    /// The query string passed to the FFI on registration.
    pub query: char_p::Box,
    /// The collection name passed to the FFI on registration.
    pub collection_name: char_p::Box,
    /// If created via `observe` contains the associated `Subscription` for the
    /// `LiveQuery`
    pub subscription: Option<Box<Subscription>>,
    /// Identifier assigned by the FFI when the live query is registered.
    pub id: i64,
}

/// Placeholder value for `LiveQuery::id` before the FFI has assigned a real
/// identifier; an id equal to this value after registration is logged as an error.
const UNASSIGNED_ID_SENTINEL: i64 = -1;

impl LiveQuery {
    /// Registers a live query with the FFI layer, starts it, and returns the
    /// `LiveQuery` handle for it.
    ///
    /// `event_handler` is moved into an `Arc` whose raw pointer is handed to
    /// the FFI as the callback context, together with `retain` / `release`
    /// functions that respectively increment and decrement its strong count.
    /// On every callback the FFI parameters are converted into a
    /// `LiveQueryEvent` and forwarded to the handler along with the documents.
    ///
    /// # Errors
    ///
    /// * Propagates the error of `ditto.try_upgrade()` when the Ditto
    ///   instance cannot be upgraded.
    /// * Returns `ErrorKind::InvalidInput` when the FFI registration call
    ///   yields no id. In that case no `LiveQuery` is constructed, so no
    ///   "stop" request is issued for a query that was never registered.
    #[allow(clippy::too_many_arguments)] // mirrors the parameter list of the FFI registration call
    pub fn with_handler<F>(
        ditto: WeakDittoHandleWrapper,
        query: char_p::Box,
        query_args: Option<Vec<u8>>,
        collection_name: char_p::Box,
        order_by: &'_ [COrderByParam<'_>], // Note COrderByParam is not opaque
        limit: i32,
        offset: u32,
        subscription: Option<Box<Subscription>>,
        event_handler: F,
    ) -> Result<Self, DittoError>
    where
        F: EventHandler,
    {
        let strong_ditto = ditto.try_upgrade()?;

        // The handler lives in an `Arc`; the FFI only ever sees its raw pointer.
        // NOTE(review): whether the FFI takes over this initial strong reference
        // or additionally calls `retain` on registration (and whether it calls
        // `release` when registration fails) is not visible from here — confirm
        // against the FFI contract to rule out leaking the handler.
        let event_handler: Arc<F> = Arc::new(event_handler);
        let ctx = Arc::into_raw(event_handler) as *mut c_void;

        /// Increments the strong count of the `Arc<F>` behind `ctx`.
        unsafe extern "C" fn retain<F>(ctx: *mut c_void) {
            Arc::<F>::increment_strong_count(ctx.cast());
        }

        /// Decrements the strong count of the `Arc<F>` behind `ctx`, dropping
        /// the handler once the last reference is gone.
        unsafe extern "C" fn release<F>(ctx: *mut c_void) {
            drop(Arc::<F>::from_raw(ctx.cast()));
        }

        /// Callback invoked by the FFI: translates `p` into a `LiveQueryEvent`
        /// and calls the user's handler with it.
        ///
        /// Panics if `ctx` is NULL, or if a non-initial event is missing any
        /// of its `moves` / `old_documents` / `insertions` / `deletions` /
        /// `updates` payloads.
        #[allow(improper_ctypes_definitions)] // false positive
        unsafe extern "C" fn c_cb<F: EventHandler>(ctx: *mut c_void, p: c_cb_params) {
            let ctx: *mut F = ctx.cast();
            // Note: this assumes non-concurrent calls from the FFI, which is currently true.
            // Should it not be the case, we'd have to `Mutex`-wrap the `F` so as to `.lock()`,
            // here (or require `EventHandler : Sync + Fn`, which would make the caller be the one
            // having to do that, but it would be a breaking change).
            let event_handler: &mut F = ctx.as_mut().expect("Got NULL");
            let event = if p.is_initial {
                // more explicit flag for "initial" status
                LiveQueryEvent::Initial
            } else {
                // `moves` is a flat list of index pairs: consume it two entries at a time.
                let moves: Vec<LiveQueryMove> = p
                    .moves
                    .as_ref()
                    .unwrap()
                    .as_slice()
                    .chunks_exact(2)
                    .map(|it| {
                        if let [left, right] = *it {
                            LiveQueryMove::new(left, right)
                        } else {
                            unreachable!()
                        }
                    })
                    .collect();
                LiveQueryEvent::Update {
                    old_documents: p.old_documents.unwrap().into(),
                    insertions: p.insertions.unwrap().into(),
                    deletions: p.deletions.unwrap().into(),
                    updates: p.updates.unwrap().into(),
                    moves,
                }
            };
            event_handler(p.documents.into(), event);
        }

        let retain_ctx = retain::<F>;
        let release_ctx = release::<F>;

        // Register *before* building the `LiveQuery`: if registration fails we
        // return early without a half-initialised value whose `Drop` would ask
        // the FFI to stop the sentinel id.
        let id: i64 = unsafe {
            ffi_sdk::ditto_live_query_register_str(
                &*strong_ditto,
                collection_name.as_ref(),
                query.as_ref(),
                query_args.as_ref().map(|qa| (&qa[..]).into()),
                order_by.into(),
                limit,
                offset,
                LiveQueryAvailability::Always,
                ctx,
                Some(retain_ctx),
                Some(release_ctx),
                c_cb::<F>,
            )
            .ok_or(ErrorKind::InvalidInput)?
        };

        if id == UNASSIGNED_ID_SENTINEL {
            ::log::error!("Live query was not given a valid id");
        } else {
            ::log::debug!("Live query id: {}", id);
        }

        // From here on the handle owns the registration: dropping it stops the query.
        let this = LiveQuery {
            ditto,
            query,
            collection_name,
            subscription,
            id,
        };

        // start the query
        unsafe {
            ffi_sdk::ditto_live_query_start(&*strong_ditto, this.id);
        }
        Ok(this)
    }
}

impl Drop for LiveQuery {
    /// Asks the FFI to stop this live query, as long as the Ditto instance
    /// behind the weak handle can still be upgraded.
    fn drop(&mut self) {
        ::log::debug!("Dropping LiveQuery {}", &self.id);

        // Without a live Ditto instance there is nothing left to call into.
        let strong_ditto = match self.ditto.upgrade() {
            Some(strong_ditto) => strong_ditto,
            None => return,
        };

        // Signals to the FFI that it is now free to *eventually* release the
        // live query's event handler.
        unsafe { ffi_sdk::ditto_live_query_stop(&*strong_ditto, self.id) };
    }
}

// Empty impl: `LiveQuery` adds nothing beyond what the `Observer` trait itself provides.
impl crate::observer::Observer for LiveQuery {}