Skip to main content

dittolive_ditto/store/
mod.rs

1//! Use [`ditto.store()`] to access the [`Store`] API to read, write, and remove documents.
2//!
3//! The `Store` provides two interfaces for interacting with data: Ditto Query Language
4//! (DQL), and the legacy "Query Builder" API. Where possible, we recommend developing new
5//! functionality using DQL, as we will eventually deprecate Query Builder.
6//!
7//! - [See the `dql` module docs for examples of DQL queries in action][0]
8//!
9//! [`ditto.store()`]: crate::prelude::Ditto::store
10//! [0]: crate::dql
11
12use_prelude!();
13
14use std::{
15    ops::Deref,
16    sync::{
17        atomic::{self, AtomicU64},
18        RwLock, Weak,
19    },
20};
21
22use ffi_sdk::{FfiStoreObserver, FsComponent, WriteStrategyRs};
23
24pub mod attachment;
25mod document_id;
26mod observer;
27pub mod transactions;
28pub use document_id::DocumentId;
29
30#[deprecated(note = "Use `ditto.store().execute_v2(...)` or \
31                     `ditto.store().register_observer_v2(...)` instead")]
32#[allow(deprecated)] // Silence warnings within the module itself
33pub mod query_builder;
34pub use self::observer::{ChangeHandler, ChangeHandlerWithSignalNext, SignalNext, StoreObserver};
35#[doc(hidden)]
36#[allow(deprecated)]
37pub use self::query_builder::{batch, collection, collections, live_query, update};
38#[doc(hidden)]
39pub use crate::dql;
40use crate::{
41    disk_usage::DiskUsage,
42    ditto::DittoFields,
43    error::{DittoError, ErrorKind},
44    utils::{extension_traits::FfiResultIntoRustResult, SetArc},
45};
46
47// Legacy
48#[doc(hidden)]
49#[deprecated]
50#[rustfmt::skip]
51pub use self::attachment::{
52    self as ditto_attachment,
53    fetch_event as ditto_attachment_fetch_event,
54    fetcher as ditto_attachment_fetcher,
55    token as ditto_attachment_token,
56};
57use self::{ditto_attachment_fetcher::DittoAttachmentFetcherV2, dql::query_v2::IntoQuery};
58
59#[doc(hidden)]
60#[deprecated(note = "The experimental `timeseries` feature has been deprecated")]
61#[cfg(feature = "timeseries")]
62pub mod timeseries;
63
64#[doc(hidden)]
65#[allow(deprecated)]
66use collections::pending_collections_operation::PendingCollectionsOperation;
67
68use crate::dql::*;
69
70type CancelToken = u64;
71
72/// Use [`ditto.store()`] to access the [`Store`] API to read, write, and remove documents.
73///
74/// [See the `store` module for guide-level docs and examples][0].
75///
76/// [`ditto.store()`]: crate::prelude::Ditto::store
77/// [0]: crate::store
78#[derive(Clone)]
79pub struct Store {
80    ditto: Arc<ffi_sdk::BoxedDitto>,
81    // FIXME(Daniel): unify this field with `.ditto`
82    weak_ditto_fields: Weak<DittoFields>,
83    disk_usage: Arc<DiskUsage>,
84    attachment_fetchers: Arc<RwLock<HashMap<CancelToken, (bool, DittoAttachmentFetcherV2)>>>,
85}
86
87impl Store {
88    pub(crate) fn new(
89        ditto: Arc<ffi_sdk::BoxedDitto>,
90        weak_ditto_fields: Weak<DittoFields>,
91    ) -> Self {
92        let disk_usage = Arc::new(DiskUsage::new(ditto.retain(), FsComponent::Store));
93        Self {
94            ditto,
95            weak_ditto_fields,
96            disk_usage,
97            attachment_fetchers: <_>::default(),
98        }
99    }
100
101    // Note this method's logic will be moved into the core ditto library
102    // in the future
103    fn validate_collection_name(name: &str) -> Result<(), DittoError> {
104        let mut result = Ok(());
105
106        if name.is_empty() {
107            result = Err(DittoError::new(
108                ErrorKind::InvalidInput,
109                String::from("Collection name can not be empty"),
110            ));
111        }
112
113        if name.split_whitespace().next().is_none() {
114            result = Err(DittoError::new(
115                ErrorKind::InvalidInput,
116                String::from("Collection name can not only contain whitespace"),
117            ));
118        }
119
120        result
121    }
122
123    /// Returns a [`Collection`] with the provided name.
124    /// A collection name is valid if :
125    /// * its length is less than 100
126    /// * it is not empty
127    /// * it does not contain the char '\0'
128    /// * it does not begin with "$TS_"
129    #[doc(hidden)]
130    #[deprecated(note = "Use `ditto.store().execute_v2(...)` or \
131                         `ditto.store().register_observer_v2(...)` instead")]
132    #[allow(deprecated)]
133    pub fn collection(&self, collection_name: &'_ str) -> Result<Collection, DittoError> {
134        Self::validate_collection_name(collection_name)?;
135        let c_name = char_p::new(collection_name);
136        let status = { ffi_sdk::ditto_collection(&*self.ditto, c_name.as_ref()) };
137        if status != 0 {
138            return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
139        }
140        Ok(Collection {
141            ditto: Arc::downgrade(&self.ditto),
142            collection_name: c_name,
143        })
144    }
145
146    /// Returns an object that lets you fetch or observe the collections in the
147    /// store.
148    #[doc(hidden)]
149    #[deprecated]
150    #[allow(deprecated)]
151    pub fn collections(&self) -> PendingCollectionsOperation<'_> {
152        PendingCollectionsOperation::<'_>::new(Arc::downgrade(&self.ditto))
153    }
154
155    /// Allows you to group multiple operations together that affect multiple
156    /// documents, potentially across multiple collections, without
157    /// auto-committing on each operation.
158    ///
159    /// At the end of the batch of operations, either
160    /// [`batch.commit_changes`](crate::store::batch::ScopedStore::commit_changes)
161    /// or
162    /// [`batch.revert_changes`](crate::store::batch::ScopedStore::revert_changes)
163    /// must be called.
164    ///
165    /// ## Example
166    ///
167    /// ```rust
168    /// # macro_rules! ignore {($($__:tt)*) => ()} ignore! {
169    /// ditto.store().with_batched_write(|batch| {
170    ///     let mut foo_coll = batch.collection("foo");
171    ///     foo_coll.find...().remove();
172    ///     let mut bar_coll = batch.collection("bar");
173    ///     // Expensive multi-mutation op:
174    ///     for _ in 0 .. 10_000 {
175    ///         let doc = ...;
176    ///         bar_coll.insert(doc, None, false);
177    ///     }
178    ///     // At this point, we must say whether we commit or revert
179    ///     // these changes:
180    ///     batch.commit_changes()
181    /// })
182    /// # }
183    /// ```
184    #[doc(hidden)]
185    #[deprecated]
186    #[allow(deprecated)]
187    pub fn with_batched_write<F>(
188        &self,
189        f: F,
190    ) -> Result<Vec<batch::WriteTransactionResult>, DittoError>
191    where
192        for<'batch> F: FnOnce(batch::ScopedStore<'batch>) -> batch::Action<'batch>,
193    {
194        query_builder::batch::with_batched_write(self, f)
195    }
196
197    /// Returns a list of the names of collections in the local store.
198    pub fn collection_names(&self) -> Result<Vec<String>, DittoError> {
199        let c_collections = { ffi_sdk::ditto_get_collection_names(&*self.ditto).ok()? };
200
201        Ok(c_collections
202            .iter()
203            .map(|x: &char_p::Box| -> String { x.clone().into_string() })
204            .collect())
205    }
206
207    /// Returns a hash representing the current version of the given queries.
208    /// When a document matching such queries gets mutated, the hash will change
209    /// as well.
210    ///
211    /// Please note that the hash depends on how queries are constructed, so you
212    /// should make sure to always compare hashes generated with the same set of
213    /// queries.
214    #[doc(hidden)]
215    #[deprecated]
216    #[allow(deprecated)]
217    pub fn queries_hash(&self, live_queries: &[LiveQuery]) -> Result<u64, DittoError> {
218        let (coll_names, queries): (Vec<_>, Vec<_>) = live_queries
219            .iter()
220            .map(|lq| (lq.collection_name.as_ref(), lq.query.as_ref()))
221            .unzip();
222
223        {
224            ffi_sdk::ditto_queries_hash(&self.ditto, coll_names[..].into(), queries[..].into()).ok()
225        }
226    }
227
228    /// Returns a sequence of English words representing the current version of
229    /// the given queries. When a document matching such queries gets mutated,
230    /// the words will change as well.
231    ///
232    /// Please note that the resulting sequence of words depends on how queries
233    /// are constructed, so you should make sure to always compare hashes
234    /// generated with the same set of queries.
235    #[doc(hidden)]
236    #[deprecated]
237    #[allow(deprecated)]
238    pub fn queries_hash_mnemonic(&self, live_queries: &[LiveQuery]) -> Result<String, DittoError> {
239        let (coll_names, queries): (Vec<_>, Vec<_>) = live_queries
240            .iter()
241            .map(|lq| (lq.collection_name.as_ref(), lq.query.as_ref()))
242            .unzip();
243
244        {
245            ffi_sdk::ditto_queries_hash_mnemonic(
246                &self.ditto,
247                coll_names[..].into(),
248                queries[..].into(),
249            )
250            .ok()
251            .map(|c_str| c_str.into_string())
252        }
253    }
254
255    #[doc(hidden)]
256    #[deprecated(note = "All live query webhook related methods have been deprecated")]
257    /// Start all live query webhooks.
258    pub fn start_all_live_query_webhooks(&self) -> Result<(), DittoError> {
259        error!("Store::start_all_live_query_webhooks has been deprecated: it is now a no-op");
260        Err(DittoError::from_str(
261            ErrorKind::Deprecation,
262            "Live query webhooks are deprecated",
263        ))
264    }
265
266    #[doc(hidden)]
267    #[deprecated(note = "All live query webhook related methods have been deprecated")]
268    /// Start a live query webhooks by its id.
269    pub fn start_live_query_webhook_by_id(&self, _doc_id: DocumentId) -> Result<(), DittoError> {
270        error!("Store::start_live_query_webhook_by_id has been deprecated: it is now a no-op");
271        Err(DittoError::from_str(
272            ErrorKind::Deprecation,
273            "Live query webhooks are deprecated",
274        ))
275    }
276
277    #[doc(hidden)]
278    #[deprecated(note = "All live query webhook related methods have been deprecated")]
279    /// Register a new live query webhook
280    pub fn register_live_query_webhook(
281        &self,
282        _collection_name: &str,
283        _query: &str,
284        _url: &str,
285    ) -> Result<DocumentId, DittoError> {
286        error!("Store::register_live_query_webhook has been deprecated: it is now a no-op");
287        Err(DittoError::from_str(
288            ErrorKind::Deprecation,
289            "Live query webhooks are deprecated",
290        ))
291    }
292
293    #[doc(hidden)]
294    #[deprecated(note = "All live query webhook related methods have been deprecated")]
295    /// Generate a new API secret for live query webhook
296    pub fn live_query_webhook_generate_new_api_secret(&self) -> Result<(), DittoError> {
297        error!(
298            "Store::live_query_webhook_generate_new_api_secret has been deprecated: it is now a \
299             no-op"
300        );
301        Err(DittoError::from_str(
302            ErrorKind::Deprecation,
303            "Live query webhooks are deprecated",
304        ))
305    }
306
307    #[doc(hidden)]
308    #[deprecated(note = "The experimental `timeseries` feature has been deprecated")]
309    #[allow(deprecated)]
310    #[cfg(feature = "timeseries")]
311    /// Returns a [`TimeSeries`] with the provided name.
312    pub fn timeseries(&self, ts_name: &'_ str) -> Result<TimeSeries, DittoError> {
313        Self::validate_collection_name(ts_name)?;
314        let c_name = char_p::new(ts_name);
315        Ok(TimeSeries {
316            ditto: self.ditto.retain(),
317            ts_name: c_name,
318        })
319    }
320
321    /// Return a [`DiskUsage`] to monitor the disk usage of the
322    /// [`Store`].
323    #[doc(hidden)]
324    #[deprecated(note = "Use `ditto.disk_usage()` instead")]
325    pub fn disk_usage(&self) -> &DiskUsage {
326        &self.disk_usage
327    }
328
329    /// Deprecated in favour of [`ditto.store().register_observer_with_signal_next_v2()`][0]
330    ///
331    /// [0]: Store::register_observer_with_signal_next_v2
332    #[deprecated = "Use `ditto.store().register_observer_v2()` instead"]
333    #[allow(deprecated)]
334    #[doc(hidden)]
335    pub fn register_observer<Q, F>(
336        &self,
337        query: Q,
338        query_args: Option<QueryArguments>,
339        on_change: F,
340    ) -> Result<Arc<StoreObserver>, DittoError>
341    where
342        Q: TryInto<Query, Error = DittoError>,
343        F: ChangeHandler,
344    {
345        let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
346
347        let query = query.try_into()?;
348        let query_args = query_args.as_ref().map(|a| a.cbor());
349
350        let observer = Arc::new(StoreObserver::new(
351            &ditto,
352            &query.inner_string,
353            query_args,
354            on_change,
355        )?);
356        Ok(observer)
357    }
358
359    /// Deprecated in favour of [`ditto.store().register_observer_with_signal_next_v2()`][0]
360    ///
361    /// [0]: Store::register_observer_with_signal_next_v2
362    #[deprecated = "Use `ditto.store().register_observer_with_signal_next_v2()` instead"]
363    #[allow(deprecated)]
364    #[doc(hidden)]
365    pub fn register_observer_with_signal_next<Q, F>(
366        &self,
367        query: Q,
368        query_args: Option<QueryArguments>,
369        on_change: F,
370    ) -> Result<Arc<StoreObserver>, DittoError>
371    where
372        Q: TryInto<Query, Error = DittoError>,
373        F: ChangeHandlerWithSignalNext,
374    {
375        let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
376
377        let query = query.try_into()?;
378        let query_args = query_args.as_ref().map(|a| a.cbor());
379
380        let new_obs = Arc::new(StoreObserver::with_signal_next(
381            &ditto,
382            &query.inner_string,
383            query_args,
384            on_change,
385        )?);
386        Ok(new_obs)
387    }
388
389    /// Installs and returns a store observer for a query, configuring Ditto to
390    /// trigger the passed in change handler whenever documents in the local
391    /// store change such that the result of the matching query changes. The
392    /// passed in query must be a `SELECT` query, otherwise an error will be
393    /// returned.
394    ///
395    /// # Example
396    ///
397    /// ```
398    /// use dittolive_ditto::prelude::*;
399    /// # fn example_with_register_observer(ditto: &Ditto) -> anyhow::Result<()> {
400    ///
401    /// let store = ditto.store();
402    /// let _observer = store.register_observer_v2(
403    ///     "SELECT * FROM cars WHERE color = 'blue'",
404    ///     move |query_result| {
405    ///         for item in query_result.iter() {
406    ///             // ... handle each item
407    ///         }
408    ///     },
409    /// )?;
410    /// # Ok(())
411    /// # }
412    /// ```
413    ///
414    /// The first invocation of the change handler will always happen after
415    /// this method has returned.
416    ///
417    /// The observer will remain active until:
418    ///
419    /// - the [`StoreObserver`] handle gets dropped,
420    /// - the [`observer.cancel()`] method is called, or
421    /// - the owning [`Ditto`] instance has shut down
422    ///
423    /// Observer callbacks will never be called concurrently. That is, one callback
424    /// must return before the observer will call the handler again. See
425    /// [`ditto.store().register_observer_with_signal_next(...)`] if you want
426    /// to manually signal readiness for the next callback.
427    ///
428    /// [`ditto.store().register_observer_with_signal_next(...)`]: crate::store::Store::register_observer_with_signal_next
429    /// [`observer.cancel()`]: crate::store::StoreObserver::cancel
430    /// [`Ditto`]: crate::prelude::Ditto
431    pub fn register_observer_v2<Q, F>(
432        &self,
433        query: Q,
434        on_change: F,
435    ) -> Result<Arc<StoreObserver>, DittoError>
436    where
437        Q: IntoQuery,
438        Q::Args: Serialize,
439        F: ChangeHandler,
440    {
441        let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
442        let query = query.into_query()?;
443
444        let observer = Arc::new(StoreObserver::new(
445            &ditto,
446            &query.string,
447            query.args_cbor.as_deref(),
448            on_change,
449        )?);
450        Ok(observer)
451    }
452
453    /// Installs and returns a store observer for a query, configuring Ditto to
454    /// trigger the passed in change handler whenever documents in the local
455    /// store change such that the result of the matching query changes. The
456    /// passed in query must be a `SELECT` query, otherwise an error will be
457    /// returned.
458    ///
459    /// Here, a function is passed as an additional argument to the change
460    /// handler. This allows the change handler to control how frequently
461    /// it is called. See [`register_observer`] for a convenience method that
462    /// automatically signals the next invocation.
463    ///
464    /// # Example
465    ///
466    /// ```
467    /// use dittolive_ditto::prelude::*;
468    /// # fn example_with_signal_next(ditto: &Ditto) -> anyhow::Result<()> {
469    ///
470    /// let store = ditto.store();
471    /// let _observer = store.register_observer_with_signal_next_v2(
472    ///     "SELECT * FROM cars WHERE color = 'blue'",
473    ///     move |query_result, signal_next| {
474    ///         for item in query_result.iter() {
475    ///             // ... handle each item
476    ///         }
477    ///
478    ///         // Call `signal_next` when you're ready for the next callback
479    ///         signal_next();
480    ///     },
481    /// )?;
482    /// # Ok(())
483    /// # }
484    /// ```
485    ///
486    /// The first invocation of the change handler will always happen after
487    /// this method has returned.
488    ///
489    /// The observer will remain active until:
490    ///
491    /// - the [`StoreObserver`] handle gets dropped,
492    /// - the [`observer.cancel()`] method is called, or
493    /// - the owning [`Ditto`] instance has shut down
494    ///
495    /// After invoking the callback once, the observer will wait to deliver
496    /// another callback until after you've called [`signal_next`].
497    ///
498    /// [`register_observer`]: Store::register_observer
499    /// [`signal_next`]: crate::store::SignalNext
500    pub fn register_observer_with_signal_next_v2<Q, F>(
501        &self,
502        query: Q,
503        on_change: F,
504    ) -> Result<Arc<StoreObserver>, DittoError>
505    where
506        Q: IntoQuery,
507        Q::Args: Serialize,
508        F: ChangeHandlerWithSignalNext,
509    {
510        let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
511        let query = query.into_query()?;
512
513        let new_obs = Arc::new(StoreObserver::with_signal_next(
514            &ditto,
515            &query.string,
516            query.args_cbor.as_deref(),
517            on_change,
518        )?);
519        Ok(new_obs)
520    }
521
522    /// Gets temporary access to the set of currently registered observers.
523    ///
524    /// A (read) lock is held until the return value is dropped: this means
525    /// that neither [`Self::register_observer()`] nor
526    /// [`StoreObserver::cancel()`] can make progress until this read
527    /// lock is released.
528    pub fn observers(&self) -> impl '_ + Deref<Target = SetArc<StoreObserver>> {
529        let observers: repr_c::Vec<repr_c::Box<FfiStoreObserver>> =
530            ffi_sdk::dittoffi_store_observers(&self.ditto);
531        let observers: Vec<_> = observers.into();
532        let observers = observers
533            .into_iter()
534            .map(|handle: repr_c::Box<FfiStoreObserver>| Arc::new(StoreObserver { handle }))
535            .collect::<SetArc<_>>();
536
537        Box::new(observers)
538    }
539
540    /// Executes the given query in the local store and returns the result.
541    ///
542    /// Use placeholders to incorporate values from the optional `query_args`
543    /// parameter into the query. The keys of the [`QueryArguments`] object must
544    /// match the placeholders used within the query. You can not use placeholders
545    /// in the `FROM` clause.
546    ///
547    /// This method only returns results from the local store without waiting for any
548    /// [`SyncSubscription`]s to have caught up with the
549    /// latest changes. Only use this method if your program must proceed with immediate results.
550    ///
551    /// Use [`ditto.store().register_observer(...)`] to receive updates to query results
552    /// as soon as they have been synced to this peer.
553    ///
554    /// Limitations:
555    ///
556    /// - Supports `SELECT * FROM <collection name>` with optional `WHERE <expression>`
557    /// - No transactions
558    ///
559    /// [`SyncSubscription`]: crate::sync::SyncSubscription
560    /// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
561    #[deprecated = "Use `ditto.store().execute_v2(...)` instead"]
562    #[allow(deprecated)]
563    #[doc(hidden)]
564    pub async fn execute<Q>(
565        &self,
566        query: Q,
567        query_args: Option<QueryArguments>,
568    ) -> Result<QueryResult, DittoError>
569    where
570        Q: TryInto<Query, Error = DittoError>,
571    {
572        // Get a DqlResult. The operation to get it is fallible, so we end up
573        // with a `FfiResult<DqlResult> -> Result<DqlResult> -> DqlResult`.
574        // The `Result` stutter is quite unfortunate, which _results_ from
575        // having picked that name in our public SDK API (over, say, `Output`).
576        let ffi_query_result = ffi_sdk::dittoffi_try_exec_statement(
577            &self.ditto,
578            query.try_into()?.prepare_ffi(),
579            query_args.as_ref().map(|qa| qa.cbor().into()),
580        )
581        .into_rust_result()?;
582
583        Ok(QueryResult::from(ffi_query_result))
584    }
585
586    /// Executes the given query in the local store and returns the result.
587    ///
588    /// # Example
589    ///
590    /// ```
591    /// use dittolive_ditto::prelude::*;
592    /// use dittolive_ditto::dql::QueryResult;
593    /// # async fn example(ditto: &Ditto) -> anyhow::Result<()> {
594    ///
595    /// // Query a collection
596    /// let result = ditto.store().execute_v2("SELECT * FROM cars").await?;
597    ///
598    /// // Insert a document into a collection
599    /// let insert_result: QueryResult = ditto
600    ///     .store()
601    ///     .execute_v2((
602    ///          "INSERT INTO cars DOCUMENTS (:newCar)",
603    ///          serde_json::json!({
604    ///              "newCar": {
605    ///                  "make": "ford",
606    ///                  "color": "blue"
607    ///              }
608    ///          })
609    ///     ))
610    ///     .await?;
611    /// # Ok(())
612    /// # }
613    /// ```
614    ///
615    /// Use placeholders to incorporate values from the optional `query_args`
616    /// parameter into the query. The keys of the [`QueryArguments`] object must
617    /// match the placeholders used within the query. You can not use placeholders
618    /// in the `FROM` clause.
619    ///
620    /// This method only returns results from the local store without waiting for any
621    /// [`SyncSubscription`]s to have caught up with the
622    /// latest changes. Only use this method if your program must proceed with immediate results.
623    ///
624    /// Use [`ditto.store().register_observer(...)`] to receive updates to query results
625    /// as soon as they have been synced to this peer.
626    ///
627    /// ## Query parameter
628    ///
629    /// The `query` parameter must implement [`IntoQuery`], which is a trait that is implemented by
630    /// objects that can be turned into a query string along with the relevant query argumnents.
631    ///
632    /// For queries with no arguments, a [`String`] is sufficient
633    ///
634    /// [`SyncSubscription`]: crate::sync::SyncSubscription
635    /// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
636    pub async fn execute_v2<Q>(&self, query: Q) -> Result<QueryResult, DittoError>
637    where
638        Q: IntoQuery,
639        Q::Args: serde::Serialize,
640    {
641        let query = query.into_query()?;
642        let query_string = (&*query.string).into();
643        let query_args = query.args_cbor.as_deref().map(Into::into);
644
645        let ffi_query_result =
646            ffi_sdk::dittoffi_try_exec_statement(&self.ditto, query_string, query_args)
647                .into_rust_result()?;
648
649        Ok(QueryResult::from(ffi_query_result))
650    }
651
652    /// Creates a new attachment, which can then be inserted into a document.
653    ///
654    /// The file residing at the provided path will be copied into Ditto’s store. The
655    /// [`DittoAttachment`] object that is returned is what you can
656    /// then use to insert an attachment into a document.
657    ///
658    /// You can provide custom user data about the attachment, which will be replicated to other
659    /// peers alongside the file attachment.
660    pub async fn new_attachment(
661        &self,
662        filepath: &(impl ?Sized + AsRef<Path>),
663        user_data: HashMap<String, String>,
664    ) -> Result<DittoAttachment, DittoError> {
665        DittoAttachment::from_file_and_metadata(filepath, user_data, &self.ditto)
666    }
667
668    /// Creates a new attachment from in-memory data
669    ///
670    /// Refer to [`new_attachment`](Self::new_attachment) for additional information.
671    pub async fn new_attachment_from_bytes(
672        &self,
673        bytes: &(impl ?Sized + AsRef<[u8]>),
674        user_data: HashMap<String, String>,
675    ) -> Result<DittoAttachment, DittoError> {
676        DittoAttachment::from_bytes_and_metadata(bytes, user_data, &self.ditto)
677    }
678
679    /// Fetches the attachment corresponding to the provided attachment token.
680    /// - `attachment_token`: can be either a [`DittoAttachmentToken`], or a `&BTreeMap<CborValue,
681    ///   CborValue>`, that is, the output of a [`QueryResultItem::value()`] once casted
682    ///   [`.as_object()`][crate::prelude::CborValueGetters::as_object()].
683    ///
684    /// - `on_fetch_event`: A closure that will be called when the status of the request to fetch
685    ///   the attachment has changed. If the attachment is already available then this will be
686    ///   called almost immediately with a completed status value.
687    ///
688    /// The returned [`DittoAttachmentFetcher`] is a handle which is safe to discard, unless you
689    /// wish to be able to [`.cancel()`][DittoAttachmentFetcher::cancel] the fetching operation.
690    /// When not explicitly cancelled, the fetching operation will remain active until it either
691    /// completes, the attachment is deleted, or the owning [`Ditto`] object is dropped.
692    pub fn fetch_attachment(
693        &self,
694        attachment_token: impl attachment::token::DittoAttachmentTokenLike,
695        on_fetch_event: impl 'static + Send + Sync + Fn(DittoAttachmentFetchEvent),
696    ) -> Result<DittoAttachmentFetcherV2, DittoError> {
697        let attachment_token = attachment_token.parse_attachment_token()?;
698
699        let weak_ditto = self.weak_ditto_fields.clone();
700        let ditto = weak_ditto
701            .upgrade()
702            .ok_or(ErrorKind::ReleasedDittoInstance)?;
703
704        let mut attachment_fetchers_lockguard = self.attachment_fetchers.write().unwrap();
705        let fetcher = DittoAttachmentFetcher::new(
706            attachment_token,
707            Some(&ditto),
708            &self.ditto,
709            // Shim around `on_fetch_event` to `cancel` on completion.
710            move |event, cancel_token: &AtomicU64| {
711                let has_finished = matches! {
712                    event,
713                    | DittoAttachmentFetchEvent::Completed { .. }
714                    | DittoAttachmentFetchEvent::Deleted { .. }
715                };
716                on_fetch_event(event);
717                if has_finished {
718                    if let Some(ditto) = weak_ditto.upgrade() {
719                        let mut attachment_fetchers_inner_lockguard =
720                            ditto.store.attachment_fetchers.write().unwrap();
721                        // Relaxed is fine thanks to the lock.
722                        let cancel_token = cancel_token.load(atomic::Ordering::Relaxed);
723                        ditto.store.unregister_fetcher(
724                            cancel_token,
725                            Some(&mut *attachment_fetchers_inner_lockguard),
726                        );
727                    }
728                }
729            },
730        )?;
731        let (cancel_token, was_zero) = fetcher.cancel_token_ensure_unique();
732        attachment_fetchers_lockguard.insert(cancel_token, (was_zero, fetcher.clone()));
733        Ok(fetcher)
734    }
735
736    fn unregister_fetcher(
737        &self,
738        mut fetcher_cancel_token: CancelToken,
739        fetchers: Option<&mut HashMap<CancelToken, (bool, DittoAttachmentFetcherV2)>>,
740    ) -> bool {
741        let mut lock_guard = None;
742        let fetchers = fetchers.unwrap_or_else(|| {
743            &mut **lock_guard.get_or_insert(self.attachment_fetchers.write().unwrap())
744        });
745
746        let Some((was_zero, removed_fetcher)) = fetchers.remove(&fetcher_cancel_token) else {
747            return false;
748        };
749        drop(lock_guard);
750
751        if was_zero {
752            fetcher_cancel_token = 0;
753        }
754
755        let att_token = &removed_fetcher.context.token;
756        debug!(
757            token_id = %att_token.id(),
758            %fetcher_cancel_token,
759            "unregistering ditto attachment fetcher"
760        );
761
762        let status = ffi_sdk::ditto_cancel_resolve_attachment(
763            &self.ditto,
764            att_token.id.as_ref().into(),
765            fetcher_cancel_token,
766        );
767
768        if status != 0 {
769            error!(
770                token_id = %att_token.id(),
771                %fetcher_cancel_token,
772                "failed to clean up attachment fetcher"
773            );
774        }
775        status == 0
776    }
777
778    /// Gets a copy of the set of currently registered attachment fetchers.
779    ///
780    /// A (read) lock is held during the copy: this contends with [`Self::fetch_attachment()`] and
781    /// with [`DittoAttachmentFetcher::cancel()`].
782    pub fn attachment_fetchers(&self) -> Vec<DittoAttachmentFetcherV2> {
783        self.attachment_fetchers
784            .read()
785            .unwrap()
786            .iter()
787            .map(|(_, (_, fetcher))| fetcher.clone())
788            .collect()
789    }
790}
791
/// Ordering applied to the Documents returned by a query.
#[non_exhaustive]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum SortDirection {
    /// Smallest result first, largest result last.
    Ascending,

    /// Largest result first, smallest result last.
    Descending,
}
802
/// Strategy applied when inserting documents into the store.
#[non_exhaustive]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum WriteStrategy {
    /// If a document with the same Id already exists, the inserted document
    /// is merged with it.
    Merge,

    /// The document is inserted only when the store holds no document with
    /// the same Id; otherwise the insertion is a no-op.
    InsertIfAbsent,

    /// Like `InsertIfAbsent`, but the inserted contents are treated as
    /// default data. Use this strategy to insert default data for a given
    /// document Id, which you want to treat as common initial data amongst
    /// all peers and that you expect to be mutated or overwritten in due
    /// course.
    InsertDefaultIfAbsent,
}
822
823impl WriteStrategy {
824    fn as_write_strategy_rs(&self) -> WriteStrategyRs {
825        match self {
826            WriteStrategy::Merge => WriteStrategyRs::Merge,
827            WriteStrategy::InsertIfAbsent => WriteStrategyRs::InsertIfAbsent,
828            WriteStrategy::InsertDefaultIfAbsent => WriteStrategyRs::InsertDefaultIfAbsent,
829        }
830    }
831}