1use std::{
9 collections::{HashMap, HashSet},
10 sync::Arc,
11};
12
13use fog_pack::{
14 document::{Document, NewDocument},
15 entry::{Entry, EntryRef, NewEntry},
16 error::Error as FogError,
17 schema::{NoSchema, Schema},
18 types::*,
19};
20use thiserror::Error;
21
22use crate::{DbCommit, DbResult, cert::Policy, };
23
24#[derive(Clone, Debug, PartialEq, Eq)]
25pub enum CommitError {
26 MissingEntry(EntryRef),
28 MissingParent(EntryRef),
30 MissingDoc(Hash),
32 MissingDocRef { doc: Hash, target: Hash },
34 MissingSchema { doc: Hash, schema: Hash },
36}
37
38pub struct CommitErrors {
39 pub docs: HashMap<Hash, DocChange>,
40 pub entries: HashMap<EntryRef, EntryChange>,
41 pub errors: Vec<CommitError>,
42}
43
44pub struct Transaction {
46 db: Box<dyn DbCommit>,
47 docs: HashMap<Hash, DocChange>,
48 entries: HashMap<EntryRef, EntryChange>,
49}
50
51#[derive(Clone, Debug, Error)]
53pub enum SchemaError {
54 #[error("Missing schema {0}")]
55 MissingSchema(Hash),
56 #[error("Validation failed")]
57 ValidationFail(#[from] FogError),
58}
59
60#[derive(Clone, Debug)]
62pub struct MissingSchema(pub Hash);
63
64impl std::fmt::Display for MissingSchema {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.write_str("Missing schema {0}")
67 }
68}
69
70impl std::error::Error for MissingSchema {}
71
72#[derive(Clone, Debug, Error)]
74pub enum EntryError {
75 #[error("Entry needed missing schema {0}")]
76 MissingEntrySchema(Hash),
77 #[error("Entry Validation failed")]
78 EntryValidationFail(#[from] FogError),
79 #[error("Document validation failed within context of entry (doc = {doc})")]
80 DocValidationFail { doc: Hash, source: FogError },
81 #[error("Missing document {0}")]
82 MissingDoc(Hash),
83}
84
85impl Transaction {
86 pub fn new(db: Box<dyn DbCommit>) -> Self {
87 Self {
88 db,
89 docs: HashMap::new(),
90 entries: HashMap::new(),
91 }
92 }
93
94 pub fn load_from_errors(&mut self, errs: CommitErrors) {
96 self.docs = errs.docs;
97 self.entries = errs.entries;
98 }
99
100 pub fn add_new_doc(
105 &mut self,
106 doc: NewDocument,
107 ) -> DbResult<Result<Arc<Document>, SchemaError>> {
108 let (doc, (encoded, doc_hash)) = match doc.schema_hash() {
109 Some(schema) => {
110 let Some(schema) = self.db.schema_get(schema)? else {
111 return Ok(Err(SchemaError::MissingSchema(schema.to_owned())));
112 };
113 let doc = match schema.validate_new_doc(doc) {
114 Ok(doc) => doc,
115 Err(e) => return Ok(Err(SchemaError::ValidationFail(e))),
116 };
117 let doc = Arc::new(doc);
118 (
119 doc.clone(),
120 EncodedDoc::from_doc(Some(schema.as_ref()), doc.as_ref().clone()),
121 )
122 }
123 None => {
124 let doc = match NoSchema::validate_new_doc(doc) {
125 Ok(doc) => doc,
126 Err(e) => return Ok(Err(SchemaError::ValidationFail(e))),
127 };
128 let doc = Arc::new(doc);
129 (
130 doc.clone(),
131 EncodedDoc::from_doc(None, doc.as_ref().clone()),
132 )
133 }
134 };
135 let encoded = Box::new(encoded);
136 match self.docs.entry(doc_hash) {
137 std::collections::hash_map::Entry::Occupied(mut e) => {
138 e.get_mut().add(encoded, doc.clone());
139 }
140 std::collections::hash_map::Entry::Vacant(v) => {
141 v.insert(DocChange::Add {
142 doc: doc.clone(),
143 encoded,
144 weak_ref: HashSet::new(),
145 });
146 }
147 }
148 Ok(Ok(doc))
149 }
150
151 pub fn add_doc(&mut self, doc: Arc<Document>) -> DbResult<Result<(), MissingSchema>> {
155 let (encoded, doc_hash) = match doc.schema_hash() {
156 Some(schema) => {
157 let Some(schema) = self.db.schema_get(schema)? else {
158 return Ok(Err(MissingSchema(schema.to_owned())));
159 };
160 EncodedDoc::from_doc(Some(schema.as_ref()), doc.as_ref().clone())
161 }
162 None => EncodedDoc::from_doc(None, doc.as_ref().clone()),
163 };
164 let encoded = Box::new(encoded);
165 match self.docs.entry(doc_hash) {
166 std::collections::hash_map::Entry::Occupied(mut e) => {
167 e.get_mut().add(encoded, doc);
168 }
169 std::collections::hash_map::Entry::Vacant(v) => {
170 v.insert(DocChange::Add {
171 encoded,
172 doc,
173 weak_ref: HashSet::new(),
174 });
175 }
176 }
177 Ok(Ok(()))
178 }
179
180 pub fn add_new_entry(&mut self, entry: NewEntry) -> DbResult<Result<(), EntryError>> {
185 let Some(schema) = self.db.schema_get(entry.schema_hash())? else {
186 return Ok(Err(EntryError::MissingEntrySchema(entry.schema_hash().to_owned())));
187 };
188 let mut checklist = match schema.validate_new_entry(entry) {
189 Ok(list) => list,
190 Err(e) => return Ok(Err(EntryError::EntryValidationFail(e))),
191 };
192 for (link_hash, item) in checklist.iter() {
193 if let Some(DocChange::Add { doc, .. }) = self.docs.get(&link_hash) {
194 if let Err(e) = item.check(doc) {
195 return Ok(Err(EntryError::DocValidationFail {
196 doc: link_hash,
197 source: e,
198 }));
199 }
200 continue;
201 }
202 if let Some(doc) = self.db.doc_get(&link_hash)? {
203 if let Err(e) = item.check(&doc) {
204 return Ok(Err(EntryError::DocValidationFail {
205 doc: link_hash,
206 source: e,
207 }));
208 }
209 continue;
210 }
211 return Ok(Err(EntryError::MissingDoc(link_hash)));
212 }
213 let entry = checklist.complete().unwrap();
214 let (entry, e_ref) = EncodedEntry::from_entry(&schema, entry);
215 let entry = Box::new(entry);
216 match self.entries.entry(e_ref) {
217 std::collections::hash_map::Entry::Occupied(mut e) => {
218 e.get_mut().add(entry);
219 }
220 std::collections::hash_map::Entry::Vacant(v) => {
221 v.insert(EntryChange::Add {
222 entry,
223 ttl: None,
224 policy: None,
225 });
226 }
227 }
228 Ok(Ok(()))
229 }
230
231 pub fn add_entry(&mut self, entry: Entry) -> DbResult<Result<(), EntryError>> {
234 let Some(schema) = self.db.schema_get(entry.schema_hash())? else {
235 return Ok(Err(EntryError::MissingEntrySchema(entry.schema_hash().to_owned())));
236 };
237 let (entry, e_ref) = EncodedEntry::from_entry(&schema, entry);
238 let entry = Box::new(entry);
239 match self.entries.entry(e_ref) {
240 std::collections::hash_map::Entry::Occupied(mut e) => {
241 e.get_mut().add(entry);
242 }
243 std::collections::hash_map::Entry::Vacant(v) => {
244 v.insert(EntryChange::Add {
245 entry,
246 ttl: None,
247 policy: None,
248 });
249 }
250 }
251 Ok(Ok(()))
252 }
253
254 pub fn set_weak_ref(&mut self, doc: &Hash, ref_hash: &Hash, weak: bool) {
256 match self.docs.entry(doc.to_owned()) {
257 std::collections::hash_map::Entry::Occupied(mut e) => match e.get_mut() {
258 DocChange::Add { weak_ref, .. } => {
259 if weak {
260 weak_ref.insert(ref_hash.to_owned());
261 } else {
262 weak_ref.remove(ref_hash);
263 }
264 },
265 DocChange::Modify { weak_ref } => {
266 weak_ref.insert(ref_hash.to_owned(), weak);
267 }
268 },
269 std::collections::hash_map::Entry::Vacant(v) => {
270 let mut weak_ref = HashMap::new();
271 weak_ref.insert(ref_hash.to_owned(), weak);
272 v.insert(DocChange::Modify { weak_ref });
273 }
274 }
275 }
276
277 pub fn set_ttl(&mut self, entry: &EntryRef, ttl: Option<Timestamp>) {
279 let set = ttl;
280 match self.entries.entry(entry.to_owned()) {
281 std::collections::hash_map::Entry::Occupied(mut e) => match e.get_mut() {
282 EntryChange::Add { ttl, .. } => {
283 *ttl = set;
284 },
285 EntryChange::Modify { ttl, .. } => {
286 *ttl = Some(set);
287 }
288 EntryChange::Delete => (),
289 },
290 std::collections::hash_map::Entry::Vacant(v) => {
291 v.insert(EntryChange::Modify { ttl: Some(set), policy: None });
292 }
293 }
294 }
295
296 pub fn set_policy(&mut self, entry: &EntryRef, policy: Option<Policy>) {
298 let set = policy;
299 match self.entries.entry(entry.to_owned()) {
300 std::collections::hash_map::Entry::Occupied(mut e) => match e.get_mut() {
301 EntryChange::Add { policy, .. } => {
302 *policy = set;
303 },
304 EntryChange::Modify { policy, .. } => {
305 *policy = Some(set);
306 }
307 EntryChange::Delete => (),
308 },
309 std::collections::hash_map::Entry::Vacant(v) => {
310 v.insert(EntryChange::Modify { policy: Some(set), ttl: None });
311 }
312 }
313 }
314
315 pub fn del_entry(&mut self, entry: &EntryRef) {
317 self.entries.insert(entry.to_owned(), EntryChange::Delete);
318 }
319
320 pub async fn commit(self) -> DbResult<Result<(), CommitErrors>> {
324 self.db.commit(self.docs, self.entries).await
325 }
326}
327
328pub struct EncodedDoc {
330 schema: Option<Hash>,
331 data: Vec<u8>,
332 refs: Vec<Hash>,
333}
334
335impl EncodedDoc {
336 pub fn from_doc(schema: Option<&Schema>, doc: Document) -> (Self, Hash) {
337 let refs = doc.find_hashes();
338 let schema_hash = doc.schema_hash().cloned();
339 let (hash, data) = if let Some(schema) = schema {
340 schema.encode_doc(doc).unwrap()
341 } else {
342 NoSchema::encode_doc(doc).unwrap()
343 };
344 (
345 Self {
346 schema: schema_hash,
347 data,
348 refs,
349 },
350 hash,
351 )
352 }
353
354 pub fn schema(&self) -> &Option<Hash> {
356 &self.schema
357 }
358
359 pub fn data(&self) -> &[u8] {
361 &self.data
362 }
363
364 pub fn refs(&self) -> &[Hash] {
366 &self.refs
367 }
368}
369
370pub struct EncodedEntry {
372 data: Vec<u8>,
373 all_refs: Vec<Hash>,
374 required_refs: Vec<Hash>,
375}
376
377impl EncodedEntry {
378 pub fn from_entry(schema: &Schema, entry: Entry) -> (Self, EntryRef) {
379 let all_refs = entry.find_hashes();
380 let (e_ref, data, required_refs) = schema.encode_entry(entry).unwrap();
381 (
382 Self {
383 data,
384 all_refs,
385 required_refs,
386 },
387 e_ref,
388 )
389 }
390
391 pub fn data(&self) -> &[u8] {
393 &self.data
394 }
395
396 pub fn all_refs(&self) -> &[Hash] {
398 &self.all_refs
399 }
400
401 pub fn required_refs(&self) -> &[Hash] {
403 &self.required_refs
404 }
405}
406
407pub enum DocChange {
412 Add {
414 encoded: Box<EncodedDoc>,
416 doc: Arc<Document>,
418 weak_ref: HashSet<Hash>,
420 },
421 Modify {
423 weak_ref: HashMap<Hash, bool>,
425 },
426}
427
428impl DocChange {
429 fn add(&mut self, encoded: Box<EncodedDoc>, doc: Arc<Document>) {
430 if let DocChange::Modify { weak_ref } = self {
431 let weak_ref: HashSet<Hash> = weak_ref
432 .iter()
433 .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
434 .collect();
435 *self = DocChange::Add {
436 encoded,
437 doc,
438 weak_ref,
439 };
440 }
441 }
442}
443
444pub enum EntryChange {
447 Add {
448 entry: Box<EncodedEntry>,
449 ttl: Option<Timestamp>,
450 policy: Option<Policy>,
451 },
452 Modify {
453 ttl: Option<Option<Timestamp>>,
454 policy: Option<Option<Policy>>,
455 },
456 Delete,
457}
458
459impl EntryChange {
460 fn add(&mut self, entry: Box<EncodedEntry>) {
461 match self {
462 EntryChange::Modify { ttl, policy } => {
463 *self = EntryChange::Add {
464 entry,
465 ttl: ttl.unwrap_or_default(),
466 policy: policy.clone().unwrap_or_default(),
467 };
468 }
469 EntryChange::Delete => {
470 *self = EntryChange::Add {
471 entry,
472 ttl: None,
473 policy: None,
474 };
475 }
476 EntryChange::Add { .. } => (),
477 }
478 }
479}