dittolive-ditto 3.0.9

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use_prelude!();

use std::collections::BTreeMap;

use ffi_sdk::COrderByParam;

use crate::{
    ditto::{TryUpgrade, WeakDittoHandleWrapper},
    error::{DittoError, ErrorKind},
    store::{collection::document_id::DocumentId, live_query::LiveQuery, update::UpdateResult},
    subscription::Subscription,
};

pub struct PendingCursorOperation<'order_by> {
    pub(super) ditto: WeakDittoHandleWrapper,
    pub(super) collection_name: char_p::Box,
    pub(super) query: char_p::Box,
    pub(super) query_args: Option<Vec<u8>>,
    pub(super) offset: u32,
    pub(super) limit: i32,
    pub(super) order_by: Vec<COrderByParam<'order_by>>,
}

impl<'order_by> PendingCursorOperation<'order_by> {
    pub fn new(
        ditto: WeakDittoHandleWrapper,
        collection_name: char_p::Box,
        query: &str,
        query_args: Option<Vec<u8>>,
    ) -> Self {
        let query = char_p::new(query);
        Self {
            ditto,
            collection_name,
            query,
            query_args,
            offset: 0,
            limit: -1,
            order_by: vec![],
        }
    }

    /// Execute the query generated by the preceding function chaining and
    /// return the list of matching documents. This occurs immediately.
    pub fn exec(&self) -> Result<Vec<BoxedDocument>, DittoError> {
        self.exec_internal(None)
    }

    fn exec_internal(
        &self,
        txn: Option<&'_ mut ffi_sdk::CWriteTransaction>,
    ) -> Result<Vec<BoxedDocument>, DittoError> {
        let ditto = self.ditto.try_upgrade()?;
        unsafe {
            ffi_sdk::ditto_collection_exec_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
        }
        .ok_or(ErrorKind::InvalidInput)
        .map(|it| it.into())
    }

    /// Enables you to subscribe to changes that occur on a collection. Having a
    /// subscription acts as a signal to others that you are interested in
    /// receiving updates when local or remote changes are made to documents
    /// that match the query generated by the chain of operations
    /// that precedes the call to subscribe. The returned DittoSubscription
    /// object must be kept in scope for as long as you want to keep
    /// receiving updates.
    /// # Panics
    /// Panics if Ditto has been closed.
    pub fn subscribe(&self) -> Subscription {
        let ditto = self.ditto.try_upgrade().unwrap();
        Subscription::new(
            ditto,
            self.collection_name.clone(),
            self.query.to_str(),
            self.query_args.clone(),
            &self.order_by,
            self.limit,
            self.offset,
        )
    }

    /// Update the document with the matching ID.
    /// * `updater` - a Fn which will be called on _all_ matching documents
    ///
    /// Note that fetching the documents occurs in one transaction and then
    /// applying `updater` to _all_ fetched documents occurs in a single,
    /// second transaction.
    pub fn update<Updater>(
        &self,
        updater: Updater,
    ) -> Result<BTreeMap<DocumentId, Vec<UpdateResult>>, DittoError>
    where
        Updater: Fn(&mut [BoxedDocument]),
    {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()? };

        let mut docs = self.exec_internal(Some(&mut write_txn))?;

        // Apply the closure to the document,
        updater(&mut docs);
        let diff = BTreeMap::<DocumentId, Vec<UpdateResult>>::new();

        let code = unsafe {
            ffi_sdk::ditto_collection_update_multiple(
                &ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                docs.into(),
            )
        };
        if code != 0 {
            unsafe { ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn) };
            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
        }
        unsafe { ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn) };
        Ok(diff)
    }

    /// Limit the number of documents that get returned when querying a
    /// collection for matching documents.
    pub fn limit(&mut self, limit: i32) -> &mut Self {
        self.limit = limit;
        self
    }

    /// Offset the resulting set of matching documents. This is useful if you
    /// aren’t interested in the first N matching documents for one reason
    /// or another. For example, you might already have queried the
    /// collection and obtained the first 20 matching documents and so you might
    /// want to run the same query as you did previously but ignore the first 20
    /// matching documents, and that is where you would use offset.
    pub fn offset(&mut self, offset: u32) -> &mut Self {
        self.offset = offset;
        self
    }

    /// Remove all documents that match the query generated by the preceding
    /// function chaining. Returns the IDs of all documents removed
    pub fn remove(&self) -> Result<Vec<DocumentId>, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()? };

        let ids = unsafe {
            ffi_sdk::ditto_collection_remove_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
            .ok_or(ErrorKind::InvalidInput)?
        };
        unsafe {
            ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        }
        Ok(ids
            .to::<Vec<_>>()
            .into_iter()
            .map(|s| s.to::<Box<[u8]>>().into())
            .collect())
    }

    /// Evict all documents that match the query generated by the preceding
    /// function chaining.
    pub fn evict(&self) -> Result<Vec<DocumentId>, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = unsafe { ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()? };

        let ids = unsafe {
            ffi_sdk::ditto_collection_evict_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
            .ok_or(ErrorKind::InvalidInput)?
        };
        unsafe {
            ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        }
        Ok(ids
            .to::<Vec<_>>()
            .into_iter()
            .map(|s| s.to::<Box<[u8]>>().into())
            .collect())
    }

    /// Enables you to listen for changes that occur on a collection. This won’t
    /// subscribe to receive changes made remotely by others and so it will only
    /// fire updates when a local change is made. If you want to receive
    /// remotely performed updates as well then you need to also call subscribe
    /// with the relevant query. The returned DittoLiveQuery object must be kept
    /// in scope for as long as you want the provided eventHandler to be called
    /// when an update occurs.
    pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
    where
        Handler: EventHandler,
    {
        LiveQuery::with_handler(
            self.ditto.clone(),
            self.query.clone(),
            self.query_args.clone(),
            self.collection_name.clone(),
            &self.order_by,
            self.limit,
            self.offset,
            None,
            handler,
        )
    }

    // FIXME: To bring this in line with the other SDKs this should accept a single
    // "order_by" expression, which should then be added to the `order_by` vec
    /// Sort the documents that match the query provided in the preceding
    /// find-like function call.
    pub fn sort(&mut self, sort_param: Vec<COrderByParam<'order_by>>) -> &mut Self {
        self.order_by = sort_param;
        self
    }
}