dittolive-ditto 4.2.2

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
use_prelude!();

use std::collections::BTreeMap;

use ffi_sdk::COrderByParam;

use crate::{
    ditto::{TryUpgrade, WeakDittoHandleWrapper},
    error::{DittoError, ErrorKind},
    store::{collection::document_id::DocumentId, live_query::LiveQuery, update::UpdateResult},
    subscription::Subscription,
};

/// These objects are returned when using `find`-like functionality on `Collection`s.
///
/// They allow chaining of further query-related functions to do things like add a limit to the
/// number of documents you want returned or specify how you want the documents to be sorted and
/// ordered. You can either call `exec` on the object to get an array of `Document`s as an
/// immediate return value, or you can establish either a live query or a subscription, which both
/// work over time.
///
/// A live query, established by calling `observe_local`, will notify you every time there's an
/// update to a document that matches the query you provided in the preceding `find`-like call.
///
/// A subscription, established by calling `subscribe`, will act as a signal to other peers that
/// the device connects to that you would like to receive updates from them about documents
/// that match the query you provided in the preceding `find`-like call.
///
/// Typically, an app would set up a `subscribe` in some part of the application which is long-lived
/// to ensure the device receives updates from the mesh. These updates will be automatically
/// received and written into the local store. Elsewhere, where you need to use this data, an
/// `observeLocal` can be used to notify you of the data, and all subsequent changes to the data.
///
/// Update and remove functionality is also exposed through this object.
pub struct PendingCursorOperation<'order_by> {
    pub(super) ditto: WeakDittoHandleWrapper,
    pub(super) collection_name: char_p::Box,
    pub(super) query: char_p::Box,
    pub(super) query_args: Option<Vec<u8>>,
    pub(super) offset: u32,
    pub(super) limit: i32,
    pub(super) order_by: Vec<COrderByParam<'order_by>>,
}

impl<'order_by> PendingCursorOperation<'order_by> {
    // TODO(pub_check)
    pub fn new(
        ditto: WeakDittoHandleWrapper,
        collection_name: char_p::Box,
        query: &str,
        query_args: Option<Vec<u8>>,
    ) -> Self {
        let query = char_p::new(query);
        Self {
            ditto,
            collection_name,
            query,
            query_args,
            offset: 0,
            limit: -1,
            order_by: vec![],
        }
    }

    /// Execute the query generated by the preceding function chaining and
    /// return the list of matching documents. This occurs immediately.
    pub fn exec(&self) -> Result<Vec<BoxedDocument>, DittoError> {
        self.exec_internal(None)
    }

    fn exec_internal(
        &self,
        txn: Option<&'_ mut ffi_sdk::CWriteTransaction>,
    ) -> Result<Vec<BoxedDocument>, DittoError> {
        let ditto = self.ditto.try_upgrade()?;
        {
            ffi_sdk::ditto_collection_exec_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
        }
        .ok_or(ErrorKind::InvalidInput)
        .map(|it| it.into())
    }

    /// Enables you to subscribe to changes that occur on a collection.
    ///
    /// Having a subscription acts as a signal to others that you are interested in receiving
    /// updates when local or remote changes are made to documents that match the query generated by
    /// the chain of operations that precedes the call to subscribe. The returned
    /// [`Subscription`](crate::prelude::Subscription) object must be kept in scope for as long as
    /// you want to keep receiving updates.
    ///
    /// # Panics
    /// Panics if Ditto has been closed.
    pub fn subscribe(&self) -> Subscription {
        let ditto = self.ditto.try_upgrade().unwrap();
        Subscription::new(
            ditto,
            self.collection_name.clone(),
            self.query.to_str(),
            self.query_args.clone(),
            &self.order_by,
            self.limit,
            self.offset,
        )
    }

    /// Update the document with the matching document Id.
    ///
    /// - `updater`: am `Fn` which will be called on _all_ matching documents.
    pub fn update<Updater>(
        &self,
        updater: Updater,
    ) -> Result<BTreeMap<DocumentId, Vec<UpdateResult>>, DittoError>
    where
        Updater: Fn(&mut [BoxedDocument]),
    {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;

        let mut docs = self.exec_internal(Some(&mut write_txn))?;

        // Apply the closure to the document,
        updater(&mut docs);
        let diff = BTreeMap::<DocumentId, Vec<UpdateResult>>::new();

        let code = {
            ffi_sdk::ditto_collection_update_multiple(
                &ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                docs.into(),
            )
        };
        if code != 0 {
            ffi_sdk::ditto_write_transaction_rollback(&ditto, write_txn);
            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
        }
        ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        Ok(diff)
    }

    /// Limit the number of documents that get returned when querying a
    /// collection for matching documents.
    pub fn limit(&mut self, limit: i32) -> &mut Self {
        self.limit = limit;
        self
    }

    /// Offset the resulting set of matching documents. This is useful if you
    /// aren’t interested in the first N matching documents for one reason
    /// or another. For example, you might already have queried the
    /// collection and obtained the first 20 matching documents and so you might
    /// want to run the same query as you did previously but ignore the first 20
    /// matching documents, and that is where you would use offset.
    pub fn offset(&mut self, offset: u32) -> &mut Self {
        self.offset = offset;
        self
    }

    /// Remove all documents that match the query generated by the preceding
    /// function chaining. Returns the IDs of all documents removed
    pub fn remove(&self) -> Result<Vec<DocumentId>, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;

        let ids = {
            ffi_sdk::ditto_collection_remove_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
            .ok_or(ErrorKind::InvalidInput)?
        };
        ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        Ok(ids
            .to::<Vec<_>>()
            .into_iter()
            .map(|s| s.to::<Box<[u8]>>().into())
            .collect())
    }

    /// Evict all documents that match the query generated by the preceding
    /// function chaining.
    pub fn evict(&self) -> Result<Vec<DocumentId>, DittoError> {
        let ditto = self
            .ditto
            .upgrade()
            .ok_or(crate::error::ErrorKind::ReleasedDittoInstance)?;
        let hint: Option<char_p::Ref<'_>> = None;
        let mut write_txn = ffi_sdk::ditto_write_transaction(&*ditto, hint).ok()?;

        let ids = {
            ffi_sdk::ditto_collection_evict_query_str(
                &*ditto,
                self.collection_name.as_ref(),
                &mut write_txn,
                self.query.as_ref(),
                self.query_args.as_ref().map(|qa| (&qa[..]).into()),
                (&self.order_by[..]).into(),
                self.limit,
                self.offset,
            )
            .ok_or(ErrorKind::InvalidInput)?
        };
        ffi_sdk::ditto_write_transaction_commit(&ditto, write_txn);
        Ok(ids
            .to::<Vec<_>>()
            .into_iter()
            .map(|s| s.to::<Box<[u8]>>().into())
            .collect())
    }

    /// Enables you to listen for changes that occur on a collection.
    ///
    /// This won’t subscribe to receive changes made remotely by others and so it will only fire
    /// updates when a local change is made. If you want to receive remotely performed updates as
    /// well then you need to also call [`subscribe`](PendingCursorOperation::subscribe) with the
    /// relevant query. The returned [`LiveQuery`](crate::prelude::LiveQuery) object must be kept in
    /// scope for as long as you want the provided `Handler` to be called when an update occurs.
    pub fn observe_local<Handler>(&self, handler: Handler) -> Result<LiveQuery, DittoError>
    where
        Handler: EventHandler,
    {
        LiveQuery::with_handler(
            self.ditto.clone(),
            self.query.clone(),
            self.query_args.clone(),
            self.collection_name.clone(),
            &self.order_by,
            self.limit,
            self.offset,
            handler,
        )
    }

    // FIXME: To bring this in line with the other SDKs this should accept a single
    // "order_by" expression, which should then be added to the `order_by` vec
    /// Sort the documents that match the query provided in the preceding
    /// find-like function call.
    pub fn sort(&mut self, sort_param: Vec<COrderByParam<'order_by>>) -> &mut Self {
        self.order_by = sort_param;
        self
    }
}