dittolive_ditto/store/mod.rs
1//! Use [`ditto.store()`] to access the [`Store`] API to read, write, and remove documents.
2//!
3//! The `Store` provides two interfaces for interacting with data: Ditto Query Language
4//! (DQL), and the legacy "Query Builder" API. Where possible, we recommend developing new
5//! functionality using DQL, as we will eventually deprecate Query Builder.
6//!
7//! - [See the `dql` module docs for examples of DQL queries in action][0]
8//!
9//! [`ditto.store()`]: crate::prelude::Ditto::store
10//! [0]: crate::dql
11
12use_prelude!();
13
14use std::{
15 ops::Deref,
16 sync::{
17 atomic::{self, AtomicU64},
18 RwLock, Weak,
19 },
20};
21
22use ffi_sdk::{FfiStoreObserver, FsComponent, WriteStrategyRs};
23
24pub mod attachment;
25mod document_id;
26mod observer;
27pub mod transactions;
28pub use document_id::DocumentId;
29
30#[deprecated(note = "Use `ditto.store().execute_v2(...)` or \
31 `ditto.store().register_observer_v2(...)` instead")]
32#[allow(deprecated)] // Silence warnings within the module itself
33pub mod query_builder;
34pub use self::observer::{ChangeHandler, ChangeHandlerWithSignalNext, SignalNext, StoreObserver};
35#[doc(hidden)]
36#[allow(deprecated)]
37pub use self::query_builder::{batch, collection, collections, live_query, update};
38#[doc(hidden)]
39pub use crate::dql;
40use crate::{
41 disk_usage::DiskUsage,
42 ditto::DittoFields,
43 error::{DittoError, ErrorKind},
44 utils::{extension_traits::FfiResultIntoRustResult, SetArc},
45};
46
47// Legacy
48#[doc(hidden)]
49#[deprecated]
50#[rustfmt::skip]
51pub use self::attachment::{
52 self as ditto_attachment,
53 fetch_event as ditto_attachment_fetch_event,
54 fetcher as ditto_attachment_fetcher,
55 token as ditto_attachment_token,
56};
57use self::{ditto_attachment_fetcher::DittoAttachmentFetcherV2, dql::query_v2::IntoQuery};
58
59#[doc(hidden)]
60#[deprecated(note = "The experimental `timeseries` feature has been deprecated")]
61#[cfg(feature = "timeseries")]
62pub mod timeseries;
63
64#[doc(hidden)]
65#[allow(deprecated)]
66use collections::pending_collections_operation::PendingCollectionsOperation;
67
68use crate::dql::*;
69
70type CancelToken = u64;
71
72/// Use [`ditto.store()`] to access the [`Store`] API to read, write, and remove documents.
73///
74/// [See the `store` module for guide-level docs and examples][0].
75///
76/// [`ditto.store()`]: crate::prelude::Ditto::store
77/// [0]: crate::store
78#[derive(Clone)]
79pub struct Store {
80 ditto: Arc<ffi_sdk::BoxedDitto>,
81 // FIXME(Daniel): unify this field with `.ditto`
82 weak_ditto_fields: Weak<DittoFields>,
83 disk_usage: Arc<DiskUsage>,
84 attachment_fetchers: Arc<RwLock<HashMap<CancelToken, (bool, DittoAttachmentFetcherV2)>>>,
85}
86
87impl Store {
88 pub(crate) fn new(
89 ditto: Arc<ffi_sdk::BoxedDitto>,
90 weak_ditto_fields: Weak<DittoFields>,
91 ) -> Self {
92 let disk_usage = Arc::new(DiskUsage::new(ditto.retain(), FsComponent::Store));
93 Self {
94 ditto,
95 weak_ditto_fields,
96 disk_usage,
97 attachment_fetchers: <_>::default(),
98 }
99 }
100
101 // Note this method's logic will be moved into the core ditto library
102 // in the future
103 fn validate_collection_name(name: &str) -> Result<(), DittoError> {
104 let mut result = Ok(());
105
106 if name.is_empty() {
107 result = Err(DittoError::new(
108 ErrorKind::InvalidInput,
109 String::from("Collection name can not be empty"),
110 ));
111 }
112
113 if name.split_whitespace().next().is_none() {
114 result = Err(DittoError::new(
115 ErrorKind::InvalidInput,
116 String::from("Collection name can not only contain whitespace"),
117 ));
118 }
119
120 result
121 }
122
123 /// Returns a [`Collection`] with the provided name.
124 /// A collection name is valid if :
125 /// * its length is less than 100
126 /// * it is not empty
127 /// * it does not contain the char '\0'
128 /// * it does not begin with "$TS_"
129 #[doc(hidden)]
130 #[deprecated(note = "Use `ditto.store().execute_v2(...)` or \
131 `ditto.store().register_observer_v2(...)` instead")]
132 #[allow(deprecated)]
133 pub fn collection(&self, collection_name: &'_ str) -> Result<Collection, DittoError> {
134 Self::validate_collection_name(collection_name)?;
135 let c_name = char_p::new(collection_name);
136 let status = { ffi_sdk::ditto_collection(&*self.ditto, c_name.as_ref()) };
137 if status != 0 {
138 return Err(DittoError::from_ffi(ErrorKind::InvalidInput));
139 }
140 Ok(Collection {
141 ditto: Arc::downgrade(&self.ditto),
142 collection_name: c_name,
143 })
144 }
145
146 /// Returns an object that lets you fetch or observe the collections in the
147 /// store.
148 #[doc(hidden)]
149 #[deprecated]
150 #[allow(deprecated)]
151 pub fn collections(&self) -> PendingCollectionsOperation<'_> {
152 PendingCollectionsOperation::<'_>::new(Arc::downgrade(&self.ditto))
153 }
154
155 /// Allows you to group multiple operations together that affect multiple
156 /// documents, potentially across multiple collections, without
157 /// auto-committing on each operation.
158 ///
159 /// At the end of the batch of operations, either
160 /// [`batch.commit_changes`](crate::store::batch::ScopedStore::commit_changes)
161 /// or
162 /// [`batch.revert_changes`](crate::store::batch::ScopedStore::revert_changes)
163 /// must be called.
164 ///
165 /// ## Example
166 ///
167 /// ```rust
168 /// # macro_rules! ignore {($($__:tt)*) => ()} ignore! {
169 /// ditto.store().with_batched_write(|batch| {
170 /// let mut foo_coll = batch.collection("foo");
171 /// foo_coll.find...().remove();
172 /// let mut bar_coll = batch.collection("bar");
173 /// // Expensive multi-mutation op:
174 /// for _ in 0 .. 10_000 {
175 /// let doc = ...;
176 /// bar_coll.insert(doc, None, false);
177 /// }
178 /// // At this point, we must say whether we commit or revert
179 /// // these changes:
180 /// batch.commit_changes()
181 /// })
182 /// # }
183 /// ```
184 #[doc(hidden)]
185 #[deprecated]
186 #[allow(deprecated)]
187 pub fn with_batched_write<F>(
188 &self,
189 f: F,
190 ) -> Result<Vec<batch::WriteTransactionResult>, DittoError>
191 where
192 for<'batch> F: FnOnce(batch::ScopedStore<'batch>) -> batch::Action<'batch>,
193 {
194 query_builder::batch::with_batched_write(self, f)
195 }
196
197 /// Returns a list of the names of collections in the local store.
198 pub fn collection_names(&self) -> Result<Vec<String>, DittoError> {
199 let c_collections = { ffi_sdk::ditto_get_collection_names(&*self.ditto).ok()? };
200
201 Ok(c_collections
202 .iter()
203 .map(|x: &char_p::Box| -> String { x.clone().into_string() })
204 .collect())
205 }
206
207 /// Returns a hash representing the current version of the given queries.
208 /// When a document matching such queries gets mutated, the hash will change
209 /// as well.
210 ///
211 /// Please note that the hash depends on how queries are constructed, so you
212 /// should make sure to always compare hashes generated with the same set of
213 /// queries.
214 #[doc(hidden)]
215 #[deprecated]
216 #[allow(deprecated)]
217 pub fn queries_hash(&self, live_queries: &[LiveQuery]) -> Result<u64, DittoError> {
218 let (coll_names, queries): (Vec<_>, Vec<_>) = live_queries
219 .iter()
220 .map(|lq| (lq.collection_name.as_ref(), lq.query.as_ref()))
221 .unzip();
222
223 {
224 ffi_sdk::ditto_queries_hash(&self.ditto, coll_names[..].into(), queries[..].into()).ok()
225 }
226 }
227
228 /// Returns a sequence of English words representing the current version of
229 /// the given queries. When a document matching such queries gets mutated,
230 /// the words will change as well.
231 ///
232 /// Please note that the resulting sequence of words depends on how queries
233 /// are constructed, so you should make sure to always compare hashes
234 /// generated with the same set of queries.
235 #[doc(hidden)]
236 #[deprecated]
237 #[allow(deprecated)]
238 pub fn queries_hash_mnemonic(&self, live_queries: &[LiveQuery]) -> Result<String, DittoError> {
239 let (coll_names, queries): (Vec<_>, Vec<_>) = live_queries
240 .iter()
241 .map(|lq| (lq.collection_name.as_ref(), lq.query.as_ref()))
242 .unzip();
243
244 {
245 ffi_sdk::ditto_queries_hash_mnemonic(
246 &self.ditto,
247 coll_names[..].into(),
248 queries[..].into(),
249 )
250 .ok()
251 .map(|c_str| c_str.into_string())
252 }
253 }
254
255 #[doc(hidden)]
256 #[deprecated(note = "All live query webhook related methods have been deprecated")]
257 /// Start all live query webhooks.
258 pub fn start_all_live_query_webhooks(&self) -> Result<(), DittoError> {
259 error!("Store::start_all_live_query_webhooks has been deprecated: it is now a no-op");
260 Err(DittoError::from_str(
261 ErrorKind::Deprecation,
262 "Live query webhooks are deprecated",
263 ))
264 }
265
266 #[doc(hidden)]
267 #[deprecated(note = "All live query webhook related methods have been deprecated")]
268 /// Start a live query webhooks by its id.
269 pub fn start_live_query_webhook_by_id(&self, _doc_id: DocumentId) -> Result<(), DittoError> {
270 error!("Store::start_live_query_webhook_by_id has been deprecated: it is now a no-op");
271 Err(DittoError::from_str(
272 ErrorKind::Deprecation,
273 "Live query webhooks are deprecated",
274 ))
275 }
276
277 #[doc(hidden)]
278 #[deprecated(note = "All live query webhook related methods have been deprecated")]
279 /// Register a new live query webhook
280 pub fn register_live_query_webhook(
281 &self,
282 _collection_name: &str,
283 _query: &str,
284 _url: &str,
285 ) -> Result<DocumentId, DittoError> {
286 error!("Store::register_live_query_webhook has been deprecated: it is now a no-op");
287 Err(DittoError::from_str(
288 ErrorKind::Deprecation,
289 "Live query webhooks are deprecated",
290 ))
291 }
292
293 #[doc(hidden)]
294 #[deprecated(note = "All live query webhook related methods have been deprecated")]
295 /// Generate a new API secret for live query webhook
296 pub fn live_query_webhook_generate_new_api_secret(&self) -> Result<(), DittoError> {
297 error!(
298 "Store::live_query_webhook_generate_new_api_secret has been deprecated: it is now a \
299 no-op"
300 );
301 Err(DittoError::from_str(
302 ErrorKind::Deprecation,
303 "Live query webhooks are deprecated",
304 ))
305 }
306
307 #[doc(hidden)]
308 #[deprecated(note = "The experimental `timeseries` feature has been deprecated")]
309 #[allow(deprecated)]
310 #[cfg(feature = "timeseries")]
311 /// Returns a [`TimeSeries`] with the provided name.
312 pub fn timeseries(&self, ts_name: &'_ str) -> Result<TimeSeries, DittoError> {
313 Self::validate_collection_name(ts_name)?;
314 let c_name = char_p::new(ts_name);
315 Ok(TimeSeries {
316 ditto: self.ditto.retain(),
317 ts_name: c_name,
318 })
319 }
320
321 /// Return a [`DiskUsage`] to monitor the disk usage of the
322 /// [`Store`].
323 #[doc(hidden)]
324 #[deprecated(note = "Use `ditto.disk_usage()` instead")]
325 pub fn disk_usage(&self) -> &DiskUsage {
326 &self.disk_usage
327 }
328
329 /// Deprecated in favour of [`ditto.store().register_observer_with_signal_next_v2()`][0]
330 ///
331 /// [0]: Store::register_observer_with_signal_next_v2
332 #[deprecated = "Use `ditto.store().register_observer_v2()` instead"]
333 #[allow(deprecated)]
334 #[doc(hidden)]
335 pub fn register_observer<Q, F>(
336 &self,
337 query: Q,
338 query_args: Option<QueryArguments>,
339 on_change: F,
340 ) -> Result<Arc<StoreObserver>, DittoError>
341 where
342 Q: TryInto<Query, Error = DittoError>,
343 F: ChangeHandler,
344 {
345 let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
346
347 let query = query.try_into()?;
348 let query_args = query_args.as_ref().map(|a| a.cbor());
349
350 let observer = Arc::new(StoreObserver::new(
351 &ditto,
352 &query.inner_string,
353 query_args,
354 on_change,
355 )?);
356 Ok(observer)
357 }
358
359 /// Deprecated in favour of [`ditto.store().register_observer_with_signal_next_v2()`][0]
360 ///
361 /// [0]: Store::register_observer_with_signal_next_v2
362 #[deprecated = "Use `ditto.store().register_observer_with_signal_next_v2()` instead"]
363 #[allow(deprecated)]
364 #[doc(hidden)]
365 pub fn register_observer_with_signal_next<Q, F>(
366 &self,
367 query: Q,
368 query_args: Option<QueryArguments>,
369 on_change: F,
370 ) -> Result<Arc<StoreObserver>, DittoError>
371 where
372 Q: TryInto<Query, Error = DittoError>,
373 F: ChangeHandlerWithSignalNext,
374 {
375 let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
376
377 let query = query.try_into()?;
378 let query_args = query_args.as_ref().map(|a| a.cbor());
379
380 let new_obs = Arc::new(StoreObserver::with_signal_next(
381 &ditto,
382 &query.inner_string,
383 query_args,
384 on_change,
385 )?);
386 Ok(new_obs)
387 }
388
389 /// Installs and returns a store observer for a query, configuring Ditto to
390 /// trigger the passed in change handler whenever documents in the local
391 /// store change such that the result of the matching query changes. The
392 /// passed in query must be a `SELECT` query, otherwise an error will be
393 /// returned.
394 ///
395 /// # Example
396 ///
397 /// ```
398 /// use dittolive_ditto::prelude::*;
399 /// # fn example_with_register_observer(ditto: &Ditto) -> anyhow::Result<()> {
400 ///
401 /// let store = ditto.store();
402 /// let _observer = store.register_observer_v2(
403 /// "SELECT * FROM cars WHERE color = 'blue'",
404 /// move |query_result| {
405 /// for item in query_result.iter() {
406 /// // ... handle each item
407 /// }
408 /// },
409 /// )?;
410 /// # Ok(())
411 /// # }
412 /// ```
413 ///
414 /// The first invocation of the change handler will always happen after
415 /// this method has returned.
416 ///
417 /// The observer will remain active until:
418 ///
419 /// - the [`StoreObserver`] handle gets dropped,
420 /// - the [`observer.cancel()`] method is called, or
421 /// - the owning [`Ditto`] instance has shut down
422 ///
423 /// Observer callbacks will never be called concurrently. That is, one callback
424 /// must return before the observer will call the handler again. See
425 /// [`ditto.store().register_observer_with_signal_next(...)`] if you want
426 /// to manually signal readiness for the next callback.
427 ///
428 /// [`ditto.store().register_observer_with_signal_next(...)`]: crate::store::Store::register_observer_with_signal_next
429 /// [`observer.cancel()`]: crate::store::StoreObserver::cancel
430 /// [`Ditto`]: crate::prelude::Ditto
431 pub fn register_observer_v2<Q, F>(
432 &self,
433 query: Q,
434 on_change: F,
435 ) -> Result<Arc<StoreObserver>, DittoError>
436 where
437 Q: IntoQuery,
438 Q::Args: Serialize,
439 F: ChangeHandler,
440 {
441 let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
442 let query = query.into_query()?;
443
444 let observer = Arc::new(StoreObserver::new(
445 &ditto,
446 &query.string,
447 query.args_cbor.as_deref(),
448 on_change,
449 )?);
450 Ok(observer)
451 }
452
453 /// Installs and returns a store observer for a query, configuring Ditto to
454 /// trigger the passed in change handler whenever documents in the local
455 /// store change such that the result of the matching query changes. The
456 /// passed in query must be a `SELECT` query, otherwise an error will be
457 /// returned.
458 ///
459 /// Here, a function is passed as an additional argument to the change
460 /// handler. This allows the change handler to control how frequently
461 /// it is called. See [`register_observer`] for a convenience method that
462 /// automatically signals the next invocation.
463 ///
464 /// # Example
465 ///
466 /// ```
467 /// use dittolive_ditto::prelude::*;
468 /// # fn example_with_signal_next(ditto: &Ditto) -> anyhow::Result<()> {
469 ///
470 /// let store = ditto.store();
471 /// let _observer = store.register_observer_with_signal_next_v2(
472 /// "SELECT * FROM cars WHERE color = 'blue'",
473 /// move |query_result, signal_next| {
474 /// for item in query_result.iter() {
475 /// // ... handle each item
476 /// }
477 ///
478 /// // Call `signal_next` when you're ready for the next callback
479 /// signal_next();
480 /// },
481 /// )?;
482 /// # Ok(())
483 /// # }
484 /// ```
485 ///
486 /// The first invocation of the change handler will always happen after
487 /// this method has returned.
488 ///
489 /// The observer will remain active until:
490 ///
491 /// - the [`StoreObserver`] handle gets dropped,
492 /// - the [`observer.cancel()`] method is called, or
493 /// - the owning [`Ditto`] instance has shut down
494 ///
495 /// After invoking the callback once, the observer will wait to deliver
496 /// another callback until after you've called [`signal_next`].
497 ///
498 /// [`register_observer`]: Store::register_observer
499 /// [`signal_next`]: crate::store::SignalNext
500 pub fn register_observer_with_signal_next_v2<Q, F>(
501 &self,
502 query: Q,
503 on_change: F,
504 ) -> Result<Arc<StoreObserver>, DittoError>
505 where
506 Q: IntoQuery,
507 Q::Args: Serialize,
508 F: ChangeHandlerWithSignalNext,
509 {
510 let ditto = Ditto::upgrade(&self.weak_ditto_fields)?;
511 let query = query.into_query()?;
512
513 let new_obs = Arc::new(StoreObserver::with_signal_next(
514 &ditto,
515 &query.string,
516 query.args_cbor.as_deref(),
517 on_change,
518 )?);
519 Ok(new_obs)
520 }
521
522 /// Gets temporary access to the set of currently registered observers.
523 ///
524 /// A (read) lock is held until the return value is dropped: this means
525 /// that neither [`Self::register_observer()`] nor
526 /// [`StoreObserver::cancel()`] can make progress until this read
527 /// lock is released.
528 pub fn observers(&self) -> impl '_ + Deref<Target = SetArc<StoreObserver>> {
529 let observers: repr_c::Vec<repr_c::Box<FfiStoreObserver>> =
530 ffi_sdk::dittoffi_store_observers(&self.ditto);
531 let observers: Vec<_> = observers.into();
532 let observers = observers
533 .into_iter()
534 .map(|handle: repr_c::Box<FfiStoreObserver>| Arc::new(StoreObserver { handle }))
535 .collect::<SetArc<_>>();
536
537 Box::new(observers)
538 }
539
540 /// Executes the given query in the local store and returns the result.
541 ///
542 /// Use placeholders to incorporate values from the optional `query_args`
543 /// parameter into the query. The keys of the [`QueryArguments`] object must
544 /// match the placeholders used within the query. You can not use placeholders
545 /// in the `FROM` clause.
546 ///
547 /// This method only returns results from the local store without waiting for any
548 /// [`SyncSubscription`]s to have caught up with the
549 /// latest changes. Only use this method if your program must proceed with immediate results.
550 ///
551 /// Use [`ditto.store().register_observer(...)`] to receive updates to query results
552 /// as soon as they have been synced to this peer.
553 ///
554 /// Limitations:
555 ///
556 /// - Supports `SELECT * FROM <collection name>` with optional `WHERE <expression>`
557 /// - No transactions
558 ///
559 /// [`SyncSubscription`]: crate::sync::SyncSubscription
560 /// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
561 #[deprecated = "Use `ditto.store().execute_v2(...)` instead"]
562 #[allow(deprecated)]
563 #[doc(hidden)]
564 pub async fn execute<Q>(
565 &self,
566 query: Q,
567 query_args: Option<QueryArguments>,
568 ) -> Result<QueryResult, DittoError>
569 where
570 Q: TryInto<Query, Error = DittoError>,
571 {
572 // Get a DqlResult. The operation to get it is fallible, so we end up
573 // with a `FfiResult<DqlResult> -> Result<DqlResult> -> DqlResult`.
574 // The `Result` stutter is quite unfortunate, which _results_ from
575 // having picked that name in our public SDK API (over, say, `Output`).
576 let ffi_query_result = ffi_sdk::dittoffi_try_exec_statement(
577 &self.ditto,
578 query.try_into()?.prepare_ffi(),
579 query_args.as_ref().map(|qa| qa.cbor().into()),
580 )
581 .into_rust_result()?;
582
583 Ok(QueryResult::from(ffi_query_result))
584 }
585
586 /// Executes the given query in the local store and returns the result.
587 ///
588 /// # Example
589 ///
590 /// ```
591 /// use dittolive_ditto::prelude::*;
592 /// use dittolive_ditto::dql::QueryResult;
593 /// # async fn example(ditto: &Ditto) -> anyhow::Result<()> {
594 ///
595 /// // Query a collection
596 /// let result = ditto.store().execute_v2("SELECT * FROM cars").await?;
597 ///
598 /// // Insert a document into a collection
599 /// let insert_result: QueryResult = ditto
600 /// .store()
601 /// .execute_v2((
602 /// "INSERT INTO cars DOCUMENTS (:newCar)",
603 /// serde_json::json!({
604 /// "newCar": {
605 /// "make": "ford",
606 /// "color": "blue"
607 /// }
608 /// })
609 /// ))
610 /// .await?;
611 /// # Ok(())
612 /// # }
613 /// ```
614 ///
615 /// Use placeholders to incorporate values from the optional `query_args`
616 /// parameter into the query. The keys of the [`QueryArguments`] object must
617 /// match the placeholders used within the query. You can not use placeholders
618 /// in the `FROM` clause.
619 ///
620 /// This method only returns results from the local store without waiting for any
621 /// [`SyncSubscription`]s to have caught up with the
622 /// latest changes. Only use this method if your program must proceed with immediate results.
623 ///
624 /// Use [`ditto.store().register_observer(...)`] to receive updates to query results
625 /// as soon as they have been synced to this peer.
626 ///
627 /// ## Query parameter
628 ///
629 /// The `query` parameter must implement [`IntoQuery`], which is a trait that is implemented by
630 /// objects that can be turned into a query string along with the relevant query argumnents.
631 ///
632 /// For queries with no arguments, a [`String`] is sufficient
633 ///
634 /// [`SyncSubscription`]: crate::sync::SyncSubscription
635 /// [`ditto.store().register_observer(...)`]: crate::store::Store::register_observer
636 pub async fn execute_v2<Q>(&self, query: Q) -> Result<QueryResult, DittoError>
637 where
638 Q: IntoQuery,
639 Q::Args: serde::Serialize,
640 {
641 let query = query.into_query()?;
642 let query_string = (&*query.string).into();
643 let query_args = query.args_cbor.as_deref().map(Into::into);
644
645 let ffi_query_result =
646 ffi_sdk::dittoffi_try_exec_statement(&self.ditto, query_string, query_args)
647 .into_rust_result()?;
648
649 Ok(QueryResult::from(ffi_query_result))
650 }
651
652 /// Creates a new attachment, which can then be inserted into a document.
653 ///
654 /// The file residing at the provided path will be copied into Ditto’s store. The
655 /// [`DittoAttachment`] object that is returned is what you can
656 /// then use to insert an attachment into a document.
657 ///
658 /// You can provide custom user data about the attachment, which will be replicated to other
659 /// peers alongside the file attachment.
660 pub async fn new_attachment(
661 &self,
662 filepath: &(impl ?Sized + AsRef<Path>),
663 user_data: HashMap<String, String>,
664 ) -> Result<DittoAttachment, DittoError> {
665 DittoAttachment::from_file_and_metadata(filepath, user_data, &self.ditto)
666 }
667
668 /// Creates a new attachment from in-memory data
669 ///
670 /// Refer to [`new_attachment`](Self::new_attachment) for additional information.
671 pub async fn new_attachment_from_bytes(
672 &self,
673 bytes: &(impl ?Sized + AsRef<[u8]>),
674 user_data: HashMap<String, String>,
675 ) -> Result<DittoAttachment, DittoError> {
676 DittoAttachment::from_bytes_and_metadata(bytes, user_data, &self.ditto)
677 }
678
679 /// Fetches the attachment corresponding to the provided attachment token.
680 /// - `attachment_token`: can be either a [`DittoAttachmentToken`], or a `&BTreeMap<CborValue,
681 /// CborValue>`, that is, the output of a [`QueryResultItem::value()`] once casted
682 /// [`.as_object()`][crate::prelude::CborValueGetters::as_object()].
683 ///
684 /// - `on_fetch_event`: A closure that will be called when the status of the request to fetch
685 /// the attachment has changed. If the attachment is already available then this will be
686 /// called almost immediately with a completed status value.
687 ///
688 /// The returned [`DittoAttachmentFetcher`] is a handle which is safe to discard, unless you
689 /// wish to be able to [`.cancel()`][DittoAttachmentFetcher::cancel] the fetching operation.
690 /// When not explicitly cancelled, the fetching operation will remain active until it either
691 /// completes, the attachment is deleted, or the owning [`Ditto`] object is dropped.
692 pub fn fetch_attachment(
693 &self,
694 attachment_token: impl attachment::token::DittoAttachmentTokenLike,
695 on_fetch_event: impl 'static + Send + Sync + Fn(DittoAttachmentFetchEvent),
696 ) -> Result<DittoAttachmentFetcherV2, DittoError> {
697 let attachment_token = attachment_token.parse_attachment_token()?;
698
699 let weak_ditto = self.weak_ditto_fields.clone();
700 let ditto = weak_ditto
701 .upgrade()
702 .ok_or(ErrorKind::ReleasedDittoInstance)?;
703
704 let mut attachment_fetchers_lockguard = self.attachment_fetchers.write().unwrap();
705 let fetcher = DittoAttachmentFetcher::new(
706 attachment_token,
707 Some(&ditto),
708 &self.ditto,
709 // Shim around `on_fetch_event` to `cancel` on completion.
710 move |event, cancel_token: &AtomicU64| {
711 let has_finished = matches! {
712 event,
713 | DittoAttachmentFetchEvent::Completed { .. }
714 | DittoAttachmentFetchEvent::Deleted { .. }
715 };
716 on_fetch_event(event);
717 if has_finished {
718 if let Some(ditto) = weak_ditto.upgrade() {
719 let mut attachment_fetchers_inner_lockguard =
720 ditto.store.attachment_fetchers.write().unwrap();
721 // Relaxed is fine thanks to the lock.
722 let cancel_token = cancel_token.load(atomic::Ordering::Relaxed);
723 ditto.store.unregister_fetcher(
724 cancel_token,
725 Some(&mut *attachment_fetchers_inner_lockguard),
726 );
727 }
728 }
729 },
730 )?;
731 let (cancel_token, was_zero) = fetcher.cancel_token_ensure_unique();
732 attachment_fetchers_lockguard.insert(cancel_token, (was_zero, fetcher.clone()));
733 Ok(fetcher)
734 }
735
736 fn unregister_fetcher(
737 &self,
738 mut fetcher_cancel_token: CancelToken,
739 fetchers: Option<&mut HashMap<CancelToken, (bool, DittoAttachmentFetcherV2)>>,
740 ) -> bool {
741 let mut lock_guard = None;
742 let fetchers = fetchers.unwrap_or_else(|| {
743 &mut **lock_guard.get_or_insert(self.attachment_fetchers.write().unwrap())
744 });
745
746 let Some((was_zero, removed_fetcher)) = fetchers.remove(&fetcher_cancel_token) else {
747 return false;
748 };
749 drop(lock_guard);
750
751 if was_zero {
752 fetcher_cancel_token = 0;
753 }
754
755 let att_token = &removed_fetcher.context.token;
756 debug!(
757 token_id = %att_token.id(),
758 %fetcher_cancel_token,
759 "unregistering ditto attachment fetcher"
760 );
761
762 let status = ffi_sdk::ditto_cancel_resolve_attachment(
763 &self.ditto,
764 att_token.id.as_ref().into(),
765 fetcher_cancel_token,
766 );
767
768 if status != 0 {
769 error!(
770 token_id = %att_token.id(),
771 %fetcher_cancel_token,
772 "failed to clean up attachment fetcher"
773 );
774 }
775 status == 0
776 }
777
778 /// Gets a copy of the set of currently registered attachment fetchers.
779 ///
780 /// A (read) lock is held during the copy: this contends with [`Self::fetch_attachment()`] and
781 /// with [`DittoAttachmentFetcher::cancel()`].
782 pub fn attachment_fetchers(&self) -> Vec<DittoAttachmentFetcherV2> {
783 self.attachment_fetchers
784 .read()
785 .unwrap()
786 .iter()
787 .map(|(_, (_, fetcher))| fetcher.clone())
788 .collect()
789 }
790}
791
792/// Specify the order of returned Documents in a query.
793#[non_exhaustive]
794#[derive(Clone, Copy, PartialEq, Eq, Debug)]
795pub enum SortDirection {
796 /// First result is "smallest", last result is "largest"
797 Ascending,
798
799 /// First result is "largest", last result is "smallest"
800 Descending,
801}
802
803#[non_exhaustive]
804#[derive(Clone, Copy, PartialEq, Eq, Debug)]
805/// Specify the write strategy when inserting documents.
806pub enum WriteStrategy {
807 /// An existing document will be merged with the document being inserted, if there is a
808 /// pre-existing document.
809 Merge,
810
811 /// Insert the document only if there is not already a document with the same Id in the store.
812 /// If there is already a document in the store with the same Id then this will be a no-op.
813 InsertIfAbsent,
814
815 /// Insert the document, with its contents treated as default data, only if there is not
816 /// already a document with the same Id in the store. If there is already a document in the
817 /// store with the same Id then this will be a no-op. Use this strategy if you want to
818 /// insert default data for a given document Id, which you want to treat as common initial
819 /// data amongst all peers and that you expect to be mutated or overwritten in due course.
820 InsertDefaultIfAbsent,
821}
822
823impl WriteStrategy {
824 fn as_write_strategy_rs(&self) -> WriteStrategyRs {
825 match self {
826 WriteStrategy::Merge => WriteStrategyRs::Merge,
827 WriteStrategy::InsertIfAbsent => WriteStrategyRs::InsertIfAbsent,
828 WriteStrategy::InsertDefaultIfAbsent => WriteStrategyRs::InsertDefaultIfAbsent,
829 }
830 }
831}