fog_db_traits/
transaction.rs

//! The database transaction interface.
2//!
3//! Transactions can be executed on the local database, both for local
4//! communication and as a way of talking to remote nodes. Transactions follow
5//! ACID properties, and can consist of adding documents, and either adding,
6//! modifying, or removing entries. Managing schemas and database root names is
7//! outside the scope of the transaction interface.
8use std::{
9    collections::{HashMap, HashSet},
10    sync::Arc,
11};
12
13use fog_pack::{
14    document::{Document, NewDocument},
15    entry::{Entry, EntryRef, NewEntry},
16    error::Error as FogError,
17    schema::{NoSchema, Schema},
18    types::*,
19};
20use thiserror::Error;
21
22use crate::{DbCommit, DbResult, cert::Policy, };
23
/// A single reason why a transaction could not be committed.
///
/// A failed commit reports these in bulk through [`CommitErrors::errors`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CommitError {
    /// Tried to change or delete an entry but it wasn't in the DB
    MissingEntry(EntryRef),
    /// Tried to add an entry but its parent document wasn't in the DB
    MissingParent(EntryRef),
    /// Tried to change a document's references but it wasn't in the DB
    MissingDoc(Hash),
    /// Tried to change a document's references but the ref wasn't in the document
    // `doc` is presumably the document being modified and `target` the
    // reference that could not be found in it - confirm against the DB backend.
    MissingDocRef { doc: Hash, target: Hash },
    /// Tried to add a document but the schema was missing
    // `doc` is presumably the document being added and `schema` the schema
    // hash it asked for - confirm against the DB backend.
    MissingSchema { doc: Hash, schema: Hash },
}
37
/// The result of a failed commit: the changes that were submitted, plus the
/// reasons they were rejected.
///
/// The `docs` and `entries` maps can be handed back to
/// [`Transaction::load_from_errors`] to rebuild the transaction.
pub struct CommitErrors {
    /// Document changes that were part of the failed transaction.
    pub docs: HashMap<Hash, DocChange>,
    /// Entry changes that were part of the failed transaction.
    pub entries: HashMap<EntryRef, EntryChange>,
    /// Every individual failure that was reported for the commit.
    pub errors: Vec<CommitError>,
}
43
/// A pending transaction to execute on a database.
///
/// Changes are queued in memory, keyed by document hash and entry reference,
/// and are only handed to the database when [`Transaction::commit`] is called.
pub struct Transaction {
    /// Database handle used for schema/document lookups and for the commit.
    db: Box<dyn DbCommit>,
    /// Queued document changes, keyed by document hash.
    docs: HashMap<Hash, DocChange>,
    /// Queued entry changes, keyed by entry reference.
    entries: HashMap<EntryRef, EntryChange>,
}
50
/// Failure while trying to find and complete a schema
///
/// Returned by [`Transaction::add_new_doc`].
#[derive(Clone, Debug, Error)]
pub enum SchemaError {
    /// The schema with this hash was not found in the database.
    #[error("Missing schema {0}")]
    MissingSchema(Hash),
    /// The document did not pass validation; the wrapped error is the cause.
    #[error("Validation failed")]
    ValidationFail(#[from] FogError),
}
59
60/// Failure while trying to find a schema for a document
61#[derive(Clone, Debug)]
62pub struct MissingSchema(pub Hash);
63
64impl std::fmt::Display for MissingSchema {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.write_str("Missing schema {0}")
67    }
68}
69
70impl std::error::Error for MissingSchema {}
71
/// Failure while processing an entry
///
/// Returned by [`Transaction::add_new_entry`] and [`Transaction::add_entry`].
#[derive(Clone, Debug, Error)]
pub enum EntryError {
    /// The schema the entry refers to was not found in the database.
    #[error("Entry needed missing schema {0}")]
    MissingEntrySchema(Hash),
    /// The entry itself did not pass validation.
    #[error("Entry Validation failed")]
    EntryValidationFail(#[from] FogError),
    /// A document linked from the entry was found but failed the check the
    /// entry requires of it. `doc` is the hash of that document; `source` is
    /// the underlying validation error.
    #[error("Document validation failed within context of entry (doc = {doc})")]
    DocValidationFail { doc: Hash, source: FogError },
    /// A document needed to validate the entry was in neither the transaction
    /// nor the database.
    #[error("Missing document {0}")]
    MissingDoc(Hash),
}
84
85impl Transaction {
86    pub fn new(db: Box<dyn DbCommit>) -> Self {
87        Self {
88            db,
89            docs: HashMap::new(),
90            entries: HashMap::new(),
91        }
92    }
93
94    /// Replace the current transaction with whatever transaction errored out last time.
95    pub fn load_from_errors(&mut self, errs: CommitErrors) {
96        self.docs = errs.docs;
97        self.entries = errs.entries;
98    }
99
100    /// Try to add a [`NewDocument`] to the DB. Can fail due to internal
101    /// database failure. It can also fail if the document's schema isn't in the
102    /// database, or if validation fails. On success, it returns a copy of the
103    /// document that will be committed.
104    pub fn add_new_doc(
105        &mut self,
106        doc: NewDocument,
107    ) -> DbResult<Result<Arc<Document>, SchemaError>> {
108        let (doc, (encoded, doc_hash)) = match doc.schema_hash() {
109            Some(schema) => {
110                let Some(schema) = self.db.schema_get(schema)? else {
111                    return Ok(Err(SchemaError::MissingSchema(schema.to_owned())));
112                };
113                let doc = match schema.validate_new_doc(doc) {
114                    Ok(doc) => doc,
115                    Err(e) => return Ok(Err(SchemaError::ValidationFail(e))),
116                };
117                let doc = Arc::new(doc);
118                (
119                    doc.clone(),
120                    EncodedDoc::from_doc(Some(schema.as_ref()), doc.as_ref().clone()),
121                )
122            }
123            None => {
124                let doc = match NoSchema::validate_new_doc(doc) {
125                    Ok(doc) => doc,
126                    Err(e) => return Ok(Err(SchemaError::ValidationFail(e))),
127                };
128                let doc = Arc::new(doc);
129                (
130                    doc.clone(),
131                    EncodedDoc::from_doc(None, doc.as_ref().clone()),
132                )
133            }
134        };
135        let encoded = Box::new(encoded);
136        match self.docs.entry(doc_hash) {
137            std::collections::hash_map::Entry::Occupied(mut e) => {
138                e.get_mut().add(encoded, doc.clone());
139            }
140            std::collections::hash_map::Entry::Vacant(v) => {
141                v.insert(DocChange::Add {
142                    doc: doc.clone(),
143                    encoded,
144                    weak_ref: HashSet::new(),
145                });
146            }
147        }
148        Ok(Ok(doc))
149    }
150
151    /// Try to add a [`Document`] to the DB. Can fail due to internal
152    /// database failure. It can also fail if the document's schema isn't in the
153    /// database.
154    pub fn add_doc(&mut self, doc: Arc<Document>) -> DbResult<Result<(), MissingSchema>> {
155        let (encoded, doc_hash) = match doc.schema_hash() {
156            Some(schema) => {
157                let Some(schema) = self.db.schema_get(schema)? else {
158                    return Ok(Err(MissingSchema(schema.to_owned())));
159                };
160                EncodedDoc::from_doc(Some(schema.as_ref()), doc.as_ref().clone())
161            }
162            None => EncodedDoc::from_doc(None, doc.as_ref().clone()),
163        };
164        let encoded = Box::new(encoded);
165        match self.docs.entry(doc_hash) {
166            std::collections::hash_map::Entry::Occupied(mut e) => {
167                e.get_mut().add(encoded, doc);
168            }
169            std::collections::hash_map::Entry::Vacant(v) => {
170                v.insert(DocChange::Add {
171                    encoded,
172                    doc,
173                    weak_ref: HashSet::new(),
174                });
175            }
176        }
177        Ok(Ok(()))
178    }
179
180    /// Try to add a [`NewEntry`] to the DB. Can fail due to internal database
181    /// failure, if the schema is missing from the database, or if any of the
182    /// documents needed for validation are missing from both the transaction
183    /// and the database.
184    pub fn add_new_entry(&mut self, entry: NewEntry) -> DbResult<Result<(), EntryError>> {
185        let Some(schema) = self.db.schema_get(entry.schema_hash())? else {
186            return Ok(Err(EntryError::MissingEntrySchema(entry.schema_hash().to_owned())));
187        };
188        let mut checklist = match schema.validate_new_entry(entry) {
189            Ok(list) => list,
190            Err(e) => return Ok(Err(EntryError::EntryValidationFail(e))),
191        };
192        for (link_hash, item) in checklist.iter() {
193            if let Some(DocChange::Add { doc, .. }) = self.docs.get(&link_hash) {
194                if let Err(e) = item.check(doc) {
195                    return Ok(Err(EntryError::DocValidationFail {
196                        doc: link_hash,
197                        source: e,
198                    }));
199                }
200                continue;
201            }
202            if let Some(doc) = self.db.doc_get(&link_hash)? {
203                if let Err(e) = item.check(&doc) {
204                    return Ok(Err(EntryError::DocValidationFail {
205                        doc: link_hash,
206                        source: e,
207                    }));
208                }
209                continue;
210            }
211            return Ok(Err(EntryError::MissingDoc(link_hash)));
212        }
213        let entry = checklist.complete().unwrap();
214        let (entry, e_ref) = EncodedEntry::from_entry(&schema, entry);
215        let entry = Box::new(entry);
216        match self.entries.entry(e_ref) {
217            std::collections::hash_map::Entry::Occupied(mut e) => {
218                e.get_mut().add(entry);
219            }
220            std::collections::hash_map::Entry::Vacant(v) => {
221                v.insert(EntryChange::Add {
222                    entry,
223                    ttl: None,
224                    policy: None,
225                });
226            }
227        }
228        Ok(Ok(()))
229    }
230
231    /// Try to add a [`Entry`] to the DB. Can fail due to internal database
232    /// failure, or if the schema is missing from the database.
233    pub fn add_entry(&mut self, entry: Entry) -> DbResult<Result<(), EntryError>> {
234        let Some(schema) = self.db.schema_get(entry.schema_hash())? else {
235            return Ok(Err(EntryError::MissingEntrySchema(entry.schema_hash().to_owned())));
236        };
237        let (entry, e_ref) = EncodedEntry::from_entry(&schema, entry);
238        let entry = Box::new(entry);
239        match self.entries.entry(e_ref) {
240            std::collections::hash_map::Entry::Occupied(mut e) => {
241                e.get_mut().add(entry);
242            }
243            std::collections::hash_map::Entry::Vacant(v) => {
244                v.insert(EntryChange::Add {
245                    entry,
246                    ttl: None,
247                    policy: None,
248                });
249            }
250        }
251        Ok(Ok(()))
252    }
253
254    /// Weaken/strengthen a reference for a Document.
255    pub fn set_weak_ref(&mut self, doc: &Hash, ref_hash: &Hash, weak: bool) {
256        match self.docs.entry(doc.to_owned()) {
257            std::collections::hash_map::Entry::Occupied(mut e) => match e.get_mut() {
258                DocChange::Add { weak_ref, .. } => {
259                    if weak {
260                        weak_ref.insert(ref_hash.to_owned());
261                    } else {
262                        weak_ref.remove(ref_hash);
263                    }
264                },
265                DocChange::Modify { weak_ref } => {
266                    weak_ref.insert(ref_hash.to_owned(), weak);
267                }
268            },
269            std::collections::hash_map::Entry::Vacant(v) => {
270                let mut weak_ref = HashMap::new();
271                weak_ref.insert(ref_hash.to_owned(), weak);
272                v.insert(DocChange::Modify { weak_ref });
273            }
274        }
275    }
276
277    /// Set or clear the time-to-live for an Entry.
278    pub fn set_ttl(&mut self, entry: &EntryRef, ttl: Option<Timestamp>) {
279        let set = ttl;
280        match self.entries.entry(entry.to_owned()) {
281            std::collections::hash_map::Entry::Occupied(mut e) => match e.get_mut() {
282                EntryChange::Add { ttl, .. } => {
283                    *ttl = set;
284                },
285                EntryChange::Modify { ttl, .. } => {
286                    *ttl = Some(set);
287                }
288                EntryChange::Delete => (),
289            },
290            std::collections::hash_map::Entry::Vacant(v) => {
291                v.insert(EntryChange::Modify { ttl: Some(set), policy: None });
292            }
293        }
294    }
295
296    /// Set or clear the policy for an Entry.
297    pub fn set_policy(&mut self, entry: &EntryRef, policy: Option<Policy>) {
298        let set = policy;
299        match self.entries.entry(entry.to_owned()) {
300            std::collections::hash_map::Entry::Occupied(mut e) => match e.get_mut() {
301                EntryChange::Add { policy, .. } => {
302                    *policy = set;
303                },
304                EntryChange::Modify { policy, .. } => {
305                    *policy = Some(set);
306                }
307                EntryChange::Delete => (),
308            },
309            std::collections::hash_map::Entry::Vacant(v) => {
310                v.insert(EntryChange::Modify { policy: Some(set), ttl: None });
311            }
312        }
313    }
314
315    /// Delete an entry from the database.
316    pub fn del_entry(&mut self, entry: &EntryRef) {
317        self.entries.insert(entry.to_owned(), EntryChange::Delete);
318    }
319
320    /// Commit this transaction to the database. This can fail due to internal
321    /// database errors, but it can also fail any of the various [`CommitError`]
322    /// reasons.
323    pub async fn commit(self) -> DbResult<Result<(), CommitErrors>> {
324        self.db.commit(self.docs, self.entries).await
325    }
326}
327
328/// A document, fully encoded and ready for the database.
329pub struct EncodedDoc {
330    schema: Option<Hash>,
331    data: Vec<u8>,
332    refs: Vec<Hash>,
333}
334
335impl EncodedDoc {
336    pub fn from_doc(schema: Option<&Schema>, doc: Document) -> (Self, Hash) {
337        let refs = doc.find_hashes();
338        let schema_hash = doc.schema_hash().cloned();
339        let (hash, data) = if let Some(schema) = schema {
340            schema.encode_doc(doc).unwrap()
341        } else {
342            NoSchema::encode_doc(doc).unwrap()
343        };
344        (
345            Self {
346                schema: schema_hash,
347                data,
348                refs,
349            },
350            hash,
351        )
352    }
353
354    /// The document's schema.
355    pub fn schema(&self) -> &Option<Hash> {
356        &self.schema
357    }
358
359    /// The raw encoded document.
360    pub fn data(&self) -> &[u8] {
361        &self.data
362    }
363
364    /// Get all hashes that were in the document
365    pub fn refs(&self) -> &[Hash] {
366        &self.refs
367    }
368}
369
370/// An entry, fully encoded and ready for the database.
371pub struct EncodedEntry {
372    data: Vec<u8>,
373    all_refs: Vec<Hash>,
374    required_refs: Vec<Hash>,
375}
376
377impl EncodedEntry {
378    pub fn from_entry(schema: &Schema, entry: Entry) -> (Self, EntryRef) {
379        let all_refs = entry.find_hashes();
380        let (e_ref, data, required_refs) = schema.encode_entry(entry).unwrap();
381        (
382            Self {
383                data,
384                all_refs,
385                required_refs,
386            },
387            e_ref,
388        )
389    }
390
391    /// Get the encoded entry data
392    pub fn data(&self) -> &[u8] {
393        &self.data
394    }
395
396    /// Get all hashes that were in the entry
397    pub fn all_refs(&self) -> &[Hash] {
398        &self.all_refs
399    }
400
401    /// Get the hashes that are required for validation
402    pub fn required_refs(&self) -> &[Hash] {
403        &self.required_refs
404    }
405}
406
407/// A change to a document in the database, consisting of either an Add
408/// operation or a Modify operation. Documents cannot be deleted directly;
409/// instead, they are dropped once all references to them are gone or have been
410/// weakened.
411pub enum DocChange {
412    /// Add a document to the DB
413    Add {
414        /// Document to add to the DB
415        encoded: Box<EncodedDoc>,
416        /// Actual document, unencoded so we can still check it as needed
417        doc: Arc<Document>,
418        /// Set of references to weaken
419        weak_ref: HashSet<Hash>,
420    },
421    /// Change the metadata of a document in the DB.
422    Modify {
423        /// Set to true to make a reference weak
424        weak_ref: HashMap<Hash, bool>,
425    },
426}
427
428impl DocChange {
429    fn add(&mut self, encoded: Box<EncodedDoc>, doc: Arc<Document>) {
430        if let DocChange::Modify { weak_ref } = self {
431            let weak_ref: HashSet<Hash> = weak_ref
432                .iter()
433                .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
434                .collect();
435            *self = DocChange::Add {
436                encoded,
437                doc,
438                weak_ref,
439            };
440        }
441    }
442}
443
444/// A change to an entry in the database, consisting of either an Add, Modify,
445/// or Delete operation.
446pub enum EntryChange {
447    Add {
448        entry: Box<EncodedEntry>,
449        ttl: Option<Timestamp>,
450        policy: Option<Policy>,
451    },
452    Modify {
453        ttl: Option<Option<Timestamp>>,
454        policy: Option<Option<Policy>>,
455    },
456    Delete,
457}
458
459impl EntryChange {
460    fn add(&mut self, entry: Box<EncodedEntry>) {
461        match self {
462            EntryChange::Modify { ttl, policy } => {
463                *self = EntryChange::Add {
464                    entry,
465                    ttl: ttl.unwrap_or_default(),
466                    policy: policy.clone().unwrap_or_default(),
467                };
468            }
469            EntryChange::Delete => {
470                *self = EntryChange::Add {
471                    entry,
472                    ttl: None,
473                    policy: None,
474                };
475            }
476            EntryChange::Add { .. } => (),
477        }
478    }
479}