bonsaidb_core/transaction.rs
1use arc_bytes::serde::Bytes;
2use serde::{Deserialize, Serialize};
3
4use crate::connection::{AsyncLowLevelConnection, LowLevelConnection};
5use crate::document::{CollectionHeader, DocumentId, HasHeader, Header, Revision};
6use crate::key::KeyEncoding;
7use crate::schema::{Collection, CollectionName, SerializedCollection};
8use crate::Error;
9
10/// A list of operations to execute as a single unit. If any operation fails,
11/// all changes are aborted. Transactions are ACID-compliant. ACID stands for:
12///
13/// - Atomic: All transactions are atomically applied. Readers outside of the
14/// active transaction will never be able to read partially written data. In
15/// BonsaiDb, readers are not blocked while writes are happening -- reads will
16/// continue to read the existing value until the transaction is fully
17/// executed. Once the transaction is fully executed, all future queries will
18/// reflect the updated state immediately.
19///
20/// - Consistent: All transactions will be applied only if the data model is
21/// able to remain fully consistent. This means that all constraints, such as
22/// unique view keys, are validated before a transaction is allowed to be
23/// committed.
24///
25/// - Isolated: Each transaction is executed in an isolated environment.
26/// Currently, BonsaiDb does not offer interactive transactions, so this is
27/// easily guaranteed. When BonsaiDb eventually has interactive transactions,
28/// the transaction will have a fully isolated state until it is committed. No
29/// two transactions can be affected by each other's changes.
30///
31/// In the event of a transaction being aborted or a power outage occurs while
32/// a transaction is being applied, this isolation ensures that once BonsaiDb
33/// opens the database again, the database will reflect the most recently
34/// committed.
35///
36/// - Durable: When the transaction apply function has finished exectuing,
37/// BonsaiDb guarantees that all data has been confirmed by the operating
38/// system as being fully written to disk. This ensures that in the event of a
39/// power outage, no data that has been confirmed will be lost.
40///
41/// When using one of the high-level functions to push/insert/update/delete
42/// documents, behind the scenes single-[`Operation`] `Transaction`s are
43/// applied. To ensure multiple changes happen in the same database operation,
44/// multiple operations can be added to a `Transaction`:
45///
46/// ```rust
47/// # bonsaidb_core::__doctest_prelude!();
48/// # use bonsaidb_core::connection::Connection;
49/// # fn test_fn<C: Connection>(db: &C) -> Result<(), Error> {
50/// use bonsaidb_core::transaction::{Operation, Transaction};
51/// let mut tx = Transaction::new();
52/// tx.push(Operation::push_serialized::<MyCollection>(
53/// &MyCollection::default(),
54/// )?);
55/// tx.push(Operation::push_serialized::<MyCollection>(
56/// &MyCollection::default(),
57/// )?);
58/// let results = tx.apply(db)?;
59/// assert_eq!(results.len(), 2);
60/// println!("Two new documents inserted: {results:?}");
61/// # Ok(())
62/// # }
63/// ```
64#[derive(Clone, Serialize, Deserialize, Default, Debug)]
65#[must_use]
66pub struct Transaction {
67 /// The operations in this transaction.
68 pub operations: Vec<Operation>,
69}
70
71impl Transaction {
72 /// Returns a new, empty transaction.
73 pub fn new() -> Self {
74 Self::default()
75 }
76
77 /// Adds an operation to the transaction.
78 pub fn push(&mut self, operation: Operation) {
79 self.operations.push(operation);
80 }
81
82 /// Appends an operation to the transaction and returns self.
83 pub fn with(mut self, operation: Operation) -> Self {
84 self.push(operation);
85 self
86 }
87
88 /// Applies the transaction to the `database`, returning the results of the
89 /// operations. All operations will succeed or none will be performed and an
90 /// error will be returned.
91 pub fn apply<Connection: LowLevelConnection>(
92 self,
93 db: &Connection,
94 ) -> Result<Vec<OperationResult>, Error> {
95 db.apply_transaction(self)
96 }
97
98 /// Applies the transaction to the `database`, returning the results of the
99 /// operations. All operations will succeed or none will be performed and an
100 /// error will be returned.
101 pub async fn apply_async<Connection: AsyncLowLevelConnection>(
102 self,
103 db: &Connection,
104 ) -> Result<Vec<OperationResult>, Error> {
105 db.apply_transaction(self).await
106 }
107}
108
109impl From<Operation> for Transaction {
110 fn from(operation: Operation) -> Self {
111 Self {
112 operations: vec![operation],
113 }
114 }
115}
116
117impl Transaction {
118 /// Inserts a new document with `contents` into `collection`. If `id` is
119 /// `None` a unique id will be generated. If an id is provided and a
120 /// document already exists with that id, a conflict error will be returned.
121 pub fn insert(
122 collection: CollectionName,
123 id: Option<DocumentId>,
124 contents: impl Into<Bytes>,
125 ) -> Self {
126 Self::from(Operation::insert(collection, id, contents))
127 }
128
129 /// Updates a document in `collection`.
130 pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
131 Self::from(Operation::update(collection, header, contents))
132 }
133
134 /// Overwrites a document in `collection`. If a document with `id` exists,
135 /// it will be overwritten. If a document with `id` doesn't exist, it will
136 /// be created.
137 pub fn overwrite(
138 collection: CollectionName,
139 id: DocumentId,
140 contents: impl Into<Bytes>,
141 ) -> Self {
142 Self::from(Operation::overwrite(collection, id, contents))
143 }
144
145 /// Deletes a document from a `collection`.
146 pub fn delete(collection: CollectionName, header: Header) -> Self {
147 Self::from(Operation::delete(collection, header))
148 }
149}
150
151/// A single operation performed on a `Collection`.
152#[derive(Clone, Serialize, Deserialize, Debug)]
153#[must_use]
154pub struct Operation {
155 /// The id of the `Collection`.
156 pub collection: CollectionName,
157
158 /// The command being performed.
159 pub command: Command,
160}
161
162impl Operation {
163 /// Inserts a new document with `contents` into `collection`. If `id` is
164 /// `None` a unique id will be generated. If an id is provided and a
165 /// document already exists with that id, a conflict error will be returned.
166 pub fn insert(
167 collection: CollectionName,
168 id: Option<DocumentId>,
169 contents: impl Into<Bytes>,
170 ) -> Self {
171 Self {
172 collection,
173 command: Command::Insert {
174 id,
175 contents: contents.into(),
176 },
177 }
178 }
179
180 /// Inserts a new document with the serialized representation of `contents`
181 /// into `collection`. If `id` is `None` a unique id will be generated. If
182 /// an id is provided and a document already exists with that id, a conflict
183 /// error will be returned.
184 pub fn insert_serialized<C: SerializedCollection>(
185 id: Option<&C::PrimaryKey>,
186 contents: &C::Contents,
187 ) -> Result<Self, Error> {
188 let id = id.map(DocumentId::new).transpose()?;
189 let contents = C::serialize(contents)?;
190 Ok(Self::insert(C::collection_name(), id, contents))
191 }
192
193 /// Pushes a new document with the serialized representation of `contents`
194 /// into `collection`.
195 ///
196 /// ## Automatic ID Assignment
197 ///
198 /// This function calls [`SerializedCollection::natural_id()`] to try to
199 /// retrieve a primary key value from `contents`. If an id is returned, the
200 /// item is inserted with that id. If an id is not returned, an id will be
201 /// automatically assigned, if possible, by the storage backend, which uses
202 /// the [`Key`](crate::key::Key) trait to assign ids.
203 pub fn push_serialized<C: SerializedCollection>(contents: &C::Contents) -> Result<Self, Error> {
204 let id = C::natural_id(contents);
205 let id = id.as_ref().map(DocumentId::new).transpose()?;
206 let contents = C::serialize(contents)?;
207 Ok(Self::insert(C::collection_name(), id, contents))
208 }
209
210 /// Updates a document in `collection`.
211 pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
212 Self {
213 collection,
214 command: Command::Update {
215 header,
216 contents: contents.into(),
217 },
218 }
219 }
220
221 /// Updates a document with the serialized representation of `contents` in
222 /// `collection`.
223 pub fn update_serialized<C: SerializedCollection>(
224 header: CollectionHeader<C::PrimaryKey>,
225 contents: &C::Contents,
226 ) -> Result<Self, Error> {
227 let contents = C::serialize(contents)?;
228 Ok(Self::update(
229 C::collection_name(),
230 Header::try_from(header)?,
231 contents,
232 ))
233 }
234
235 /// Overwrites a document in `collection`. If a document with `id` exists,
236 /// it will be overwritten. If a document with `id` doesn't exist, it will
237 /// be created.
238 pub fn overwrite(
239 collection: CollectionName,
240 id: DocumentId,
241 contents: impl Into<Bytes>,
242 ) -> Self {
243 Self {
244 collection,
245 command: Command::Overwrite {
246 id,
247 contents: contents.into(),
248 },
249 }
250 }
251
252 /// Overwrites a document with the serialized representation of `contents`
253 /// in `collection`. If a document with `id` exists, it will be overwritten.
254 /// If a document with `id` doesn't exist, it will be created.
255 pub fn overwrite_serialized<C: SerializedCollection, Key>(
256 id: &Key,
257 contents: &C::Contents,
258 ) -> Result<Self, Error>
259 where
260 Key: KeyEncoding<C::PrimaryKey> + ?Sized,
261 {
262 let contents = C::serialize(contents)?;
263 Ok(Self::overwrite(
264 C::collection_name(),
265 DocumentId::new(id)?,
266 contents,
267 ))
268 }
269
270 /// Deletes a document from a `collection`.
271 pub const fn delete(collection: CollectionName, header: Header) -> Self {
272 Self {
273 collection,
274 command: Command::Delete { header },
275 }
276 }
277
278 /// Check that the document `id` still exists in `collection`. If a document
279 /// with that id is not present, the transaction will not be applied and
280 /// [`Error::DocumentNotFound`] will be returned.
281 ///
282 /// Upon success, [`OperationResult::Success`] will be included in the
283 /// transaction's results.
284 pub const fn check_document_id_exists(collection: CollectionName, id: DocumentId) -> Self {
285 Self {
286 collection,
287 command: Command::Check { id, revision: None },
288 }
289 }
290
291 /// Check that the document `id` still exists in [`Collection`] `C`. If a
292 /// document with that id is not present, the transaction will not be
293 /// applied and [`Error::DocumentNotFound`] will be returned.
294 ///
295 /// Upon success, [`OperationResult::Success`] will be included in the
296 /// transaction's results.
297 pub fn check_document_exists<C: Collection>(id: &C::PrimaryKey) -> Result<Self, Error> {
298 Ok(Self::check_document_id_exists(
299 C::collection_name(),
300 DocumentId::new(id)?,
301 ))
302 }
303
304 /// Check that the header of `doc_or_header` is the current revision of the
305 /// stored document in [`Collection`] `C`. If a document with the header's
306 /// id is not present, the transaction will not be applied and
307 /// [`Error::DocumentNotFound`] will be returned. If a document with the
308 /// header's id is present and the revision does not match, the transaction
309 /// will not be applied and [`Error::DocumentConflict`] will be returned.
310 ///
311 /// Upon success, [`OperationResult::Success`] will be included in the
312 /// transaction's results.
313 pub fn check_document_is_current<C: Collection, H: HasHeader>(
314 doc_or_header: &H,
315 ) -> Result<Self, Error> {
316 let header = doc_or_header.header()?;
317 Ok(Self {
318 collection: C::collection_name(),
319 command: Command::Check {
320 id: header.id,
321 revision: Some(header.revision),
322 },
323 })
324 }
325}
326
327/// A command to execute within a `Collection`.
328#[derive(Clone, Serialize, Deserialize, Debug)]
329pub enum Command {
330 /// Inserts a new document containing `contents`.
331 Insert {
332 /// An optional id for the document. If this is `None`, a unique id will
333 /// be generated. If this is `Some()` and a document already exists with
334 /// that id, a conflict error will be returned.
335 id: Option<DocumentId>,
336 /// The initial contents of the document.
337 contents: Bytes,
338 },
339
340 /// Update an existing `Document` identified by `header`. `header.revision` must match
341 /// the currently stored revision on the `Document`. If it does not, the
342 /// command fill fail with a `DocumentConflict` error.
343 Update {
344 /// The header of the `Document`. The revision must match the current
345 /// document.
346 header: Header,
347
348 /// The new contents to store within the `Document`.
349 contents: Bytes,
350 },
351
352 /// Overwrite an existing `Document` identified by `id`. The revision will
353 /// not be checked before the document is updated. If the document does not
354 /// exist, it will be created.
355 Overwrite {
356 /// The id of the document to overwrite.
357 id: DocumentId,
358
359 /// The new contents to store within the `Document`.
360 contents: Bytes,
361 },
362
363 /// Delete an existing `Document` identified by `id`. `revision` must match
364 /// the currently stored revision on the `Document`. If it does not, the
365 /// command fill fail with a `DocumentConflict` error.
366 Delete {
367 /// The current header of the `Document`.
368 header: Header,
369 },
370
371 /// Checks whether a document exists, and optionally whether its revision is
372 /// still current. If the document is not found, a `DocumentNotFound` error
373 /// will be returned. If the document revision is provided and does not
374 /// match, a `DocumentConflict` error will be returned.
375 Check {
376 /// The id of the document to check.
377 id: DocumentId,
378 /// The revision of the document to check.
379 revision: Option<Revision>,
380 },
381}
382
383/// Information about the result of each `Operation` in a transaction.
384#[derive(Clone, Debug, Serialize, Deserialize)]
385pub enum OperationResult {
386 /// An operation succeeded but had no information to output.
387 Success,
388
389 /// A `Document` was updated.
390 DocumentUpdated {
391 /// The id of the `Collection` of the updated `Document`.
392 collection: CollectionName,
393
394 /// The header of the updated `Document`.
395 header: Header,
396 },
397
398 /// A `Document` was deleted.
399 DocumentDeleted {
400 /// The id of the `Collection` of the deleted `Document`.
401 collection: CollectionName,
402
403 /// The id of the deleted `Document`.
404 id: DocumentId,
405 },
406}
407
408/// Details about an executed transaction.
409#[derive(Clone, Debug, Serialize, Deserialize)]
410pub struct Executed {
411 /// The id of the transaction.
412 pub id: u64,
413
414 /// A list of containing ids of `Documents` changed.
415 pub changes: Changes,
416}
417
418/// A list of changes.
419#[derive(Clone, Debug, Serialize, Deserialize)]
420pub enum Changes {
421 /// A list of changed documents.
422 Documents(DocumentChanges),
423 /// A list of changed keys.
424 Keys(Vec<ChangedKey>),
425}
426
427impl Changes {
428 /// Returns the list of documents changed in this transaction, or None if
429 /// the transaction was not a document transaction.
430 #[must_use]
431 pub const fn documents(&self) -> Option<&DocumentChanges> {
432 if let Self::Documents(changes) = self {
433 Some(changes)
434 } else {
435 None
436 }
437 }
438
439 /// Returns the list of keys changed in this transaction, or None if the
440 /// transaction was not a `KeyValue` transaction.
441 #[must_use]
442 pub fn keys(&self) -> Option<&[ChangedKey]> {
443 if let Self::Keys(keys) = self {
444 Some(keys)
445 } else {
446 None
447 }
448 }
449}
450
451/// A list of changed documents.
452#[derive(Clone, Debug, Serialize, Deserialize)]
453pub struct DocumentChanges {
454 /// All of the collections changed.
455 pub collections: Vec<CollectionName>,
456 /// The individual document changes.
457 pub documents: Vec<ChangedDocument>,
458}
459
460impl DocumentChanges {
461 /// Returns the changed document and the name of the collection the change
462 /// happened to.
463 #[must_use]
464 pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> {
465 self.documents.get(index).and_then(|doc| {
466 self.collections
467 .get(usize::from(doc.collection))
468 .map(|collection| (collection, doc))
469 })
470 }
471
472 /// Returns the number of changes in this collection.
473 #[must_use]
474 pub fn len(&self) -> usize {
475 self.documents.len()
476 }
477
478 /// Returns true if there are no changes.
479 #[must_use]
480 pub fn is_empty(&self) -> bool {
481 self.documents.is_empty()
482 }
483
484 /// Returns an interator over all of the changed documents.
485 pub const fn iter(&self) -> DocumentChangesIter<'_> {
486 DocumentChangesIter {
487 changes: self,
488 index: Some(0),
489 }
490 }
491}
492
493/// An iterator over [`DocumentChanges`].
494#[must_use]
495pub struct DocumentChangesIter<'a> {
496 changes: &'a DocumentChanges,
497 index: Option<usize>,
498}
499
500impl<'a> Iterator for DocumentChangesIter<'a> {
501 type Item = (&'a CollectionName, &'a ChangedDocument);
502
503 fn next(&mut self) -> Option<Self::Item> {
504 self.index.and_then(|index| {
505 let result = self.changes.get(index);
506 if result.is_some() {
507 self.index = index.checked_add(1);
508 }
509 result
510 })
511 }
512}
513
514/// A draining iterator over [`ChangedDocument`]s.
515#[must_use]
516pub struct DocumentChangesIntoIter {
517 collections: Vec<CollectionName>,
518 documents: std::vec::IntoIter<ChangedDocument>,
519}
520
521impl Iterator for DocumentChangesIntoIter {
522 type Item = (CollectionName, ChangedDocument);
523
524 fn next(&mut self) -> Option<Self::Item> {
525 self.documents.next().and_then(|doc| {
526 self.collections
527 .get(usize::from(doc.collection))
528 .map(|collection| (collection.clone(), doc))
529 })
530 }
531}
532
533impl IntoIterator for DocumentChanges {
534 type IntoIter = DocumentChangesIntoIter;
535 type Item = (CollectionName, ChangedDocument);
536
537 fn into_iter(self) -> Self::IntoIter {
538 DocumentChangesIntoIter {
539 collections: self.collections,
540 documents: self.documents.into_iter(),
541 }
542 }
543}
544
/// Verifies that both the borrowing and the consuming iterators yield each
/// document once, pair it with the right collection, and stop at the document
/// whose collection index is out of range.
#[test]
fn document_changes_iter() {
    use crate::schema::Qualified;

    /// Walks `entries`, asserting no document id repeats, and returns how
    /// many entries belonged to collections "a" and "b" respectively.
    fn tally<N, D>(entries: impl Iterator<Item = (N, D)>) -> (i32, i32)
    where
        N: std::borrow::Borrow<CollectionName>,
        D: std::borrow::Borrow<ChangedDocument>,
    {
        use std::borrow::Borrow;

        let mut seen = Vec::new();
        let (mut in_a, mut in_b) = (0, 0);
        for (name, entry) in entries {
            let collection: &CollectionName = name.borrow();
            let document: &ChangedDocument = entry.borrow();
            assert!(!seen.contains(&document.id));
            seen.push(document.id.clone());
            match collection.name.as_ref() {
                "a" => in_a += 1,
                "b" => in_b += 1,
                _ => unreachable!("invalid collection name {collection}"),
            }
        }
        (in_a, in_b)
    }

    let changed = |collection: u16, id: u64| ChangedDocument {
        collection,
        id: DocumentId::from_u64(id),
        deleted: false,
    };

    // The last document points at collection index 2, which does not exist.
    let changes = DocumentChanges {
        collections: vec![CollectionName::private("a"), CollectionName::private("b")],
        documents: vec![changed(0, 0), changed(0, 1), changed(1, 2), changed(2, 3)],
    };

    assert_eq!(changes.len(), 4);
    assert!(!changes.is_empty());

    // Borrowing iteration.
    let (a_changes, b_changes) = tally(changes.iter());
    assert_eq!(a_changes, 2);
    assert_eq!(b_changes, 1);

    // Consuming iteration.
    let (a_changes, b_changes) = tally(changes.into_iter());
    assert_eq!(a_changes, 2);
    assert_eq!(b_changes, 1);
}
608
609/// A record of a changed document.
610#[derive(Debug, Clone, Serialize, Deserialize)]
611pub struct ChangedDocument {
612 /// The index of the `CollectionName` within the `collections` field of [`Changes::Documents`].
613 pub collection: u16,
614
615 /// The id of the changed `Document`.
616 pub id: DocumentId,
617
618 /// If the `Document` has been deleted, this will be `true`.
619 pub deleted: bool,
620}
621
622/// A record of a changed `KeyValue` entry.
623#[derive(Clone, Debug, Serialize, Deserialize)]
624pub struct ChangedKey {
625 /// The namespace of the key.
626 pub namespace: Option<String>,
627
628 /// The key that was changed.
629 pub key: String,
630
631 /// True if the key was deleted.
632 pub deleted: bool,
633}