use std::ops::Not;
use serde::Serialize;
use crate::{
ditto::{TryUpgrade, WeakDittoHandleWrapper},
error::{DittoError, ErrorKind},
ffi_sdk,
store::{
collection::document_id::DocumentId,
live_query::{SingleDocumentEventHandler, SingleDocumentLiveQueryEvent},
update::UpdateResult,
},
subscription::Subscription,
utils::prelude::*,
};
const LMDB_ERROR_NOT_FOUND_CODE: i32 = -30798;
pub struct PendingIdSpecificOperation {
pub(super) ditto: WeakDittoHandleWrapper,
pub(super) collection_name: char_p::Box,
pub(super) doc_id: DocumentId,
}
impl PendingIdSpecificOperation {
fn query(&self) -> String {
format!(
"_id == {}",
self.doc_id
.to_query_compatible(ffi_sdk::StringPrimitiveFormat::WithQuotes)
)
}
pub fn subscribe(&self) -> Subscription {
Subscription::new(
self.ditto.try_upgrade().unwrap(),
self.collection_name.clone(),
self.query().as_str(),
None,
&(vec![])[..],
-1,
0,
)
}
pub fn remove(&self) -> Result<(), DittoError> {
let ditto = self
.ditto
.upgrade()
.ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
let hint: Option<char_p::Ref<'_>> = None;
let mut txn = unsafe { ffi_sdk::ditto_write_transaction(&ditto, hint).ok()? };
let removed = unsafe {
ffi_sdk::ditto_collection_remove(
&ditto,
self.collection_name.as_ref(),
&mut txn,
self.doc_id.bytes.as_slice().into(),
)
.ok()?
};
if removed {
let status = unsafe { ffi_sdk::ditto_write_transaction_commit(&ditto, txn) };
if status != 0 {
return Err(DittoError::from_ffi(ErrorKind::Internal));
}
Ok(())
} else {
unsafe { ffi_sdk::ditto_write_transaction_rollback(&ditto, txn) };
Err(DittoError::from_ffi(ErrorKind::NonExtant))
}
}
pub fn evict(&self) -> Result<(), DittoError> {
let ditto = self
.ditto
.upgrade()
.ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
let hint: Option<char_p::Ref<'_>> = None;
let mut txn = unsafe { ffi_sdk::ditto_write_transaction(&ditto, hint).ok()? };
let evicted = unsafe {
ffi_sdk::ditto_collection_evict(
&ditto,
self.collection_name.as_ref(),
&mut txn,
self.doc_id.bytes.as_slice().into(),
)
.ok()?
};
if evicted {
let status = unsafe { ffi_sdk::ditto_write_transaction_commit(&ditto, txn) };
if status != 0 {
return Err(DittoError::from_ffi(ErrorKind::Internal));
}
Ok(())
} else {
unsafe { ffi_sdk::ditto_write_transaction_rollback(&ditto, txn) };
Err(DittoError::from_ffi(ErrorKind::NonExtant))
}
}
pub fn exec(&self) -> Result<BoxedDocument, DittoError> {
let ditto = self
.ditto
.upgrade()
.ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
let mut txn = unsafe { ffi_sdk::ditto_read_transaction(&ditto).ok()? };
let result = unsafe {
ffi_sdk::ditto_collection_get(
&ditto,
self.collection_name.as_ref(),
self.doc_id.bytes.as_slice().into(),
&mut txn,
)
};
if result.status_code == LMDB_ERROR_NOT_FOUND_CODE {
Err(DittoError::from_ffi(ErrorKind::NonExtant))
} else {
result.ok()
}
}
pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
where
Handler: SingleDocumentEventHandler,
{
self.observe_internal(None, handler)
}
fn build_live_query<Handler>(
&self,
sub: Option<Box<Subscription>>,
handler: Handler,
) -> Result<LiveQuery, DittoError>
where
Handler: EventHandler,
{
LiveQuery::with_handler(
self.ditto.clone(),
char_p::new(self.query().as_str()),
None,
self.collection_name.clone(),
&[],
-1,
0,
sub,
handler,
)
}
fn observe_internal<Handler>(
&self,
sub: Option<Box<Subscription>>,
handler: Handler,
) -> Result<LiveQuery, DittoError>
where
Handler: SingleDocumentEventHandler,
{
let mut handler = handler;
let mapped_handler = move |mut docs: Vec<BoxedDocument>, event: LiveQueryEvent| match event
{
LiveQueryEvent::Initial => {
if docs.len() > 1 {
::log::error!("Single document live query returned more than 1 document");
debug_assert!(docs.len() <= 1);
}
let event = SingleDocumentLiveQueryEvent {
is_initial: true,
old_document: None,
};
handler(docs.pop(), event);
}
LiveQueryEvent::Update {
mut old_documents,
insertions,
deletions,
updates,
moves,
} => {
if moves.is_empty().not() {
::log::error!(
"Single document live query received an update event saying that there \
was a move, which should not happen"
);
debug_assert!(moves.is_empty());
}
if insertions.len() > 1 || deletions.len() > 1 || updates.len() > 1 {
::log::error!(
"Single document live query received an update event with too many \
insertions, deletions, or updates"
);
debug_assert!(insertions.len() <= 1);
debug_assert!(deletions.len() <= 1);
debug_assert!(updates.len() <= 1);
}
let event = SingleDocumentLiveQueryEvent {
is_initial: false,
old_document: old_documents.pop(),
};
handler(docs.pop(), event);
}
};
self.build_live_query(sub, mapped_handler)
}
pub fn update<Updater>(&self, updater: Updater) -> Result<Vec<UpdateResult>, DittoError>
where
Updater: Fn(Option<&mut BoxedDocument>),
{
let ditto = self
.ditto
.upgrade()
.ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
let mut document = {
let mut read_txn = unsafe { ffi_sdk::ditto_read_transaction(&ditto).ok()? };
let res = unsafe {
ffi_sdk::ditto_collection_get(
&ditto,
self.collection_name.as_ref(),
self.doc_id.bytes.as_slice().into(),
&mut read_txn,
)
};
if res.status_code != 0 {
return Err(DittoError::from_ffi(ErrorKind::NonExtant));
}
res.ok_value };
updater(document.as_mut());
let diff = Vec::with_capacity(0);
let hint: Option<char_p::Ref<'_>> = None;
let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&ditto, hint).ok()? };
if let Some(doc) = document {
match unsafe {
ffi_sdk::ditto_collection_update(
&ditto,
self.collection_name.as_ref(),
&mut write_txn,
doc,
)
} {
0 => {
let status =
unsafe { ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn) };
if status != 0 {
return Err(DittoError::from_ffi(ErrorKind::Internal));
}
Ok(diff)
}
i32::MIN..=-1 | 1..=i32::MAX => {
unsafe { ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn) };
Err(DittoError::from_ffi(ErrorKind::InvalidInput))
}
}
} else {
Err(DittoError::from_ffi(ErrorKind::NonExtant))
}
}
pub fn update_doc<T>(&self, new_value: &T) -> Result<(), DittoError>
where
T: Serialize,
{
let ditto = self
.ditto
.upgrade()
.ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
let mut document = {
let mut read_txn = unsafe { ffi_sdk::ditto_read_transaction(&ditto).ok()? };
unsafe {
ffi_sdk::ditto_collection_get(
&ditto,
self.collection_name.as_ref(),
self.doc_id.bytes.as_slice().into(),
&mut read_txn,
)
.ok_or(ErrorKind::NonExtant)?
}
}; let hint: Option<char_p::Ref<'_>> = None;
let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&ditto, hint).ok()? };
let new_content = ::serde_cbor::to_vec(new_value).unwrap();
if unsafe {
ffi_sdk::ditto_document_update(
&mut document,
new_content.as_slice().into(),
false, )
} != 0
{
return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
}
let code = unsafe {
ffi_sdk::ditto_collection_update(
&ditto,
self.collection_name.as_ref(),
&mut write_txn,
document,
)
};
if code != 0 {
unsafe { ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn) };
Err(DittoError::from_ffi(ErrorKind::InvalidInput))
} else {
let status = unsafe { ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn) };
if status != 0 {
return Err(DittoError::from_ffi(ErrorKind::Internal));
}
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use crate::{prelude::*, test_helpers::setup_ditto, utils::prelude::ErrorKind};
#[test]
fn test_doc_not_found() {
let ditto = setup_ditto().unwrap();
let store = ditto.store();
let collection = store.collection("test").unwrap();
let doc_id = DocumentId::new(&"test_key").unwrap();
let doc = collection.find_by_id(doc_id).exec();
assert!(doc.is_err());
let e = doc.err().unwrap();
let error_kind = e.kind();
assert_eq!(error_kind, ErrorKind::NonExtant);
}
#[test]
fn test_doc_found() {
let ditto = setup_ditto().unwrap();
let store = ditto.store();
let collection = store.collection("test").unwrap();
let doc_id = DocumentId::new(&"test_key").unwrap();
let doc = collection.find_by_id(doc_id.clone()).exec();
assert!(doc.is_err());
let e = doc.err().unwrap();
let error_kind = e.kind();
assert_eq!(error_kind, ErrorKind::NonExtant);
let content = json!({"hello": "again", "_id": doc_id});
let inserted = collection.upsert(content);
assert!(inserted.is_ok());
let doc = collection.find_by_id(doc_id).exec();
assert!(doc.is_ok());
}
}