use_prelude!();
use std::{
ops::Deref,
sync::{
atomic::{self, AtomicU64},
RwLock, Weak,
},
};
use ffi_sdk::FfiStoreObserver;
use crate::{debug, error};
pub mod attachment;
mod document_id;
mod observer;
pub mod transactions;
use self::attachment::{DittoAttachmentFetcher, FetcherVersion};
pub use self::{
document_id::DocumentId,
observer::{ChangeHandler, ChangeHandlerWithSignalNext, SignalNext, StoreObserver},
};
use crate::{
ditto::DittoFields,
dql::{query::IntoQuery, *},
error::{DittoError, ErrorKind},
utils::{extension_traits::FfiResultIntoRustResult, SetArc},
};
/// Key under which an attachment fetcher is registered in `Store::attachment_fetchers`.
type CancelToken = u64;
/// Handle used to register observers, execute queries, and create or fetch attachments.
///
/// Cloning is cheap: every field is reference-counted, so clones share the same
/// attachment-fetcher map.
#[derive(Clone)]
pub struct Store {
// Shared handle to the FFI Ditto instance; passed to the `ffi_sdk` calls made by this type.
ditto: Arc<ffi_sdk::BoxedDitto>,
// Weak reference to the owning Ditto fields; upgraded whenever an operation needs the live
// instance, and the operation fails if the upgrade does.
weak_ditto_fields: Weak<DittoFields>,
// Fetchers registered by `fetch_attachment`, keyed by cancel token. The `bool` is the
// `was_zero` flag returned by `cancel_token_ensure_unique`; `unregister_fetcher` uses it to
// pass `0` instead of the map key to the FFI cancel call.
#[allow(clippy::type_complexity)]
attachment_fetchers: Arc<
RwLock<HashMap<CancelToken, (bool, DittoAttachmentFetcher<'static, FetcherVersion::V2>)>>,
>,
}
impl Store {
pub(crate) fn new(
ditto: Arc<ffi_sdk::BoxedDitto>,
weak_ditto_fields: Weak<DittoFields>,
) -> Self {
Self {
ditto,
weak_ditto_fields,
attachment_fetchers: <_>::default(),
}
}
/// Creates a [`StoreObserver`] for `query` and returns it wrapped in an [`Arc`].
///
/// `on_change` is forwarded to `StoreObserver::new` together with the query string and the
/// query's `args_cbor` bytes, if any.
/// NOTE(review): presumably `on_change` runs whenever the query's results change — confirm
/// against the `ChangeHandler` documentation.
///
/// # Errors
///
/// Fails if the Ditto instance can no longer be upgraded from the store's weak reference, if
/// `query.into_query()` fails, or if `StoreObserver::new` fails.
pub fn register_observer<Q, F>(
    &self,
    query: Q,
    on_change: F,
) -> Result<Arc<StoreObserver>, DittoError>
where
    Q: IntoQuery,
    Q::Args: Serialize,
    F: ChangeHandler,
{
    // The instance is upgraded before the query is converted, so a released instance is
    // reported in preference to a query-conversion error.
    let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
    let statement = query.into_query()?;
    let args = statement.args_cbor.as_deref();
    let observer = StoreObserver::new(&ditto, &statement.string, args, on_change)?;
    Ok(Arc::new(observer))
}
/// Creates a [`StoreObserver`] for `query` through `StoreObserver::with_signal_next` and
/// returns it wrapped in an [`Arc`].
///
/// `on_change` is forwarded to the observer constructor together with the query string and the
/// query's `args_cbor` bytes, if any.
/// NOTE(review): presumably the handler must signal before the next change is delivered —
/// confirm against the `ChangeHandlerWithSignalNext` / `SignalNext` documentation.
///
/// # Errors
///
/// Fails if the Ditto instance can no longer be upgraded from the store's weak reference, if
/// `query.into_query()` fails, or if `StoreObserver::with_signal_next` fails.
pub fn register_observer_with_signal_next<Q, F>(
    &self,
    query: Q,
    on_change: F,
) -> Result<Arc<StoreObserver>, DittoError>
where
    Q: IntoQuery,
    Q::Args: Serialize,
    F: ChangeHandlerWithSignalNext,
{
    // The instance is upgraded before the query is converted, so a released instance is
    // reported in preference to a query-conversion error.
    let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
    let statement = query.into_query()?;
    let args = statement.args_cbor.as_deref();
    let observer = StoreObserver::with_signal_next(&ditto, &statement.string, args, on_change)?;
    Ok(Arc::new(observer))
}
/// Returns the store observers reported by `dittoffi_store_observers` for this instance.
///
/// Every call builds a fresh set: each FFI handle is wrapped in a new `Arc<StoreObserver>`
/// and the resulting set is returned behind a `Box`.
pub fn observers(&self) -> impl '_ + Deref<Target = SetArc<StoreObserver>> {
    // Convert the FFI vector into a std `Vec` so its handles can be consumed by value.
    let handles: Vec<repr_c::Box<FfiStoreObserver>> =
        ffi_sdk::dittoffi_store_observers(&self.ditto).into();
    let set: SetArc<StoreObserver> = handles
        .into_iter()
        .map(|handle| Arc::new(StoreObserver { handle }))
        .collect();
    Box::new(set)
}
/// Executes `query` through `dittoffi_try_exec_statement` and returns its [`QueryResult`].
///
/// The body contains no `.await`; the FFI call is made when the returned future is polled.
///
/// # Errors
///
/// Fails if `query.into_query()` fails or if the FFI call reports an error.
pub async fn execute<Q>(&self, query: Q) -> Result<QueryResult, DittoError>
where
    Q: IntoQuery,
    Q::Args: serde::Serialize,
{
    let statement = query.into_query()?;
    let ffi_query_result = ffi_sdk::dittoffi_try_exec_statement(
        &self.ditto,
        (&*statement.string).into(),
        statement.args_cbor.as_deref().map(Into::into),
    )
    .into_rust_result()?;
    Ok(QueryResult::from(ffi_query_result))
}
/// Creates a [`DittoAttachment`] from the file at `filepath`, tagged with `user_data`.
///
/// Delegates to `DittoAttachment::from_file_and_metadata` and returns its result unchanged.
/// The body contains no `.await`; the work happens when the returned future is polled.
pub async fn new_attachment(
    &self,
    filepath: &(impl ?Sized + AsRef<Path>),
    user_data: HashMap<String, String>,
) -> Result<DittoAttachment, DittoError> {
    let ffi_ditto = &self.ditto;
    DittoAttachment::from_file_and_metadata(filepath, user_data, ffi_ditto)
}
/// Creates a [`DittoAttachment`] from the in-memory `bytes`, tagged with `user_data`.
///
/// Delegates to `DittoAttachment::from_bytes_and_metadata` and returns its result unchanged.
/// The body contains no `.await`; the work happens when the returned future is polled.
pub async fn new_attachment_from_bytes(
    &self,
    bytes: &(impl ?Sized + AsRef<[u8]>),
    user_data: HashMap<String, String>,
) -> Result<DittoAttachment, DittoError> {
    let ffi_ditto = &self.ditto;
    DittoAttachment::from_bytes_and_metadata(bytes, user_data, ffi_ditto)
}
/// Starts fetching the attachment identified by `attachment_token` and registers the
/// resulting fetcher in this store's fetcher map.
///
/// `on_fetch_event` receives every event handed to the callback given to
/// `DittoAttachmentFetcher::new`. After it has been called with a `Completed` or `Deleted`
/// event, the fetcher is unregistered from the store, provided the Ditto instance is still
/// alive at that point.
///
/// # Errors
///
/// Fails if the token cannot be parsed, with `ErrorKind::ReleasedDittoInstance` if the Ditto
/// instance has been released, or if `DittoAttachmentFetcher::new` fails.
///
/// # Panics
///
/// Panics if the fetcher-map lock is poisoned.
pub fn fetch_attachment(
&self,
attachment_token: impl attachment::DittoAttachmentTokenLike,
on_fetch_event: impl 'static + Send + Sync + Fn(DittoAttachmentFetchEvent),
) -> Result<DittoAttachmentFetcher<'static, FetcherVersion::V2>, DittoError> {
let attachment_token = attachment_token.parse_attachment_token()?;
// The clone is moved into the callback below; the strong reference is only used for the
// duration of this call.
let weak_ditto = self.weak_ditto_fields.clone();
let ditto = weak_ditto
.upgrade()
.ok_or(ErrorKind::ReleasedDittoInstance)?;
// The write lock is taken BEFORE the fetcher is created and held until the insert below.
// NOTE(review): presumably so that a completion callback running on another thread blocks
// on its own `write()` until the fetcher has been registered — confirm. If the callback
// could ever run synchronously on this thread inside `DittoAttachmentFetcher::new`, its
// `write()` would contend with this guard.
let mut attachment_fetchers_lockguard = self.attachment_fetchers.write().unwrap();
let fetcher = DittoAttachmentFetcher::new(
attachment_token,
Some(&ditto),
&self.ditto,
move |event, cancel_token: &AtomicU64| {
// `Completed` and `Deleted` are the events after which the fetcher is unregistered.
let has_finished = matches! {
event,
| DittoAttachmentFetchEvent::Completed { .. }
| DittoAttachmentFetchEvent::Deleted
};
// The user's handler runs first, before any unregistration.
on_fetch_event(event);
if has_finished {
// If the Ditto instance is already gone, nothing is unregistered here.
if let Some(ditto) = weak_ditto.upgrade() {
let mut attachment_fetchers_inner_lockguard =
ditto.store.attachment_fetchers.write().unwrap();
// The token is read only after the lock has been acquired.
let cancel_token = cancel_token.load(atomic::Ordering::Relaxed);
// The already-locked map is passed in, so `unregister_fetcher` does not lock again;
// this guard stays held for the whole call, including its FFI cancel.
ditto.store.unregister_fetcher(
cancel_token,
Some(&mut *attachment_fetchers_inner_lockguard),
);
}
}
},
)?;
// `was_zero` is stored next to the fetcher so `unregister_fetcher` can pass `0` to the FFI
// instead of the unique map key.
let (cancel_token, was_zero) = fetcher.cancel_token_ensure_unique();
attachment_fetchers_lockguard.insert(cancel_token, (was_zero, fetcher.clone()));
Ok(fetcher)
}
/// Removes the fetcher registered under `fetcher_cancel_token` and cancels it through the FFI.
///
/// `fetchers` lets a caller that already holds the fetcher-map write lock pass the locked map
/// in; with `None`, the lock is taken here and released before logging and the FFI call.
///
/// Returns `false` if no fetcher is registered under the token or if the FFI cancel call
/// returns a non-zero status, and `true` otherwise.
///
/// # Panics
///
/// Panics if `fetchers` is `None` and the fetcher-map lock is poisoned.
fn unregister_fetcher(
    &self,
    fetcher_cancel_token: CancelToken,
    fetchers: Option<
        &mut HashMap<CancelToken, (bool, DittoAttachmentFetcher<'static, FetcherVersion::V2>)>,
    >,
) -> bool {
    // Holds our own write guard only when the caller did not supply a locked map.
    let mut own_guard = None;
    let registry = match fetchers {
        Some(already_locked) => already_locked,
        None => &mut **own_guard.insert(self.attachment_fetchers.write().unwrap()),
    };
    let removed = registry.remove(&fetcher_cancel_token);
    // Release our own lock (if any) before logging and calling into the FFI.
    drop(own_guard);
    let (was_zero, removed_fetcher) = match removed {
        Some(entry) => entry,
        None => return false,
    };

    // A fetcher flagged `was_zero` at registration is cancelled with token `0`, not with the
    // key it was stored under.
    let fetcher_cancel_token = if was_zero { 0 } else { fetcher_cancel_token };
    let token = &removed_fetcher.context.token;
    #[allow(deprecated)]
    {
        debug!(
            token_id = %token.id(),
            %fetcher_cancel_token,
            "unregistering ditto attachment fetcher"
        );
    }

    let cancelled = ffi_sdk::ditto_cancel_resolve_attachment(
        &self.ditto,
        token.id.as_ref().into(),
        fetcher_cancel_token,
    ) == 0;
    if !cancelled {
        #[allow(deprecated)]
        {
            error!(
                token_id = %token.id(),
                %fetcher_cancel_token,
                "failed to clean up attachment fetcher"
            );
        }
    }
    cancelled
}
/// Returns clones of all fetchers currently registered in this store.
///
/// The result is a snapshot taken under the read lock; its order follows the map's iteration
/// order.
///
/// # Panics
///
/// Panics if the fetcher-map lock is poisoned.
pub fn attachment_fetchers(&self) -> Vec<DittoAttachmentFetcher<'static, FetcherVersion::V2>> {
    let registry = self.attachment_fetchers.read().unwrap();
    registry
        .values()
        .map(|(_, fetcher)| fetcher.clone())
        .collect()
}
}
/// Direction of a sort.
///
/// Marked `#[non_exhaustive]`, so code in other crates must include a wildcard arm when
/// matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SortDirection {
    /// Ascending order.
    Ascending,
    /// Descending order.
    Descending,
}