use std::{ops::Not, sync::Weak};
use serde::Serialize;
use crate::{
ditto::TryUpgrade,
error::{DittoError, ErrorKind},
ffi_sdk,
store::{
collection::document_id::DocumentId,
live_query::{SingleDocumentEventHandler, SingleDocumentLiveQueryEvent},
update::UpdateResult,
},
subscription::Subscription,
utils::prelude::*,
};
/// Status code compared against the result of `ffi_sdk::ditto_collection_get` in
/// `PendingIdSpecificOperation::exec`; a match is reported as `ErrorKind::NonExtant`.
// NOTE(review): the value coincides with LMDB's `MDB_NOTFOUND` (-30798) — confirm the FFI
// layer still forwards raw LMDB status codes.
const LMDB_ERROR_NOT_FOUND_CODE: i32 = -30798;
/// A pending operation that targets a single document, identified by its id, inside one
/// collection.
///
/// The operation is carried out by one of the methods on this type (`exec`, `observe_local`,
/// `subscribe`, `remove`, `evict`, `update`, `update_doc`).
pub struct PendingIdSpecificOperation {
/// Weak handle to the Ditto instance; the methods upgrade it before touching the store.
pub(super) ditto: Weak<BoxedDitto>,
/// Name of the collection the targeted document belongs to.
pub(super) collection_name: char_p::Box,
/// Id of the targeted document.
pub(super) doc_id: DocumentId,
}
impl PendingIdSpecificOperation {
/// Builds the query string (`_id == <id>`) that selects this operation's document.
///
/// The id is rendered through `DocumentId::to_query_compatible` using the `WithQuotes`
/// string format.
fn query(&self) -> String {
    let id_literal = self
        .doc_id
        .to_query_compatible(ffi_sdk::StringPrimitiveFormat::WithQuotes);
    format!("_id == {}", id_literal)
}
/// Looks the document up by id inside a read transaction and returns it.
///
/// # Errors
///
/// * `ErrorKind::ReleasedDittoInstance` if the Ditto instance can no longer be upgraded.
/// * Whatever error opening the read transaction reports.
/// * `ErrorKind::NonExtant` if the lookup reports `LMDB_ERROR_NOT_FOUND_CODE`.
/// * Otherwise, whatever the FFI result converts to through its `ok()` method.
pub fn exec(&self) -> Result<BoxedDocument, DittoError> {
    let ditto = self
        .ditto
        .upgrade()
        .ok_or(ErrorKind::ReleasedDittoInstance)?;
    let mut read_txn = ffi_sdk::ditto_read_transaction(&ditto).ok()?;

    let lookup = ffi_sdk::ditto_collection_get(
        &ditto,
        self.collection_name.as_ref(),
        self.doc_id.bytes.as_slice().into(),
        &mut read_txn,
    );

    // Any status other than "not found" is delegated to the FFI result's own conversion.
    if lookup.status_code != LMDB_ERROR_NOT_FOUND_CODE {
        return lookup.ok();
    }
    Err(DittoError::from_ffi(ErrorKind::NonExtant))
}
/// Registers a local live query on this single document.
///
/// The multi-document `LiveQueryEvent`s are adapted into `SingleDocumentLiveQueryEvent`s:
/// `handler` receives the last document of the result set (if any) together with an event
/// telling whether this is the initial delivery and, for updates, the last entry of
/// `old_documents` (if any).
///
/// Unexpected shapes (more than one document, any move, more than one insertion / deletion /
/// update) are logged with `error!` and trip a `debug_assert!`.
///
/// # Errors
///
/// Returns whatever `LiveQuery::with_handler` returns.
pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
where
    Handler: SingleDocumentEventHandler,
{
    let mut on_event = handler;
    let adapter = move |mut docs: Vec<BoxedDocument>, event: LiveQueryEvent| {
        match event {
            LiveQueryEvent::Initial => {
                let doc_count = docs.len();
                if doc_count > 1 {
                    error!("single document live query returned more than 1 document");
                    debug_assert!(doc_count <= 1);
                }
                let single_event = SingleDocumentLiveQueryEvent {
                    is_initial: true,
                    old_document: None,
                };
                on_event(docs.pop(), single_event);
            }
            LiveQueryEvent::Update {
                mut old_documents,
                insertions,
                deletions,
                updates,
                moves,
            } => {
                let has_moves = moves.is_empty().not();
                if has_moves {
                    error!(
                        "single document live query received an update event saying that there \
                         was a move, which should not happen"
                    );
                    debug_assert!(moves.is_empty());
                }

                let change_counts = [insertions.len(), deletions.len(), updates.len()];
                if change_counts.iter().any(|&count| count > 1) {
                    error!(
                        "single document live query received an update event with too many \
                         insertions, deletions, or updates"
                    );
                    debug_assert!(insertions.len() <= 1);
                    debug_assert!(deletions.len() <= 1);
                    debug_assert!(updates.len() <= 1);
                }

                let single_event = SingleDocumentLiveQueryEvent {
                    is_initial: false,
                    old_document: old_documents.pop(),
                };
                on_event(docs.pop(), single_event);
            }
        }
    };

    let ditto = self.ditto.clone();
    let query = char_p::new(self.query().as_str());
    let collection_name = self.collection_name.clone();
    // NOTE(review): the `None`, `&[]`, `-1`, `0` arguments presumably mean "no query args, no
    // ordering, no limit, zero offset" — confirm against `LiveQuery::with_handler`.
    #[allow(deprecated)]
    LiveQuery::with_handler(ditto, query, None, collection_name, &[], -1, 0, adapter)
}
/// Creates a `Subscription` for this single document, using the same `_id == ...` query as
/// the other methods on this type.
///
/// # Panics
///
/// Panics if upgrading the handle to the Ditto instance fails (`try_upgrade().unwrap()`).
pub fn subscribe(&self) -> Subscription {
    let ditto = self.ditto.try_upgrade().unwrap();
    let query = self.query();
    // NOTE(review): `None`, the empty slice, `-1` and `0` presumably mean "no query args, no
    // ordering, no limit, zero offset" — confirm against `Subscription::new`.
    Subscription::new(
        ditto,
        self.collection_name.clone(),
        query.as_str(),
        None,
        &(vec![])[..],
        -1,
        0,
    )
}
/// Removes the document with this id from the collection inside a write transaction.
///
/// The transaction is committed only when the FFI layer reports that a document was removed;
/// on every other path it is rolled back explicitly, matching what `update` and `update_doc`
/// do on their failure paths.
///
/// # Errors
///
/// * `ErrorKind::ReleasedDittoInstance` if the Ditto instance can no longer be upgraded.
/// * Whatever error opening the write transaction or the removal itself reports.
/// * `ErrorKind::NonExtant` if nothing was removed.
/// * `ErrorKind::Internal` if committing the transaction returns a non-zero status.
pub fn remove(&self) -> Result<(), DittoError> {
    let ditto = self
        .ditto
        .upgrade()
        .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
    let hint: Option<char_p::Ref<'_>> = None;
    let mut txn = ffi_sdk::ditto_write_transaction(&ditto, hint).ok()?;

    let outcome = ffi_sdk::ditto_collection_remove(
        &ditto,
        self.collection_name.as_ref(),
        &mut txn,
        self.doc_id.bytes.as_slice().into(),
    )
    .ok();
    let removed = match outcome {
        Ok(removed) => removed,
        Err(err) => {
            // Do not leave the write transaction open when the removal itself failed.
            ffi_sdk::ditto_write_transaction_rollback(&ditto, txn);
            return Err(err.into());
        }
    };

    if !removed {
        ffi_sdk::ditto_write_transaction_rollback(&ditto, txn);
        return Err(DittoError::from_ffi(ErrorKind::NonExtant));
    }

    let status = ffi_sdk::ditto_write_transaction_commit(&ditto, txn);
    if status != 0 {
        return Err(DittoError::from_ffi(ErrorKind::Internal));
    }
    Ok(())
}
/// Evicts the document with this id from the collection inside a write transaction.
///
/// The transaction is committed only when the FFI layer reports that a document was evicted;
/// on every other path it is rolled back explicitly, matching what `update` and `update_doc`
/// do on their failure paths.
///
/// # Errors
///
/// * `ErrorKind::ReleasedDittoInstance` if the Ditto instance can no longer be upgraded.
/// * Whatever error opening the write transaction or the eviction itself reports.
/// * `ErrorKind::NonExtant` if nothing was evicted.
/// * `ErrorKind::Internal` if committing the transaction returns a non-zero status.
pub fn evict(&self) -> Result<(), DittoError> {
    let ditto = self
        .ditto
        .upgrade()
        .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
    let hint: Option<char_p::Ref<'_>> = None;
    let mut txn = ffi_sdk::ditto_write_transaction(&ditto, hint).ok()?;

    let outcome = ffi_sdk::ditto_collection_evict(
        &ditto,
        self.collection_name.as_ref(),
        &mut txn,
        self.doc_id.bytes.as_slice().into(),
    )
    .ok();
    let evicted = match outcome {
        Ok(evicted) => evicted,
        Err(err) => {
            // Do not leave the write transaction open when the eviction itself failed.
            ffi_sdk::ditto_write_transaction_rollback(&ditto, txn);
            return Err(err.into());
        }
    };

    if !evicted {
        ffi_sdk::ditto_write_transaction_rollback(&ditto, txn);
        return Err(DittoError::from_ffi(ErrorKind::NonExtant));
    }

    let status = ffi_sdk::ditto_write_transaction_commit(&ditto, txn);
    if status != 0 {
        return Err(DittoError::from_ffi(ErrorKind::Internal));
    }
    Ok(())
}
/// Loads the document inside a write transaction, lets `updater` mutate it, writes it back and
/// commits.
///
/// `updater` is always called with `Some(&mut document)`: when the document does not exist the
/// method returns `ErrorKind::NonExtant` before `updater` is invoked.
///
/// The returned vector is currently always empty; nothing in this method records update
/// results.
///
/// # Errors
///
/// * `ErrorKind::ReleasedDittoInstance` if the Ditto instance can no longer be upgraded.
/// * Whatever error opening the write transaction reports.
/// * `ErrorKind::NonExtant` if the document cannot be fetched (the transaction is rolled back).
/// * `ErrorKind::InvalidInput` if writing the document back returns a non-zero status (the
///   transaction is rolled back).
/// * `ErrorKind::Internal` if committing the transaction returns a non-zero status.
pub fn update<Updater>(&self, updater: Updater) -> Result<Vec<UpdateResult>, DittoError>
where
    Updater: Fn(Option<&mut BoxedDocument>),
{
    let ditto = self
        .ditto
        .upgrade()
        .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
    let hint: Option<char_p::Ref<'_>> = None;
    // Kept in an `Option` so the rollback closure below can take ownership of the transaction.
    let mut write_txn = Some(ffi_sdk::ditto_write_transaction(&ditto, hint).ok()?);

    let mut document = ffi_sdk::ditto_collection_get_with_write_transaction(
        &ditto,
        self.collection_name.as_ref(),
        self.doc_id.bytes.as_slice().into(),
        write_txn.as_mut().expect("write txn Some"),
    )
    .ok_or_else(|| {
        ffi_sdk::ditto_write_transaction_rollback(
            &ditto,
            write_txn.take().expect("write txn Some"),
        );
        ErrorKind::NonExtant
    })?;

    updater(Some(&mut document));

    let update_status = ffi_sdk::ditto_collection_update(
        &ditto,
        self.collection_name.as_ref(),
        write_txn.as_mut().expect("write txn Some"),
        document,
    );
    if update_status != 0 {
        ffi_sdk::ditto_write_transaction_rollback(
            &ditto,
            write_txn.take().expect("write txn Some"),
        );
        return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
    }

    let commit_status = ffi_sdk::ditto_write_transaction_commit(
        &ditto,
        write_txn.take().expect("write txn Some"),
    );
    if commit_status != 0 {
        return Err(DittoError::from_ffi(ErrorKind::Internal));
    }
    Ok(Vec::new())
}
/// Replaces the content of the document with the CBOR serialization of `new_value`, inside a
/// write transaction.
///
/// # Errors
///
/// * `ErrorKind::ReleasedDittoInstance` if the Ditto instance can no longer be upgraded.
/// * Whatever error opening the write transaction reports.
/// * `ErrorKind::NonExtant` if the document cannot be fetched.
/// * `ErrorKind::InvalidInput` if `new_value` cannot be serialized to CBOR, if applying the new
///   content to the document returns a non-zero status, or if writing the document back
///   returns a non-zero status.
/// * `ErrorKind::Internal` if committing the transaction returns a non-zero status.
///
/// The transaction is rolled back on every failure that happens before the commit.
pub fn update_doc<T>(&self, new_value: &T) -> Result<(), DittoError>
where
    T: Serialize,
{
    let ditto = self
        .ditto
        .upgrade()
        .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
    let hint: Option<char_p::Ref<'_>> = None;
    // Kept in an `Option` so the rollback closure below can take ownership of the transaction.
    let mut write_txn = Some(ffi_sdk::ditto_write_transaction(&ditto, hint).ok()?);
    let mut document = {
        ffi_sdk::ditto_collection_get_with_write_transaction(
            &ditto,
            self.collection_name.as_ref(),
            self.doc_id.bytes.as_slice().into(),
            write_txn.as_mut().expect("write txn Some"),
        )
        .ok_or_else(|| {
            ffi_sdk::ditto_write_transaction_rollback(
                &ditto,
                write_txn.take().expect("write txn Some"),
            );
            ErrorKind::NonExtant
        })?
    };

    // A value that cannot be serialized is a caller error, not a reason to panic while a write
    // transaction is open: roll back and report it like the other invalid-input failures.
    let new_content = match ::serde_cbor::to_vec(new_value) {
        Ok(bytes) => bytes,
        Err(_) => {
            ffi_sdk::ditto_write_transaction_rollback(
                &ditto,
                write_txn.take().expect("write txn Some"),
            );
            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
        }
    };

    if ffi_sdk::ditto_document_update(&mut document, new_content.as_slice().into()) != 0 {
        ffi_sdk::ditto_write_transaction_rollback(
            &ditto,
            write_txn.take().expect("write txn Some"),
        );
        return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
    }

    let code = ffi_sdk::ditto_collection_update(
        &ditto,
        self.collection_name.as_ref(),
        write_txn.as_mut().expect("write txn Some"),
        document,
    );
    if code != 0 {
        ffi_sdk::ditto_write_transaction_rollback(
            &ditto,
            write_txn.take().expect("write txn Some"),
        );
        return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
    }

    let status = ffi_sdk::ditto_write_transaction_commit(
        &ditto,
        write_txn.take().expect("write txn Some"),
    );
    if status != 0 {
        return Err(DittoError::from_ffi(ErrorKind::Internal));
    }
    Ok(())
}
}
// `any()` with an empty predicate list is always false, so this module is never compiled,
// even under `cargo test`.
// NOTE(review): presumably disabled on purpose — confirm before re-enabling.
#[cfg(all(test, any()))]
mod tests {
use serde_json::json;
use crate::{prelude::*, test_helpers::setup_ditto, utils::prelude::ErrorKind};
// Looking up an id that was never inserted must fail with `ErrorKind::NonExtant`.
#[test]
fn test_doc_not_found() {
let ditto = setup_ditto().unwrap();
let store = ditto.store();
let collection = store.collection("test").unwrap();
let doc_id = DocumentId::new(&"test_key").unwrap();
let doc = collection.find_by_id(doc_id).exec();
assert!(doc.is_err());
let e = doc.err().unwrap();
let error_kind = e.kind();
assert_eq!(error_kind, ErrorKind::NonExtant);
}
// The same id is first reported as `NonExtant`, then found once a document carrying it has
// been upserted.
#[test]
fn test_doc_found() {
let ditto = setup_ditto().unwrap();
let store = ditto.store();
let collection = store.collection("test").unwrap();
let doc_id = DocumentId::new(&"test_key").unwrap();
// Before the upsert: the lookup is expected to fail.
let doc = collection.find_by_id(doc_id.clone()).exec();
assert!(doc.is_err());
let e = doc.err().unwrap();
let error_kind = e.kind();
assert_eq!(error_kind, ErrorKind::NonExtant);
// Insert a document whose `_id` is the id being looked up.
let content = json!({"hello": "again", "_id": doc_id});
let inserted = collection.upsert(content);
assert!(inserted.is_ok());
// After the upsert: the lookup is expected to succeed.
let doc = collection.find_by_id(doc_id).exec();
assert!(doc.is_ok());
}
}