use_prelude!();
use std::{
hash::{self, Hash},
sync::{Arc, Weak},
};
pub use ::ffi_sdk::{BoxedDitto, ChangeHandlerWithDocsDiff, LiveQueryAvailability};
pub use super::*;
use crate::{ditto::DittoFields, error::DittoError};
/// Handle to a change observer registered through the FFI layer.
///
/// The handle cancels itself when dropped (see the `Drop` impl), and two
/// handles compare equal when their live query id, query and query arguments
/// are equal.
pub struct StoreObserver {
/// Weak reference to the owning Ditto state, so that holding an observer
/// does not keep that state alive.
ditto: Weak<DittoFields>,
/// Identifier returned by the FFI registration call in `new`.
pub(crate) live_query_id: i64,
/// Query this observer was registered with; takes part in equality and hashing.
pub(crate) _query: Query,
/// Optional arguments the query was registered with; takes part in equality
/// and hashing.
_query_args: Option<QueryArguments>,
}
// `ChangeHandler` names the bound required of observer callbacks: a closure
// callable with a `QueryResult` (`FnMut`, so it may mutate captured state),
// that is `Send` and `'static`.
// NOTE(review): presumably `trait_alias!` expands to a trait plus a blanket
// impl for every type meeting these bounds — confirm against the macro.
trait_alias! {
pub trait ChangeHandler =
FnMut(QueryResult)
+ Send + 'static }
impl StoreObserver {
    /// Registers a change observer for `query` on `ditto` and starts it.
    ///
    /// `on_change` is moved into an `Arc` whose raw pointer is handed to the
    /// FFI layer as an opaque context, together with retain/release callbacks
    /// and a trampoline (`c_cb`) that invokes the closure with a `QueryResult`
    /// built from the FFI parameters.
    ///
    /// # Errors
    ///
    /// Returns the `DittoError` obtained from `into_rust_result()` when the
    /// FFI registration call reports a failure.
    pub(crate) fn new<F>(
        ditto: &Ditto,
        query: Query,
        query_args: Option<QueryArguments>,
        on_change: F,
    ) -> Result<Self, DittoError>
    where
        F: ChangeHandler,
    {
        let change_handler = Arc::new(on_change);
        // NOTE(review): `Arc::into_raw` gives away this function's strong
        // reference. If the registration call below also invokes `retain_ctx`,
        // or fails before taking ownership of `ctx`, that reference is never
        // released and the closure leaks — confirm the FFI ownership contract.
        let ctx = Arc::into_raw(change_handler) as *mut c_void;
        // Callback the FFI side uses to take an additional strong reference.
        let retain_ctx = {
            unsafe extern "C" fn retain<F>(ctx: *mut c_void) {
                Arc::<F>::increment_strong_count(ctx.cast());
            }
            retain::<F>
        };
        // Callback the FFI side uses to give up one strong reference.
        let release_ctx = {
            unsafe extern "C" fn release<F>(ctx: *mut c_void) {
                drop(Arc::<F>::from_raw(ctx.cast()))
            }
            release::<F>
        };
        // Trampoline: recovers the closure from `ctx` and calls it.
        // NOTE(review): the lint is presumably a false positive on the
        // by-value FFI parameter struct — confirm.
        #[allow(improper_ctypes_definitions)]
        unsafe extern "C" fn c_cb<F: ChangeHandler>(
            ctx: *mut c_void,
            c_params: ffi_sdk::ChangeHandlerWithQueryResult,
        ) {
            // NOTE(review): this forms a `&mut F` from a pointer that came out
            // of an `Arc<F>`; that assumes the callback is never invoked
            // concurrently or re-entrantly — confirm.
            let ctx: *mut F = ctx.cast();
            let change_handler: &mut F = ctx.as_mut().expect("Got NULL");
            change_handler(QueryResult::from(c_params.query_result));
        }
        let c_query = query.prepare_ffi();
        let c_query_cbor = query_args.as_ref().map(|qa| qa.cbor());
        // SAFETY: `ctx` points at the `Arc<F>` allocation created above, and
        // `retain_ctx`, `release_ctx` and `c_cb` are all instantiated for that
        // same `F`, so every cast back from `*mut c_void` targets the right type.
        let live_query_id = unsafe {
            ffi_sdk::dittoffi_try_experimental_register_change_observer_str(
                &ditto.ditto,
                c_query,
                c_query_cbor.map(|qa| qa.into()),
                LiveQueryAvailability::Always,
                ctx,
                Some(retain_ctx),
                Some(release_ctx),
                c_cb::<F>,
            )
        }
        .into_rust_result()?;
        let this = StoreObserver {
            _query: query,
            _query_args: query_args,
            ditto: Arc::downgrade(&ditto.fields),
            live_query_id,
        };
        // Start the live query under the id obtained above.
        // NOTE(review): if this call returns a status it is currently ignored.
        ffi_sdk::ditto_live_query_start(&ditto.ditto, this.live_query_id);
        Ok(this)
    }

    /// Unregisters this observer from the store of its Ditto instance.
    ///
    /// Does nothing when the Ditto instance can no longer be upgraded.
    pub fn cancel(&self) {
        if let Ok(ditto) = Ditto::upgrade(&self.ditto) {
            ditto.store.unregister_observer(self);
        }
    }

    /// Returns `true` when this observer is no longer active.
    ///
    /// That is the case when the Ditto instance can no longer be upgraded, or
    /// when the store's current set of observers does not contain this one.
    pub fn is_cancelled(&self) -> bool {
        // An observer that is still registered is, by definition, not cancelled.
        Ditto::upgrade(&self.ditto)
            .map_or(true, |ditto| !ditto.store.observers().contains(self))
    }
}
impl Drop for StoreObserver {
    /// Cancels the observation when the handle is dropped.
    fn drop(&mut self) {
        Self::cancel(self);
    }
}
impl StoreObserver {
    /// Borrows the state that defines this observer's identity for equality
    /// and hashing: the live query id, the query, and the query arguments.
    fn comparable_parts(&self) -> impl '_ + Eq + Hash {
        let Self {
            live_query_id,
            _query,
            _query_args,
            ..
        } = self;
        (*live_query_id, _query, _query_args)
    }
}
// Marker impl: `PartialEq` for this type compares the value returned by
// `comparable_parts()`, which is itself `Eq`.
impl Eq for StoreObserver {}
impl PartialEq for StoreObserver {
    /// Two observers are equal when their comparable parts are equal.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.comparable_parts();
        let rhs = other.comparable_parts();
        lhs == rhs
    }
}
impl Hash for StoreObserver {
    /// Feeds the same parts used for equality into the hasher, keeping `Hash`
    /// consistent with `Eq`.
    fn hash<H: hash::Hasher>(&self, state: &mut H) {
        let parts = self.comparable_parts();
        Hash::hash(&parts, state);
    }
}