use_prelude!();
use std::collections::BTreeMap;
use ffi_sdk::COrderByParam;
use crate::{
ditto::{TryUpgrade, WeakDittoHandleWrapper},
error::{DittoError, ErrorKind},
store::{collection::document_id::DocumentId, live_query::LiveQuery, update::UpdateResult},
subscription::Subscription,
};
/// A query against a single collection whose execution is deferred until one
/// of the terminal methods of this type (`exec`, `update`, `remove`, `evict`,
/// `subscribe` or `observe_local`) is called.
///
/// The `'order_by` lifetime is the lifetime parameter of the
/// [`COrderByParam`] values stored in `order_by`.
pub struct PendingCursorOperation<'order_by> {
/// Weak handle to the Ditto instance; it is upgraded whenever an operation
/// actually runs.
pub(super) ditto: WeakDittoHandleWrapper,
/// Name of the collection the query is run against.
pub(super) collection_name: char_p::Box,
/// The query string, stored as an owned C string for the FFI calls.
pub(super) query: char_p::Box,
/// Optional raw bytes forwarded unchanged to the FFI alongside the query.
/// NOTE(review): presumably the serialized query arguments; the encoding is
/// not visible from this file.
pub(super) query_args: Option<Vec<u8>>,
/// Offset forwarded to the FFI query calls; `0` on construction.
pub(super) offset: u32,
/// Limit forwarded to the FFI query calls; `-1` on construction.
/// NOTE(review): `-1` presumably means "no limit" -- confirm against the FFI
/// contract.
pub(super) limit: i32,
/// Ordering parameters forwarded to the FFI query calls; empty on
/// construction and replaced wholesale by `sort`.
pub(super) order_by: Vec<COrderByParam<'order_by>>,
}
impl<'order_by> PendingCursorOperation<'order_by> {
    /// Builds a pending operation for `query` over the collection named
    /// `collection_name`.
    ///
    /// The operation starts with an offset of `0`, a limit of `-1` and no
    /// ordering; use [`offset`](Self::offset), [`limit`](Self::limit) and
    /// [`sort`](Self::sort) to change them before running it.
    pub fn new(
        ditto: WeakDittoHandleWrapper,
        collection_name: char_p::Box,
        query: &str,
        query_args: Option<Vec<u8>>,
    ) -> Self {
        Self {
            ditto,
            collection_name,
            // The FFI layer consumes the query as a C string.
            query: char_p::new(query),
            query_args,
            offset: 0,
            limit: -1,
            order_by: vec![],
        }
    }

    /// Runs the query without a write transaction and returns the matching
    /// documents.
    ///
    /// # Errors
    ///
    /// Fails if the Ditto handle can no longer be upgraded, or with
    /// [`ErrorKind::InvalidInput`] if the underlying FFI query fails.
    pub fn exec(&self) -> Result<Vec<BoxedDocument>, DittoError> {
        self.exec_internal(None)
    }

    /// Runs the query, optionally inside the write transaction `txn`, and
    /// returns the matching documents.
    fn exec_internal(
        &self,
        txn: Option<&'_ mut ffi_sdk::CWriteTransaction>,
    ) -> Result<Vec<BoxedDocument>, DittoError> {
        let ditto = self.ditto.try_upgrade()?;
        ffi_sdk::ditto_collection_exec_query_str(
            &*ditto,
            self.collection_name.as_ref(),
            txn,
            self.query.as_ref(),
            self.query_args.as_ref().map(|qa| (&qa[..]).into()),
            (&self.order_by[..]).into(),
            self.limit,
            self.offset,
        )
        .ok_or(ErrorKind::InvalidInput)
        .map(|it| it.into())
    }

    /// Creates a [`Subscription`] for this query, using the currently
    /// configured ordering, limit and offset.
    ///
    /// # Panics
    ///
    /// Panics if the Ditto handle can no longer be upgraded (presumably
    /// because the owning Ditto instance has been released).
    pub fn subscribe(&self) -> Subscription {
        let ditto = self
            .ditto
            .try_upgrade()
            .expect("cannot subscribe: the Ditto instance behind this query is no longer available");
        Subscription::new(
            ditto,
            self.collection_name.clone(),
            self.query.to_str(),
            self.query_args.clone(),
            &self.order_by,
            self.limit,
            self.offset,
        )
    }

    /// Runs the query inside a write transaction, hands the matching
    /// documents to `updater` for in-place mutation, and writes them back.
    ///
    /// The transaction is committed when the write-back succeeds and rolled
    /// back when either the query or the write-back fails.
    ///
    /// NOTE(review): the returned map is currently always empty -- nothing in
    /// this method records per-document [`UpdateResult`]s.
    ///
    /// # Errors
    ///
    /// Fails with [`ErrorKind::ReleasedDittoInstance`] if the Ditto handle
    /// can no longer be upgraded, if the write transaction cannot be opened,
    /// if the query fails, or with [`ErrorKind::InvalidInput`] if the
    /// write-back reports a non-zero status.
    pub fn update<Updater>(
        &self,
        updater: Updater,
    ) -> Result<BTreeMap<DocumentId, Vec<UpdateResult>>, DittoError>
    where
        Updater: Fn(&mut [BoxedDocument]),
    {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;

        // A failed query must not leave the freshly opened transaction
        // dangling: roll it back, exactly as for a failed write-back below.
        let mut docs = match self.exec_internal(Some(&mut write_txn)) {
            Ok(docs) => docs,
            Err(err) => {
                ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn);
                return Err(err);
            }
        };
        updater(&mut docs);

        let code = ffi_sdk::ditto_collection_update_multiple(
            &ditto,
            self.collection_name.as_ref(),
            &mut write_txn,
            docs.into(),
        );
        if code != 0 {
            ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn);
            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
        }
        // NOTE(review): the outcome of the commit is not inspected here.
        ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        Ok(BTreeMap::new())
    }

    /// Sets the limit forwarded to the query and returns `self` for chaining.
    pub fn limit(&mut self, limit: i32) -> &mut Self {
        self.limit = limit;
        self
    }

    /// Sets the offset forwarded to the query and returns `self` for chaining.
    pub fn offset(&mut self, offset: u32) -> &mut Self {
        self.offset = offset;
        self
    }

    /// Removes the documents matching the query inside a write transaction
    /// and returns the ids reported by the FFI call.
    ///
    /// The transaction is committed on success and rolled back if the
    /// removal fails.
    ///
    /// # Errors
    ///
    /// Fails with [`ErrorKind::ReleasedDittoInstance`] if the Ditto handle
    /// can no longer be upgraded, if the write transaction cannot be opened,
    /// or with [`ErrorKind::InvalidInput`] if the removal fails.
    pub fn remove(&self) -> Result<Vec<DocumentId>, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;

        let removal = ffi_sdk::ditto_collection_remove_query_str(
            &*ditto,
            self.collection_name.as_ref(),
            &mut write_txn,
            self.query.as_ref(),
            self.query_args.as_ref().map(|qa| (&qa[..]).into()),
            (&self.order_by[..]).into(),
            self.limit,
            self.offset,
        )
        .ok_or(ErrorKind::InvalidInput);
        let ids = match removal {
            Ok(ids) => ids,
            Err(err) => {
                // Mirror `update`: never leave the transaction open on failure.
                ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn);
                return Err(err.into());
            }
        };

        // NOTE(review): the outcome of the commit is not inspected here.
        ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        Ok(ids
            .to::<Vec<_>>()
            .into_iter()
            .map(|s| s.to::<Box<[u8]>>().into())
            .collect())
    }

    /// Evicts the documents matching the query inside a write transaction
    /// and returns the ids reported by the FFI call.
    ///
    /// The transaction is committed on success and rolled back if the
    /// eviction fails.
    ///
    /// # Errors
    ///
    /// Fails with [`ErrorKind::ReleasedDittoInstance`] if the Ditto handle
    /// can no longer be upgraded, if the write transaction cannot be opened,
    /// or with [`ErrorKind::InvalidInput`] if the eviction fails.
    pub fn evict(&self) -> Result<Vec<DocumentId>, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;

        let eviction = ffi_sdk::ditto_collection_evict_query_str(
            &*ditto,
            self.collection_name.as_ref(),
            &mut write_txn,
            self.query.as_ref(),
            self.query_args.as_ref().map(|qa| (&qa[..]).into()),
            (&self.order_by[..]).into(),
            self.limit,
            self.offset,
        )
        .ok_or(ErrorKind::InvalidInput);
        let ids = match eviction {
            Ok(ids) => ids,
            Err(err) => {
                // Mirror `update`: never leave the transaction open on failure.
                ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn);
                return Err(err.into());
            }
        };

        // NOTE(review): the outcome of the commit is not inspected here.
        ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        Ok(ids
            .to::<Vec<_>>()
            .into_iter()
            .map(|s| s.to::<Box<[u8]>>().into())
            .collect())
    }

    /// Creates a [`LiveQuery`] for this query that reports to `handler`,
    /// using the currently configured ordering, limit and offset.
    ///
    /// # Errors
    ///
    /// Propagates whatever error [`LiveQuery::with_handler`] returns.
    pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
    where
        Handler: EventHandler,
    {
        LiveQuery::with_handler(
            self.ditto.clone(),
            self.query.clone(),
            self.query_args.clone(),
            self.collection_name.clone(),
            &self.order_by,
            self.limit,
            self.offset,
            handler,
        )
    }

    /// Replaces the ordering parameters with `sort_param` and returns `self`
    /// for chaining.
    pub fn sort(&mut self, sort_param: Vec<COrderByParam<'order_by>>) -> &mut Self {
        self.order_by = sort_param;
        self
    }
}