bonsaidb_core/
transaction.rs

1use arc_bytes::serde::Bytes;
2use serde::{Deserialize, Serialize};
3
4use crate::connection::{AsyncLowLevelConnection, LowLevelConnection};
5use crate::document::{CollectionHeader, DocumentId, HasHeader, Header, Revision};
6use crate::key::KeyEncoding;
7use crate::schema::{Collection, CollectionName, SerializedCollection};
8use crate::Error;
9
10/// A list of operations to execute as a single unit. If any operation fails,
11/// all changes are aborted. Transactions are ACID-compliant. ACID stands for:
12///
13/// - Atomic: All transactions are atomically applied. Readers outside of the
14///   active transaction will never be able to read partially written data. In
15///   BonsaiDb, readers are not blocked while writes are happening -- reads will
16///   continue to read the existing value until the transaction is fully
17///   executed. Once the transaction is fully executed, all future queries will
18///   reflect the updated state immediately.
19///
20/// - Consistent: All transactions will be applied only if the data model is
21///   able to remain fully consistent. This means that all constraints, such as
22///   unique view keys, are validated before a transaction is allowed to be
23///   committed.
24///
25/// - Isolated: Each transaction is executed in an isolated environment.
26///   Currently, BonsaiDb does not offer interactive transactions, so this is
27///   easily guaranteed. When BonsaiDb eventually has interactive transactions,
28///   the transaction will have a fully isolated state until it is committed. No
29///   two transactions can be affected by each other's changes.
30///
31///   In the event of a transaction being aborted or a power outage occurs while
32///   a transaction is being applied, this isolation ensures that once BonsaiDb
33///   opens the database again, the database will reflect the most recently
34///   committed.
35///
36/// - Durable: When the transaction apply function has finished exectuing,
37///   BonsaiDb guarantees that all data has been confirmed by the operating
38///   system as being fully written to disk. This ensures that in the event of a
39///   power outage, no data that has been confirmed will be lost.
40///
41/// When using one of the high-level functions to push/insert/update/delete
42/// documents, behind the scenes single-[`Operation`] `Transaction`s are
43/// applied. To ensure multiple changes happen in the same database operation,
44/// multiple operations can be added to a `Transaction`:
45///
46/// ```rust
47/// # bonsaidb_core::__doctest_prelude!();
48/// # use bonsaidb_core::connection::Connection;
49/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
50/// use bonsaidb_core::transaction::{Operation, Transaction};
51/// let mut tx = Transaction::new();
52/// tx.push(Operation::push_serialized::<MyCollection>(
53///     &MyCollection::default(),
54/// )?);
55/// tx.push(Operation::push_serialized::<MyCollection>(
56///     &MyCollection::default(),
57/// )?);
58/// let results = tx.apply(db)?;
59/// assert_eq!(results.len(), 2);
60/// println!("Two new documents inserted: {results:?}");
61/// # Ok(())
62/// # }
63/// ```
64#[derive(Clone, Serialize, Deserialize, Default, Debug)]
65#[must_use]
66pub struct Transaction {
67    /// The operations in this transaction.
68    pub operations: Vec<Operation>,
69}
70
71impl Transaction {
72    /// Returns a new, empty transaction.
73    pub fn new() -> Self {
74        Self::default()
75    }
76
77    /// Adds an operation to the transaction.
78    pub fn push(&mut self, operation: Operation) {
79        self.operations.push(operation);
80    }
81
82    /// Appends an operation to the transaction and returns self.
83    pub fn with(mut self, operation: Operation) -> Self {
84        self.push(operation);
85        self
86    }
87
88    /// Applies the transaction to the `database`, returning the results of the
89    /// operations. All operations will succeed or none will be performed and an
90    /// error will be returned.
91    pub fn apply<Connection: LowLevelConnection>(
92        self,
93        db: &Connection,
94    ) -> Result<Vec<OperationResult>, Error> {
95        db.apply_transaction(self)
96    }
97
98    /// Applies the transaction to the `database`, returning the results of the
99    /// operations. All operations will succeed or none will be performed and an
100    /// error will be returned.
101    pub async fn apply_async<Connection: AsyncLowLevelConnection>(
102        self,
103        db: &Connection,
104    ) -> Result<Vec<OperationResult>, Error> {
105        db.apply_transaction(self).await
106    }
107}
108
109impl From<Operation> for Transaction {
110    fn from(operation: Operation) -> Self {
111        Self {
112            operations: vec![operation],
113        }
114    }
115}
116
117impl Transaction {
118    /// Inserts a new document with `contents` into `collection`.  If `id` is
119    /// `None` a unique id will be generated. If an id is provided and a
120    /// document already exists with that id, a conflict error will be returned.
121    pub fn insert(
122        collection: CollectionName,
123        id: Option<DocumentId>,
124        contents: impl Into<Bytes>,
125    ) -> Self {
126        Self::from(Operation::insert(collection, id, contents))
127    }
128
129    /// Updates a document in `collection`.
130    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
131        Self::from(Operation::update(collection, header, contents))
132    }
133
134    /// Overwrites a document in `collection`. If a document with `id` exists,
135    /// it will be overwritten. If a document with `id` doesn't exist, it will
136    /// be created.
137    pub fn overwrite(
138        collection: CollectionName,
139        id: DocumentId,
140        contents: impl Into<Bytes>,
141    ) -> Self {
142        Self::from(Operation::overwrite(collection, id, contents))
143    }
144
145    /// Deletes a document from a `collection`.
146    pub fn delete(collection: CollectionName, header: Header) -> Self {
147        Self::from(Operation::delete(collection, header))
148    }
149}
150
151/// A single operation performed on a `Collection`.
152#[derive(Clone, Serialize, Deserialize, Debug)]
153#[must_use]
154pub struct Operation {
155    /// The id of the `Collection`.
156    pub collection: CollectionName,
157
158    /// The command being performed.
159    pub command: Command,
160}
161
162impl Operation {
163    /// Inserts a new document with `contents` into `collection`.  If `id` is
164    /// `None` a unique id will be generated. If an id is provided and a
165    /// document already exists with that id, a conflict error will be returned.
166    pub fn insert(
167        collection: CollectionName,
168        id: Option<DocumentId>,
169        contents: impl Into<Bytes>,
170    ) -> Self {
171        Self {
172            collection,
173            command: Command::Insert {
174                id,
175                contents: contents.into(),
176            },
177        }
178    }
179
180    /// Inserts a new document with the serialized representation of `contents`
181    /// into `collection`. If `id` is `None` a unique id will be generated. If
182    /// an id is provided and a document already exists with that id, a conflict
183    /// error will be returned.
184    pub fn insert_serialized<C: SerializedCollection>(
185        id: Option<&C::PrimaryKey>,
186        contents: &C::Contents,
187    ) -> Result<Self, Error> {
188        let id = id.map(DocumentId::new).transpose()?;
189        let contents = C::serialize(contents)?;
190        Ok(Self::insert(C::collection_name(), id, contents))
191    }
192
193    /// Pushes a new document with the serialized representation of `contents`
194    /// into `collection`.
195    ///
196    /// ## Automatic ID Assignment
197    ///
198    /// This function calls [`SerializedCollection::natural_id()`] to try to
199    /// retrieve a primary key value from `contents`. If an id is returned, the
200    /// item is inserted with that id. If an id is not returned, an id will be
201    /// automatically assigned, if possible, by the storage backend, which uses
202    /// the [`Key`](crate::key::Key) trait to assign ids.
203    pub fn push_serialized<C: SerializedCollection>(contents: &C::Contents) -> Result<Self, Error> {
204        let id = C::natural_id(contents);
205        let id = id.as_ref().map(DocumentId::new).transpose()?;
206        let contents = C::serialize(contents)?;
207        Ok(Self::insert(C::collection_name(), id, contents))
208    }
209
210    /// Updates a document in `collection`.
211    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
212        Self {
213            collection,
214            command: Command::Update {
215                header,
216                contents: contents.into(),
217            },
218        }
219    }
220
221    /// Updates a document with the serialized representation of `contents` in
222    /// `collection`.
223    pub fn update_serialized<C: SerializedCollection>(
224        header: CollectionHeader<C::PrimaryKey>,
225        contents: &C::Contents,
226    ) -> Result<Self, Error> {
227        let contents = C::serialize(contents)?;
228        Ok(Self::update(
229            C::collection_name(),
230            Header::try_from(header)?,
231            contents,
232        ))
233    }
234
235    /// Overwrites a document in `collection`. If a document with `id` exists,
236    /// it will be overwritten. If a document with `id` doesn't exist, it will
237    /// be created.
238    pub fn overwrite(
239        collection: CollectionName,
240        id: DocumentId,
241        contents: impl Into<Bytes>,
242    ) -> Self {
243        Self {
244            collection,
245            command: Command::Overwrite {
246                id,
247                contents: contents.into(),
248            },
249        }
250    }
251
252    /// Overwrites a document with the serialized representation of `contents`
253    /// in `collection`. If a document with `id` exists, it will be overwritten.
254    /// If a document with `id` doesn't exist, it will be created.
255    pub fn overwrite_serialized<C: SerializedCollection, Key>(
256        id: &Key,
257        contents: &C::Contents,
258    ) -> Result<Self, Error>
259    where
260        Key: KeyEncoding<C::PrimaryKey> + ?Sized,
261    {
262        let contents = C::serialize(contents)?;
263        Ok(Self::overwrite(
264            C::collection_name(),
265            DocumentId::new(id)?,
266            contents,
267        ))
268    }
269
270    /// Deletes a document from a `collection`.
271    pub const fn delete(collection: CollectionName, header: Header) -> Self {
272        Self {
273            collection,
274            command: Command::Delete { header },
275        }
276    }
277
278    /// Check that the document `id` still exists in `collection`. If a document
279    /// with that id is not present, the transaction will not be applied and
280    /// [`Error::DocumentNotFound`] will be returned.
281    ///
282    /// Upon success, [`OperationResult::Success`] will be included in the
283    /// transaction's results.
284    pub const fn check_document_id_exists(collection: CollectionName, id: DocumentId) -> Self {
285        Self {
286            collection,
287            command: Command::Check { id, revision: None },
288        }
289    }
290
291    /// Check that the document `id` still exists in [`Collection`] `C`. If a
292    /// document with that id is not present, the transaction will not be
293    /// applied and [`Error::DocumentNotFound`] will be returned.
294    ///
295    /// Upon success, [`OperationResult::Success`] will be included in the
296    /// transaction's results.
297    pub fn check_document_exists<C: Collection>(id: &C::PrimaryKey) -> Result<Self, Error> {
298        Ok(Self::check_document_id_exists(
299            C::collection_name(),
300            DocumentId::new(id)?,
301        ))
302    }
303
304    /// Check that the header of `doc_or_header` is the current revision of the
305    /// stored document in [`Collection`] `C`. If a document with the header's
306    /// id is not present, the transaction will not be applied and
307    /// [`Error::DocumentNotFound`] will be returned. If a document with the
308    /// header's id is present and the revision does not match, the transaction
309    /// will not be applied and [`Error::DocumentConflict`] will be returned.
310    ///
311    /// Upon success, [`OperationResult::Success`] will be included in the
312    /// transaction's results.
313    pub fn check_document_is_current<C: Collection, H: HasHeader>(
314        doc_or_header: &H,
315    ) -> Result<Self, Error> {
316        let header = doc_or_header.header()?;
317        Ok(Self {
318            collection: C::collection_name(),
319            command: Command::Check {
320                id: header.id,
321                revision: Some(header.revision),
322            },
323        })
324    }
325}
326
327/// A command to execute within a `Collection`.
328#[derive(Clone, Serialize, Deserialize, Debug)]
329pub enum Command {
330    /// Inserts a new document containing `contents`.
331    Insert {
332        /// An optional id for the document. If this is `None`, a unique id will
333        /// be generated. If this is `Some()` and a document already exists with
334        /// that id, a conflict error will be returned.
335        id: Option<DocumentId>,
336        /// The initial contents of the document.
337        contents: Bytes,
338    },
339
340    /// Update an existing `Document` identified by `header`. `header.revision` must match
341    /// the currently stored revision on the `Document`. If it does not, the
342    /// command fill fail with a `DocumentConflict` error.
343    Update {
344        /// The header of the `Document`. The revision must match the current
345        /// document.
346        header: Header,
347
348        /// The new contents to store within the `Document`.
349        contents: Bytes,
350    },
351
352    /// Overwrite an existing `Document` identified by `id`. The revision will
353    /// not be checked before the document is updated. If the document does not
354    /// exist, it will be created.
355    Overwrite {
356        /// The id of the document to overwrite.
357        id: DocumentId,
358
359        /// The new contents to store within the `Document`.
360        contents: Bytes,
361    },
362
363    /// Delete an existing `Document` identified by `id`. `revision` must match
364    /// the currently stored revision on the `Document`. If it does not, the
365    /// command fill fail with a `DocumentConflict` error.
366    Delete {
367        /// The current header of the `Document`.
368        header: Header,
369    },
370
371    /// Checks whether a document exists, and optionally whether its revision is
372    /// still current. If the document is not found, a `DocumentNotFound` error
373    /// will be returned.  If the document revision is provided and does not
374    /// match, a `DocumentConflict` error will be returned.
375    Check {
376        /// The id of the document to check.
377        id: DocumentId,
378        /// The revision of the document to check.
379        revision: Option<Revision>,
380    },
381}
382
383/// Information about the result of each `Operation` in a transaction.
384#[derive(Clone, Debug, Serialize, Deserialize)]
385pub enum OperationResult {
386    /// An operation succeeded but had no information to output.
387    Success,
388
389    /// A `Document` was updated.
390    DocumentUpdated {
391        /// The id of the `Collection` of the updated `Document`.
392        collection: CollectionName,
393
394        /// The header of the updated `Document`.
395        header: Header,
396    },
397
398    /// A `Document` was deleted.
399    DocumentDeleted {
400        /// The id of the `Collection` of the deleted `Document`.
401        collection: CollectionName,
402
403        /// The id of the deleted `Document`.
404        id: DocumentId,
405    },
406}
407
408/// Details about an executed transaction.
409#[derive(Clone, Debug, Serialize, Deserialize)]
410pub struct Executed {
411    /// The id of the transaction.
412    pub id: u64,
413
414    /// A list of containing ids of `Documents` changed.
415    pub changes: Changes,
416}
417
418/// A list of changes.
419#[derive(Clone, Debug, Serialize, Deserialize)]
420pub enum Changes {
421    /// A list of changed documents.
422    Documents(DocumentChanges),
423    /// A list of changed keys.
424    Keys(Vec<ChangedKey>),
425}
426
427impl Changes {
428    /// Returns the list of documents changed in this transaction, or None if
429    /// the transaction was not a document transaction.
430    #[must_use]
431    pub const fn documents(&self) -> Option<&DocumentChanges> {
432        if let Self::Documents(changes) = self {
433            Some(changes)
434        } else {
435            None
436        }
437    }
438
439    /// Returns the list of keys changed in this transaction, or None if the
440    /// transaction was not a `KeyValue` transaction.
441    #[must_use]
442    pub fn keys(&self) -> Option<&[ChangedKey]> {
443        if let Self::Keys(keys) = self {
444            Some(keys)
445        } else {
446            None
447        }
448    }
449}
450
451/// A list of changed documents.
452#[derive(Clone, Debug, Serialize, Deserialize)]
453pub struct DocumentChanges {
454    /// All of the collections changed.
455    pub collections: Vec<CollectionName>,
456    /// The individual document changes.
457    pub documents: Vec<ChangedDocument>,
458}
459
460impl DocumentChanges {
461    /// Returns the changed document and the name of the collection the change
462    /// happened to.
463    #[must_use]
464    pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> {
465        self.documents.get(index).and_then(|doc| {
466            self.collections
467                .get(usize::from(doc.collection))
468                .map(|collection| (collection, doc))
469        })
470    }
471
472    /// Returns the number of changes in this collection.
473    #[must_use]
474    pub fn len(&self) -> usize {
475        self.documents.len()
476    }
477
478    /// Returns true if there are no changes.
479    #[must_use]
480    pub fn is_empty(&self) -> bool {
481        self.documents.is_empty()
482    }
483
484    /// Returns an interator over all of the changed documents.
485    pub const fn iter(&self) -> DocumentChangesIter<'_> {
486        DocumentChangesIter {
487            changes: self,
488            index: Some(0),
489        }
490    }
491}
492
493/// An iterator over [`DocumentChanges`].
494#[must_use]
495pub struct DocumentChangesIter<'a> {
496    changes: &'a DocumentChanges,
497    index: Option<usize>,
498}
499
500impl<'a> Iterator for DocumentChangesIter<'a> {
501    type Item = (&'a CollectionName, &'a ChangedDocument);
502
503    fn next(&mut self) -> Option<Self::Item> {
504        self.index.and_then(|index| {
505            let result = self.changes.get(index);
506            if result.is_some() {
507                self.index = index.checked_add(1);
508            }
509            result
510        })
511    }
512}
513
514/// A draining iterator over [`ChangedDocument`]s.
515#[must_use]
516pub struct DocumentChangesIntoIter {
517    collections: Vec<CollectionName>,
518    documents: std::vec::IntoIter<ChangedDocument>,
519}
520
521impl Iterator for DocumentChangesIntoIter {
522    type Item = (CollectionName, ChangedDocument);
523
524    fn next(&mut self) -> Option<Self::Item> {
525        self.documents.next().and_then(|doc| {
526            self.collections
527                .get(usize::from(doc.collection))
528                .map(|collection| (collection.clone(), doc))
529        })
530    }
531}
532
533impl IntoIterator for DocumentChanges {
534    type IntoIter = DocumentChangesIntoIter;
535    type Item = (CollectionName, ChangedDocument);
536
537    fn into_iter(self) -> Self::IntoIter {
538        DocumentChangesIntoIter {
539            collections: self.collections,
540            documents: self.documents.into_iter(),
541        }
542    }
543}
544
#[test]
fn document_changes_iter() {
    use crate::schema::Qualified;

    // Shorthand for an undeleted change to document `id` that refers to the
    // collection at index `collection`.
    let change = |collection: u16, id: u64| ChangedDocument {
        collection,
        id: DocumentId::from_u64(id),
        deleted: false,
    };

    let changes = DocumentChanges {
        collections: vec![CollectionName::private("a"), CollectionName::private("b")],
        // The last entry refers to collection index 2, which has no name
        // registered above. The loops below are expected to observe only the
        // first three entries.
        documents: vec![change(0, 0), change(0, 1), change(1, 2), change(2, 3)],
    };

    assert_eq!(changes.len(), 4);
    assert!(!changes.is_empty());

    // Borrowing iteration.
    let mut borrowed_ids = Vec::new();
    let (mut borrowed_a, mut borrowed_b) = (0, 0);
    for (collection, document) in changes.iter() {
        assert!(!borrowed_ids.contains(&document.id));
        borrowed_ids.push(document.id.clone());
        match collection.name.as_ref() {
            "a" => borrowed_a += 1,
            "b" => borrowed_b += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(borrowed_a, 2);
    assert_eq!(borrowed_b, 1);

    // Consuming iteration.
    let mut owned_ids = Vec::new();
    let (mut owned_a, mut owned_b) = (0, 0);
    for (collection, document) in changes {
        assert!(!owned_ids.contains(&document.id));
        owned_ids.push(document.id.clone());
        match collection.name.as_ref() {
            "a" => owned_a += 1,
            "b" => owned_b += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(owned_a, 2);
    assert_eq!(owned_b, 1);
}
608
609/// A record of a changed document.
610#[derive(Debug, Clone, Serialize, Deserialize)]
611pub struct ChangedDocument {
612    /// The index of the `CollectionName` within the `collections` field of [`Changes::Documents`].
613    pub collection: u16,
614
615    /// The id of the changed `Document`.
616    pub id: DocumentId,
617
618    /// If the `Document` has been deleted, this will be `true`.
619    pub deleted: bool,
620}
621
622/// A record of a changed `KeyValue` entry.
623#[derive(Clone, Debug, Serialize, Deserialize)]
624pub struct ChangedKey {
625    /// The namespace of the key.
626    pub namespace: Option<String>,
627
628    /// The key that was changed.
629    pub key: String,
630
631    /// True if the key was deleted.
632    pub deleted: bool,
633}