dittolive-ditto 3.0.0-alpha2

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
use serde::Serialize;
use std::ops::Not;

use crate::{
    error::{DittoError, ErrorKind},
    ffi_sdk,
    store::{
        collection::document_id::DocumentId,
        live_query::{SingleDocumentEventHandler, SingleDocumentLiveQueryEvent},
        update::UpdateResult,
    },
    subscription::Subscription,
    utils::prelude::*,
};

/// Status code that `exec` treats as "no document with this ID": when a
/// lookup result carries this code, the call fails with
/// `ErrorKind::NonExtant`.
///
/// NOTE(review): by its name and value this appears to be LMDB's
/// `MDB_NOTFOUND` (-30798) surfaced unchanged through the FFI — confirm.
const LMDB_ERROR_NOT_FOUND_CODE: i32 = -30798;

/// A pending operation that targets a single document, identified by its ID,
/// within one collection.
///
/// These objects are returned when using `find_by_id` functionality on a
/// collection. From here you can:
///
/// * call `exec` to get an immediate return value;
/// * call `observe_local` to establish a live query, which will notify you
///   every time there's an update to the document with the ID you provided in
///   the preceding `find_by_id` call;
/// * call `subscribe` to establish a subscription, which acts as a signal to
///   other peers that you would like to receive updates from them about the
///   document with the ID you provided in the preceding `find_by_id` call;
/// * call `update`, `update_doc`, `remove` or `evict` to modify or delete the
///   document.
pub struct PendingIdSpecificOperation {
    /// Shared Ditto handle; handed to every FFI call made by this operation.
    pub(super) ditto: Arc<BoxedDitto>,
    /// Name of the collection the document lives in, stored in the form the
    /// FFI calls take it.
    pub(super) collection_name: char_p::Box,
    /// ID of the document this operation targets.
    pub(super) doc_id: DocumentId,
}

impl PendingIdSpecificOperation {
    /// Builds the query string that matches the document whose `_id` equals
    /// this operation's document ID.
    fn query(&self) -> String {
        // The ID is rendered in its query-compatible form, with quotes.
        let quoted_id = self
            .doc_id
            .to_query_compatible(ffi_sdk::StringPrimitiveFormat::WithQuotes);
        format!("_id == {}", quoted_id)
    }

    /// Creates a [`Subscription`] for the document targeted by this
    /// operation.
    ///
    /// Having a subscription acts as a signal to other peers that you are
    /// interested in receiving updates when local or remote changes are made
    /// to the relevant document. The returned `Subscription` must be kept in
    /// scope for as long as you want to keep receiving updates.
    pub fn subscribe(&self) -> Subscription {
        let id_query = self.query();
        // NOTE(review): the trailing arguments presumably mean "no query
        // arguments, no ordering, no limit (-1), offset 0" — confirm against
        // `Subscription::new`.
        Subscription::new(
            self.ditto.retain(),
            self.collection_name.clone(),
            id_query.as_str(),
            None,
            &[],
            -1,
            0,
        )
    }

    /// Remove the document with the matching ID.
    ///
    /// The removal runs inside its own write transaction. The transaction is
    /// committed when a document was removed and rolled back on every other
    /// path, including when the removal call itself reports an error.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::NonExtant` if no document was removed.
    /// * `ErrorKind::Internal` if committing the transaction fails.
    /// * Whatever error is reported when opening the write transaction or
    ///   performing the removal fails.
    pub fn remove(&self) -> Result<(), DittoError> {
        let mut txn = unsafe { ffi_sdk::ditto_write_transaction(&self.ditto).ok()? };
        // Deliberately no `?` here: the transaction is already open and must
        // be rolled back before an error is propagated.
        let outcome = unsafe {
            ffi_sdk::ditto_collection_remove(
                &self.ditto,
                self.collection_name.as_ref(),
                &mut txn,
                self.doc_id.bytes.as_slice().into(),
            )
            .ok()
        };
        match outcome {
            Ok(true) => {
                let status = unsafe { ffi_sdk::ditto_write_transaction_commit(&self.ditto, txn) };
                if status != 0 {
                    return Err(DittoError::from_ffi(ErrorKind::Internal));
                }
                Ok(())
            }
            Ok(false) => {
                unsafe { ffi_sdk::ditto_write_transaction_rollback(&self.ditto, txn) };
                Err(DittoError::from_ffi(ErrorKind::NonExtant))
            }
            Err(err) => {
                // The error value was built before the rollback, so rolling
                // back cannot disturb what gets reported to the caller.
                unsafe { ffi_sdk::ditto_write_transaction_rollback(&self.ditto, txn) };
                Err(err)
            }
        }
    }

    /// Evict the document with the matching ID.
    ///
    /// The eviction runs inside its own write transaction. The transaction is
    /// committed when a document was evicted and rolled back on every other
    /// path, including when the eviction call itself reports an error.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::NonExtant` if no document was evicted.
    /// * `ErrorKind::Internal` if committing the transaction fails.
    /// * Whatever error is reported when opening the write transaction or
    ///   performing the eviction fails.
    pub fn evict(&self) -> Result<(), DittoError> {
        let mut txn = unsafe { ffi_sdk::ditto_write_transaction(&self.ditto).ok()? };
        // Deliberately no `?` here: the transaction is already open and must
        // be rolled back before an error is propagated.
        let outcome = unsafe {
            ffi_sdk::ditto_collection_evict(
                &self.ditto,
                self.collection_name.as_ref(),
                &mut txn,
                self.doc_id.bytes.as_slice().into(),
            )
            .ok()
        };
        match outcome {
            Ok(true) => {
                let status = unsafe { ffi_sdk::ditto_write_transaction_commit(&self.ditto, txn) };
                if status != 0 {
                    return Err(DittoError::from_ffi(ErrorKind::Internal));
                }
                Ok(())
            }
            Ok(false) => {
                unsafe { ffi_sdk::ditto_write_transaction_rollback(&self.ditto, txn) };
                Err(DittoError::from_ffi(ErrorKind::NonExtant))
            }
            Err(err) => {
                // The error value was built before the rollback, so rolling
                // back cannot disturb what gets reported to the caller.
                unsafe { ffi_sdk::ditto_write_transaction_rollback(&self.ditto, txn) };
                Err(err)
            }
        }
    }

    /// Execute the find operation to return the document with the matching ID.
    ///
    /// The lookup is performed inside a read transaction opened for this call.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::NonExtant` when the lookup reports
    ///   `LMDB_ERROR_NOT_FOUND_CODE`.
    /// * Otherwise, whatever error opening the read transaction or converting
    ///   the lookup result yields.
    pub fn exec(&self) -> Result<BoxedDocument, DittoError> {
        let mut read_txn = unsafe { ffi_sdk::ditto_read_transaction(&self.ditto).ok()? };
        let lookup = unsafe {
            ffi_sdk::ditto_collection_get(
                &self.ditto,
                self.collection_name.as_ref(),
                self.doc_id.bytes.as_slice().into(),
                &mut read_txn,
            )
        };
        match lookup.status_code {
            // A missing document gets its own, dedicated error kind.
            LMDB_ERROR_NOT_FOUND_CODE => Err(DittoError::from_ffi(ErrorKind::NonExtant)),
            _ => lookup.ok(),
        }
    }

    /// Enables you to listen for changes that occur in relation to a document.
    /// This won’t subscribe to receive changes made remotely by others and so
    /// it will only fire updates when a local change is made. If you want to
    /// receive remotely performed updates as well then use `observe` or also
    /// call `subscribe` separately after another `find_by_id` call that
    /// references the same document ID.
    ///
    /// The returned `LiveQuery` object must be kept in scope for as long as you
    /// want the provided `handler` to be called when an update occurs.
    pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
    where
        Handler: SingleDocumentEventHandler,
    {
        self.observe_internal(None, handler)
    }

    /// Creates the `LiveQuery` for this operation's single-document query,
    /// forwarding the optional subscription `sub` and the event `handler` to
    /// `LiveQuery::with_handler`.
    fn build_live_query<Handler>(
        &self,
        sub: Option<Box<Subscription>>,
        handler: Handler,
    ) -> Result<LiveQuery, DittoError>
    where
        Handler: EventHandler,
    {
        let ditto = self.ditto.retain();
        let id_query = char_p::new(self.query().as_str());
        let collection_name = self.collection_name.clone();
        // NOTE(review): `None`, `&[]`, `-1` and `0` presumably mean "no query
        // arguments, no ordering, no limit, offset 0" — confirm against
        // `LiveQuery::with_handler`.
        LiveQuery::with_handler(
            ditto,
            id_query,
            None,
            collection_name,
            &[],
            -1,
            0,
            sub,
            handler,
        )
    }

    fn observe_internal<Handler>(
        &self,
        sub: Option<Box<Subscription>>,
        handler: Handler,
    ) -> Result<LiveQuery, DittoError>
    where
        Handler: SingleDocumentEventHandler,
    {
        let mut handler = handler;
        let mapped_handler = move |mut docs: Vec<BoxedDocument>, event: LiveQueryEvent| match event
        {
            LiveQueryEvent::Initial => {
                if docs.len() > 1 {
                    ::log::error!("Single document live query returned more than 1 document");
                    debug_assert!(docs.len() <= 1);
                }
                let event = SingleDocumentLiveQueryEvent {
                    is_initial: true,
                    old_document: None,
                };
                handler(docs.pop(), event);
            }
            LiveQueryEvent::Update {
                mut old_documents,
                insertions,
                deletions,
                updates,
                moves,
            } => {
                if moves.is_empty().not() {
                    ::log::error!("Single document live query received an update event saying that there was a move, which should not happen");
                    debug_assert!(moves.is_empty());
                }
                if insertions.len() > 1 || deletions.len() > 1 || updates.len() > 1 {
                    ::log::error!("Single document live query received an update event with too many insertions, deletions, or updates");
                    debug_assert!(insertions.len() <= 1);
                    debug_assert!(deletions.len() <= 1);
                    debug_assert!(updates.len() <= 1);
                }
                let event = SingleDocumentLiveQueryEvent {
                    is_initial: false,
                    old_document: old_documents.pop(),
                };
                handler(docs.pop(), event);
            }
        };
        self.build_live_query(sub, mapped_handler)
    }

    /// Update the document with the matching ID.
    ///
    /// The document is first read inside a read transaction, then `updater`
    /// is applied to it, and finally the result is written back inside a
    /// write transaction. The write transaction is only opened once there is
    /// a document to write, and it is rolled back if the write is rejected.
    ///
    /// * `updater` - a Fn which will be called on the selected document if
    ///   found. It receives `None` when the lookup succeeded but yielded no
    ///   document; the call then fails with `ErrorKind::NonExtant`.
    ///
    /// On success the returned list is currently always empty (see the TODO
    /// in the body).
    ///
    /// # Errors
    ///
    /// * `ErrorKind::NonExtant` if the lookup reports a non-zero status or
    ///   yields no document.
    /// * `ErrorKind::InvalidInput` if writing the updated document is
    ///   rejected.
    /// * `ErrorKind::Internal` if committing the write transaction fails.
    /// * Whatever error is reported when opening a transaction fails.
    pub fn update<Updater>(&self, updater: Updater) -> Result<Vec<UpdateResult>, DittoError>
    where
        Updater: Fn(Option<&mut BoxedDocument>), /* Arg is a Mutable Document, which only exists
                                                  * in SDKs */
    {
        let mut document = {
            let mut read_txn = unsafe { ffi_sdk::ditto_read_transaction(&self.ditto).ok()? };
            let res = unsafe {
                ffi_sdk::ditto_collection_get(
                    &self.ditto,
                    self.collection_name.as_ref(),
                    self.doc_id.bytes.as_slice().into(),
                    &mut read_txn,
                )
            };
            // NOTE(review): every non-zero status is reported as `NonExtant`
            // here, whereas `exec` only maps the dedicated not-found code.
            if res.status_code != 0 {
                return Err(DittoError::from_ffi(ErrorKind::NonExtant));
            }
            res.ok_value // don't unwrap option
        };

        // Apply the closure to the document
        updater(document.as_mut());
        let diff = Vec::new(); // TODO Mutable doc will populate this

        // Without a document there is nothing to write, so bail out before a
        // write transaction is opened rather than abandoning an open one.
        let doc = match document {
            Some(doc) => doc,
            None => return Err(DittoError::from_ffi(ErrorKind::NonExtant)),
        };

        let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&self.ditto).ok()? };
        let code = unsafe {
            ffi_sdk::ditto_collection_update(
                &self.ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                doc,
            )
        };
        if code != 0 {
            unsafe { ffi_sdk::ditto_write_transaction_rollback(&self.ditto, write_txn) };
            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
        }

        let status = unsafe { ffi_sdk::ditto_write_transaction_commit(&self.ditto, write_txn) };
        if status != 0 {
            return Err(DittoError::from_ffi(ErrorKind::Internal));
        }
        Ok(diff)
    }

    /// Replaces the contents of the matching document with the provided value.
    ///
    /// * `new_value` - A new Serializable which will replace the found document
    ///
    /// The document must already exist: when no document is found for the
    /// given `DocumentId`, nothing is inserted and an error is returned.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::NonExtant` if the document cannot be found.
    /// * `ErrorKind::InvalidInput` if `new_value` cannot be serialized, cannot
    ///   be applied to the document, or the write is rejected.
    /// * `ErrorKind::Internal` if committing the write transaction fails.
    /// * Whatever error is reported when opening a transaction fails.
    pub fn update_doc<T>(&self, new_value: &T) -> Result<(), DittoError>
    where
        T: Serialize,
    {
        // We use the doc_id to find the document (verify it exists)
        // and only if it is found, we then replace its contents with new_value
        // then we write this mutated document back.

        // Create a ReadTransaction and try to find the document
        let mut document = {
            let mut read_txn = unsafe { ffi_sdk::ditto_read_transaction(&self.ditto).ok()? };
            unsafe {
                ffi_sdk::ditto_collection_get(
                    &self.ditto,
                    self.collection_name.as_ref(),
                    self.doc_id.bytes.as_slice().into(),
                    &mut read_txn,
                )
                .ok_or(ErrorKind::NonExtant)?
            }
        }; // ReadTransaction should be dropped

        // A value that cannot be serialized is reported as invalid input
        // instead of panicking inside library code.
        // NOTE(review): `from_ffi` is used because it is the only `DittoError`
        // constructor visible in this file; switch to one that carries the
        // serde error if the error module offers it.
        let new_content = ::serde_cbor::to_vec(new_value)
            .map_err(|_| DittoError::from_ffi(ErrorKind::InvalidInput))?;

        // REPLACE the entire document with provided value. This happens
        // before the write transaction is opened so that this early return
        // cannot leave an open transaction behind.
        if unsafe {
            ffi_sdk::ditto_document_update(
                &mut document,
                new_content.as_slice().into(),
                false, // don't insert new paths
            )
        } != 0
        {
            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
        }

        let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&self.ditto).ok()? };
        let code = unsafe {
            ffi_sdk::ditto_collection_update(
                &self.ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                document,
            )
        };
        if code != 0 {
            unsafe { ffi_sdk::ditto_write_transaction_rollback(&self.ditto, write_txn) };
            Err(DittoError::from_ffi(ErrorKind::InvalidInput))
        } else {
            let status = unsafe { ffi_sdk::ditto_write_transaction_commit(&self.ditto, write_txn) };
            if status != 0 {
                return Err(DittoError::from_ffi(ErrorKind::Internal));
            }
            Ok(())
        }
    }
}

#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::{prelude::*, test_helpers::setup_ditto, utils::prelude::ErrorKind};

    /// Looking up an ID that was never inserted fails with `NonExtant`.
    #[test]
    fn test_doc_not_found() {
        let ditto = setup_ditto().unwrap();
        let store = ditto.store();
        let collection = store.collection("test").unwrap();
        let missing_id = DocumentId::new(&"test_key").unwrap();

        let lookup = collection.find_by_id(missing_id).exec();
        assert!(lookup.is_err());

        let kind = lookup.err().unwrap().kind();
        assert_eq!(kind, ErrorKind::NonExtant);
    }

    /// The same ID is missing before an upsert and found after it.
    #[test]
    fn test_doc_found() {
        let ditto = setup_ditto().unwrap();
        let store = ditto.store();
        let collection = store.collection("test").unwrap();
        let doc_id = DocumentId::new(&"test_key").unwrap();

        // Before the upsert the lookup must fail with `NonExtant`.
        let before = collection.find_by_id(doc_id.clone()).exec();
        assert!(before.is_err());

        let kind = before.err().unwrap().kind();
        assert_eq!(kind, ErrorKind::NonExtant);

        // Insert a document under that ID.
        let content = json!({"hello": "again", "_id": doc_id});
        let inserted = collection.upsert(content);
        assert!(inserted.is_ok());

        // After the upsert the lookup must succeed.
        let after = collection.find_by_id(doc_id).exec();
        assert!(after.is_ok());
    }
}