use_prelude!();
use ::ffi_sdk::{self, c_cb_params, COrderByParam, LiveQueryAvailability};
pub use event::LiveQueryEvent;
pub use single_document_event::SingleDocumentLiveQueryEvent;
use crate::{
error::{DittoError, ErrorKind},
subscription::Subscription,
};
mod event;
mod single_document_event;
pub use move_module::LiveQueryMove;
#[path = "move.rs"]
mod move_module;
trait_alias! {
pub
trait SingleDocumentEventHandler =
FnMut(Option<BoxedDocument>, SingleDocumentLiveQueryEvent)
+ Send + 'static }
trait_alias! {
pub
trait EventHandler =
FnMut(Vec<BoxedDocument>, LiveQueryEvent)
+ Send + 'static }
pub struct LiveQuery {
ditto: Arc<BoxedDitto>, pub query: char_p::Box,
pub collection_name: char_p::Box,
pub subscription: Option<Box<Subscription>>,
pub id: i64,
}
const UNASSIGNED_ID_SENTINEL: i64 = -1;
impl LiveQuery {
#[rustfmt::skip]
#[allow(clippy::too_many_arguments)]
pub
fn with_handler<F> (
ditto: Arc<BoxedDitto>,
query: char_p::Box,
query_args: Option<Vec<u8>>,
collection_name: char_p::Box,
order_by: &'_ [COrderByParam], limit: i32,
offset: u32,
subscription: Option<Box<Subscription>>,
event_handler: F,
) -> Result<Self, DittoError>
where
F : EventHandler,
{
let event_handler: Arc<F> = Arc::new(event_handler);
let ctx = Arc::into_raw(event_handler) as *mut c_void;
let retain_ctx = {
unsafe extern "C"
fn retain<F>(ctx: *mut c_void) {
Arc::<F>::increment_strong_count(ctx.cast());
}
retain::<F>
};
let release_ctx = {
unsafe extern "C"
fn release<F>(ctx: *mut c_void) {
drop(Arc::<F>::from_raw(ctx.cast()));
}
release::<F>
};
#[allow(improper_ctypes_definitions)] unsafe extern "C"
fn c_cb<F : EventHandler> (ctx: *mut c_void, p: c_cb_params)
{
let ctx: *mut F = ctx.cast();
let event_handler: &mut F = ctx.as_mut().expect("Got NULL");
let event = if p.is_initial { LiveQueryEvent::Initial
} else {
let moves: Vec<LiveQueryMove> =
p .moves
.as_ref()
.unwrap()
.as_slice()
.chunks_exact(2)
.map(|it| if let [left, right] = *it {
LiveQueryMove::new(left, right)
} else { unreachable!() })
.collect()
;
LiveQueryEvent::Update {
old_documents: p.old_documents.unwrap().into(),
insertions: p.insertions.unwrap().into(),
deletions: p.deletions.unwrap().into(),
updates: p.updates.unwrap().into(),
moves,
}
};
event_handler(p.documents.into(), event);
}
let mut this = LiveQuery {
ditto,
query,
collection_name,
subscription,
id: UNASSIGNED_ID_SENTINEL,
};
let id = &mut this.id;
let availability = LiveQueryAvailability::Always;
*id = unsafe {
ffi_sdk::ditto_live_query_register_str(
&*this.ditto,
this.collection_name.as_ref(),
this.query.as_ref(),
query_args.as_ref().map(|qa| (&qa[..]).into()),
order_by.into(),
limit,
offset,
availability,
ctx,
Some(retain_ctx),
Some(release_ctx),
c_cb::<F>,
).ok_or(ErrorKind::InvalidInput)?
};
if *id == UNASSIGNED_ID_SENTINEL {
::log::error!("Live query was not given a valid id");
} else {
::log::debug!("Live query id: {}", id);
}
unsafe {
ffi_sdk::ditto_live_query_start(&*this.ditto, *id);
}
Ok(this)
}
}
impl Drop for LiveQuery {
fn drop(self: &'_ mut LiveQuery) {
::log::debug!("Dropping LiveQuery {}", &self.id);
unsafe { ffi_sdk::ditto_live_query_stop(&*self.ditto, self.id) };
}
}
impl crate::observer::Observer for LiveQuery {}