dittolive-ditto 4.5.3

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use std::ops::Not;

use serde::Serialize;

use crate::{
    ditto::{TryUpgrade, WeakDittoHandleWrapper},
    error::{DittoError, ErrorKind},
    ffi_sdk,
    store::{
        collection::document_id::DocumentId,
        live_query::{SingleDocumentEventHandler, SingleDocumentLiveQueryEvent},
        update::UpdateResult,
    },
    subscription::Subscription,
    utils::prelude::*,
};

const LMDB_ERROR_NOT_FOUND_CODE: i32 = -30798;

/// These objects are returned when using
/// [`Collection::find_by_id`](crate::prelude::Collection::find_by_id) functionality.
///
/// You can either call [`exec`](PendingIdSpecificOperation::exec) on the object to get an immediate
/// return value, or you can establish either a live query or a subscription, which both work over
/// time. A live query, established by calling
/// [`observe_local`](PendingIdSpecificOperation::observe_local), will notify you every time there’s
/// an update to the document with the Id you provided in the preceding
/// [`Collection::find_by_id`](crate::prelude::Collection::find_by_id) call. A subscription,
/// established by calling [`subscribe`](PendingIdSpecificOperation::subscribe), will act as a
/// signal to other peers that you would like to receive updates from them about the document with
/// the Id you provided in the preceding
/// [`Collection::find_by_id`](crate::prelude::Collection::find_by_id) call. Update and remove
/// functionality is also exposed through this object.
pub struct PendingIdSpecificOperation {
    pub(super) ditto: WeakDittoHandleWrapper,
    pub(super) collection_name: char_p::Box,
    pub(super) doc_id: DocumentId,
}

impl PendingIdSpecificOperation {
    fn query(&self) -> String {
        format!(
            "_id == {}",
            self.doc_id
                .to_query_compatible(ffi_sdk::StringPrimitiveFormat::WithQuotes)
        )
    }

    /// Enables you to subscribe to changes that occur in relation to a document. Having a
    /// subscription acts as a signal to other peers that you are interested in receiving updates
    /// when local or remote changes are made to the relevant document. The returned
    /// [`Subscription`] object must be kept in scope for as long as you want to keep receiving
    /// updates.
    ///
    /// # Panics
    /// Panics if Ditto has been closed.
    pub fn subscribe(&self) -> Subscription {
        Subscription::new(
            self.ditto.try_upgrade().unwrap(),
            self.collection_name.clone(),
            self.query().as_str(),
            None,
            &(vec![])[..],
            -1,
            0,
        )
    }

    /// Remove the document with the matching Id.
    pub fn remove(&self) -> Result<(), DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut txn = ffi_sdk::ditto_write_transaction(&ditto, hint).ok()?;
        let removed = {
            ffi_sdk::ditto_collection_remove(
                &ditto,
                self.collection_name.as_ref(),
                &mut txn,
                self.doc_id.bytes.as_slice().into(),
            )
            .ok()?
        };
        if removed {
            let status = ffi_sdk::ditto_write_transaction_commit(&ditto, txn);
            if status != 0 {
                return Err(DittoError::from_ffi(ErrorKind::Internal));
            }
            Ok(())
        } else {
            ffi_sdk::ditto_write_transaction_rollback(&ditto, txn);
            Err(DittoError::from_ffi(ErrorKind::NonExtant))
        }
    }

    /// Evict the document with the matching Id.
    pub fn evict(&self) -> Result<(), DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut txn = ffi_sdk::ditto_write_transaction(&ditto, hint).ok()?;
        let evicted = {
            ffi_sdk::ditto_collection_evict(
                &ditto,
                self.collection_name.as_ref(),
                &mut txn,
                self.doc_id.bytes.as_slice().into(),
            )
            .ok()?
        };
        if evicted {
            let status = ffi_sdk::ditto_write_transaction_commit(&ditto, txn);
            if status != 0 {
                return Err(DittoError::from_ffi(ErrorKind::Internal));
            }
            Ok(())
        } else {
            ffi_sdk::ditto_write_transaction_rollback(&ditto, txn);
            Err(DittoError::from_ffi(ErrorKind::NonExtant))
        }
    }

    /// Execute the find operation to return the document with the matching Id.
    pub fn exec(&self) -> Result<BoxedDocument, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let mut txn = ffi_sdk::ditto_read_transaction(&ditto).ok()?;
        let result = {
            ffi_sdk::ditto_collection_get(
                &ditto,
                self.collection_name.as_ref(),
                self.doc_id.bytes.as_slice().into(),
                &mut txn,
            )
        };
        if result.status_code == LMDB_ERROR_NOT_FOUND_CODE {
            Err(DittoError::from_ffi(ErrorKind::NonExtant))
        } else {
            result.ok()
        }
    }

    /// Enables you to listen for changes that occur in relation to a document. This won’t subscribe
    /// to receive changes made remotely by others and so it will only fire updates when a local
    /// change is made. If you want to receive remotely performed updates as well then call
    /// [`subscribe`](PendingIdSpecificOperation::subscribe) separately after another
    /// [`find_by_id`](Collection::find_by_id) call that references the same
    /// document Id.
    ///
    /// The returned [`LiveQuery`] object must be kept in scope for as long as you want the
    /// provided `handler` to be called when an update occurs.
    pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
    where
        Handler: SingleDocumentEventHandler,
    {
        let mut handler = handler;
        let mapped_handler = move |mut docs: Vec<BoxedDocument>, event: LiveQueryEvent| match event
        {
            LiveQueryEvent::Initial => {
                if docs.len() > 1 {
                    ::log::error!("Single document live query returned more than 1 document");
                    debug_assert!(docs.len() <= 1);
                }
                let event = SingleDocumentLiveQueryEvent {
                    is_initial: true,
                    old_document: None,
                };
                handler(docs.pop(), event);
            }
            LiveQueryEvent::Update {
                mut old_documents,
                insertions,
                deletions,
                updates,
                moves,
            } => {
                if moves.is_empty().not() {
                    ::log::error!(
                        "Single document live query received an update event saying that there \
                         was a move, which should not happen"
                    );
                    debug_assert!(moves.is_empty());
                }
                if insertions.len() > 1 || deletions.len() > 1 || updates.len() > 1 {
                    ::log::error!(
                        "Single document live query received an update event with too many \
                         insertions, deletions, or updates"
                    );
                    debug_assert!(insertions.len() <= 1);
                    debug_assert!(deletions.len() <= 1);
                    debug_assert!(updates.len() <= 1);
                }
                let event = SingleDocumentLiveQueryEvent {
                    is_initial: false,
                    old_document: old_documents.pop(),
                };
                handler(docs.pop(), event);
            }
        };

        LiveQuery::with_handler(
            self.ditto.clone(),
            char_p::new(self.query().as_str()),
            None,
            self.collection_name.clone(),
            &[],
            -1,
            0,
            mapped_handler,
        )
    }

    /// Update the document with the matching Id.
    ///
    /// - `updater`: an `Fn` which will be called on the selected document, if found
    pub fn update<Updater>(&self, updater: Updater) -> Result<Vec<UpdateResult>, DittoError>
    where
        Updater: Fn(Option<&mut BoxedDocument>), /* Arg is a Mutable Document, which only exists
                                                  * in SDKs */
    {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let mut document = Some({
            let mut read_txn = ffi_sdk::ditto_read_transaction(&ditto).ok()?;
            {
                ffi_sdk::ditto_collection_get(
                    &ditto,
                    self.collection_name.as_ref(),
                    self.doc_id.bytes.as_slice().into(),
                    &mut read_txn,
                )
            }
            .ok_or(ErrorKind::NonExtant)?
        });

        // Apply the closure to the document
        updater(document.as_mut());
        let diff = Vec::with_capacity(0); // TODO Mutable doc will populate this

        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = ffi_sdk::ditto_write_transaction(&ditto, hint).ok()?;
        if let Some(doc) = document {
            match ffi_sdk::ditto_collection_update(
                &ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                doc,
            ) {
                0 => {
                    let status = ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
                    if status != 0 {
                        return Err(DittoError::from_ffi(ErrorKind::Internal));
                    }
                    Ok(diff)
                }
                i32::MIN..=-1 | 1..=i32::MAX => {
                    ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn);
                    Err(DittoError::from_ffi(ErrorKind::InvalidInput))
                }
            }
        } else {
            Err(DittoError::from_ffi(ErrorKind::NonExtant))
        }
    }

    /// Replaces the matching document with the provided value.
    ///
    /// - `new_value`: A new `Serializable` which will replace the found document
    ///
    /// Note this actually follows "upsert" rules and will insert a document if no document is found
    /// with the given [`DocumentId`].
    pub fn update_doc<T>(&self, new_value: &T) -> Result<(), DittoError>
    where
        T: Serialize,
    {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        // We use the doc_id to find the document (verify it exists)
        // and only if it is found, we then replace it's contents with new_value
        // then we insert this mutated document.

        // Create a ReadTransaction and try to find the document
        let mut document = {
            let mut read_txn = ffi_sdk::ditto_read_transaction(&ditto).ok()?;
            {
                ffi_sdk::ditto_collection_get(
                    &ditto,
                    self.collection_name.as_ref(),
                    self.doc_id.bytes.as_slice().into(),
                    &mut read_txn,
                )
                .ok_or(ErrorKind::NonExtant)?
            }
        }; // ReadTransaction should be dropped
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = ffi_sdk::ditto_write_transaction(&ditto, hint).ok()?;
        let new_content = ::serde_cbor::to_vec(new_value).unwrap();
        // REPLACE the entire document with provided value
        if ffi_sdk::ditto_document_update(&mut document, new_content.as_slice().into()) != 0 {
            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
        }

        let code = {
            ffi_sdk::ditto_collection_update(
                &ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                document,
            )
        };
        if code != 0 {
            ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn);
            Err(DittoError::from_ffi(ErrorKind::InvalidInput))
        } else {
            let status = { ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn) };
            if status != 0 {
                return Err(DittoError::from_ffi(ErrorKind::Internal));
            }
            Ok(())
        }
    }
}

#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::{prelude::*, test_helpers::setup_ditto, utils::prelude::ErrorKind};

    #[test]
    fn test_doc_not_found() {
        let ditto = setup_ditto().unwrap();
        let store = ditto.store();
        let collection = store.collection("test").unwrap();
        let doc_id = DocumentId::new(&"test_key").unwrap();
        let doc = collection.find_by_id(doc_id).exec();

        assert!(doc.is_err());

        let e = doc.err().unwrap();
        let error_kind = e.kind();
        assert_eq!(error_kind, ErrorKind::NonExtant);
    }

    #[test]
    fn test_doc_found() {
        let ditto = setup_ditto().unwrap();
        let store = ditto.store();
        let collection = store.collection("test").unwrap();
        let doc_id = DocumentId::new(&"test_key").unwrap();
        let doc = collection.find_by_id(doc_id.clone()).exec();

        assert!(doc.is_err());

        let e = doc.err().unwrap();
        let error_kind = e.kind();
        assert_eq!(error_kind, ErrorKind::NonExtant);

        let content = json!({"hello": "again", "_id": doc_id});
        let inserted = collection.upsert(content);

        assert!(inserted.is_ok());

        let doc = collection.find_by_id(doc_id).exec();

        assert!(doc.is_ok());
    }
}