dittolive-ditto 2.0.2

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
use_prelude!();
use ::ffi_sdk::{self, c_cb_params, COrderByParam, LiveQueryAvailability};
pub use event::LiveQueryEvent;
pub use single_document_event::SingleDocumentLiveQueryEvent;

use crate::{
    error::{DittoError, ErrorKind},
    subscription::Subscription,
};
mod event;
mod single_document_event;

pub use move_module::LiveQueryMove;
#[path = "move.rs"]
mod move_module;

trait_alias! {
    /// A closure which is called on each event for a single document live query
    /// (`find_by_id(...).observe(...)`)
    ///
    /// The closure receives an `Option<BoxedDocument>` together with the
    /// `SingleDocumentLiveQueryEvent` describing the event.
    pub
    trait SingleDocumentEventHandler =
        FnMut(Option<BoxedDocument>, SingleDocumentLiveQueryEvent)
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

trait_alias! {
    /// A closure which is called on each event
    ///
    /// The closure receives a `Vec<BoxedDocument>` together with the
    /// `LiveQueryEvent` describing the event.
    pub
    trait EventHandler =
        FnMut(Vec<BoxedDocument>, LiveQueryEvent)
        + Send // can be dropped in another thread
        + 'static // cannot dangle
}

/// A live query registered with the Ditto FFI layer.
///
/// The value owns the event handler that the FFI calls back into; dropping
/// the `LiveQuery` stops the query on the FFI side.
pub struct LiveQuery {
    /// Handle to the Ditto instance the query is registered, started and
    /// stopped against.
    ditto: Arc<BoxedDitto>, // consider making Weak<BoxedDitto>
    /// The query string handed to the FFI at registration.
    pub query: char_p::Box,
    /// Name of the collection handed to the FFI at registration.
    pub collection_name: char_p::Box,
    /// If created via `observe` contains the associated `Subscription` for the
    /// `LiveQuery`
    pub subscription: Option<Box<Subscription>>,
    /// An OWNED event handler, which is borrowed on the other side of the FFI.
    /// As such, `retain` and `release` methods for the handler can be None.
    pub event_handler: Pin<Box<dyn EventHandler>>,
    /// Identifier returned by the FFI registration call; it is the value
    /// passed to the FFI to start and stop this query.
    pub id: i64,
}

/// Placeholder value for `LiveQuery::id` before the FFI registration has
/// produced a real id; an id equal to this value is logged as invalid.
const UNASSIGNED_ID_SENTINEL: i64 = -1;

impl LiveQuery {
    /// Registers a live query with the FFI layer, starts it, and returns the
    /// `LiveQuery` owning `event_handler`.
    ///
    /// The handler is boxed and pinned so that its heap address stays stable;
    /// a raw pointer to it is handed to the FFI as the callback context, and
    /// the box itself is stored in the returned `LiveQuery`. No `retain` /
    /// `release` callbacks are supplied to the FFI (both are `None`).
    ///
    /// * `query_args` — optional byte buffer forwarded to the FFI as a slice.
    /// * `order_by`, `limit`, `offset` — forwarded to the FFI unchanged.
    /// * `subscription` — stored as-is in the returned `LiveQuery`.
    ///
    /// # Errors
    ///
    /// Returns an error converted from `ErrorKind::InvalidInput` when
    /// `ditto_live_query_register_str` does not yield an id. In that case no
    /// `LiveQuery` is constructed, so nothing is started or stopped on the FFI
    /// side.
    #[rustfmt::skip]
    #[allow(clippy::too_many_arguments)]
    pub
    fn with_handler<F> (
        ditto: Arc<BoxedDitto>,
        query: char_p::Box,
        query_args: Option<Vec<u8>>,
        collection_name: char_p::Box,
        order_by: &'_ [COrderByParam], // Note COrderByParam is not opaque
        limit: i32,
        offset: u32,
        subscription: Option<Box<Subscription>>,
        event_handler: F,
    ) -> Result<Self, DittoError>
    where
        F : EventHandler,
    {
        let event_handler: Pin<Box<F>> = Box::pin(event_handler);
        // Type-erased pointer to the pinned handler, used as the FFI callback
        // context.
        // NOTE(review): this pointer is derived from a shared reference but
        // is turned back into `&mut F` inside `c_cb` — confirm this matches
        // the aliasing guarantees the crate relies on.
        let ctx = &*event_handler as *const F as *mut c_void;

        /// Trampoline invoked by the FFI for every live query event: it
        /// recovers the handler from `ctx`, converts the raw parameters into
        /// a `LiveQueryEvent` and calls the handler.
        ///
        /// NOTE(review): the `expect` / `unwrap` calls below panic inside an
        /// `extern "C"` function when a pointer or field is missing — confirm
        /// the FFI never omits them for non-initial events.
        #[allow(improper_ctypes_definitions)] // false positive
        unsafe extern "C"
        fn c_cb<F : EventHandler> (ctx: *mut c_void, p: c_cb_params)
        {
            let ctx: *mut F = ctx.cast();
            let event_handler: &mut F = ctx.as_mut().expect("Got NULL");
            let event = if p.is_initial { // more explicit flag for "initial" status
                LiveQueryEvent::Initial
            } else {
                // `moves` is consumed two entries at a time; each pair is
                // turned into one `LiveQueryMove`.
                let moves: Vec<LiveQueryMove> =
                    p   .moves
                        .as_ref()
                        .unwrap()
                        .as_slice()
                        .chunks_exact(2)
                        .map(|it| if let [left, right] = *it {
                            LiveQueryMove::new(left, right)
                        } else { unreachable!() })
                        .collect()
                ;
                LiveQueryEvent::Update{
                    old_documents: p.old_documents.unwrap().into(),
                    insertions: p.insertions.unwrap().into(),
                    deletions: p.deletions.unwrap().into(),
                    updates: p.updates.unwrap().into(),
                    moves,
                }
            };
            event_handler(p.documents.into(), event);
        }

        let availability = LiveQueryAvailability::Always;

        // Register BEFORE building the `LiveQuery`: if registration fails we
        // return early without ever creating a value whose `Drop` would call
        // `ditto_live_query_stop` with the unassigned sentinel id.
        //
        // SAFETY (as far as visible here): `ctx` points into the pinned box
        // held by `event_handler`, which is alive for the whole call and is
        // afterwards moved (without relocating the heap allocation) into the
        // returned `LiveQuery`.
        let id: i64 = unsafe {
            ffi_sdk::ditto_live_query_register_str(
                &*ditto,
                collection_name.as_ref(),
                query.as_ref(),
                query_args.as_ref().map(|qa| (&qa[..]).into()),
                order_by.into(),
                limit,
                offset,
                availability,
                ctx,
                None,
                None,
                c_cb::<F>,
            ).ok_or(ErrorKind::InvalidInput)?
        };

        if id == UNASSIGNED_ID_SENTINEL {
            ::log::error!("Live query was not given a valid id");
        } else {
            ::log::debug!("Live query id: {}", id);
        }

        // From here on the handler is owned by `this`; dropping `this` stops
        // the query on the FFI side.
        let this = LiveQuery {
            ditto,
            query,
            collection_name,
            subscription,
            event_handler,
            id,
        };

        // start the query
        unsafe {
            ffi_sdk::ditto_live_query_start(&*this.ditto, this.id);
        }
        Ok(this)
    }
}

impl Drop for LiveQuery {
    /// Logs the drop and asks the FFI to stop the live query identified by
    /// `self.id`.
    fn drop(&mut self) {
        let id = self.id;
        ::log::debug!("Dropping LiveQuery {}", id);
        // Stopping the query tells the FFI side that the event handler owned
        // by this value must no longer be borrowed.
        unsafe {
            ffi_sdk::ditto_live_query_stop(&*self.ditto, id);
        }
    }
}

impl LiveQuery {
    pub fn stop(self: LiveQuery) {
        drop(self);
    }
}