1use either::Either;
15use std::{cell::RefCell, rc::Rc};
16use ufotofu::BufferedProducer;
17use willow_data_model::{ForgetEntryError, ForgetPayloadError, PayloadError, TrustedDecodable};
18
19use sled::{
20 transaction::{ConflictableTransactionError, TransactionError, TransactionalTree},
21 Db, Error as SledError, IVec, Result as SledResult, Transactional, Tree,
22};
23use ufotofu::{
24 consumer::IntoVec, producer::FromSlice, BulkConsumer, BulkProducer, Consumer, Producer,
25};
26use ufotofu_codec::{Decodable, DecodableSync, Encodable, EncodableKnownSize, EncodableSync};
27use ufotofu_codec_endian::U64BE;
28use willow_data_model::{
29 grouping::{Area, AreaSubspace},
30 AuthorisationToken, AuthorisedEntry, Component, Entry, EntryIngestionError,
31 EntryIngestionSuccess, EntryOrigin, EventSystem, LengthyAuthorisedEntry, NamespaceId, Path,
32 PayloadAppendError, PayloadAppendSuccess, PayloadDigest, QueryIgnoreParams, Store, StoreEvent,
33 SubspaceId,
34};
35
/// A Willow [`Store`] for a single namespace, persisted in a sled database.
///
/// Entries, payloads and store metadata live in three separate sled trees,
/// opened under `ENTRY_TREE_KEY`, `PAYLOAD_TREE_KEY` and `MISC_TREE_KEY`.
pub struct StoreSimpleSled<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
where
    N: NamespaceId + EncodableKnownSize + Decodable,
    S: SubspaceId,
    PD: PayloadDigest,
    AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
    /// The namespace every entry in this store belongs to.
    namespace_id: N,
    /// Handle to the underlying sled database.
    db: Db,
    /// Event system shared with subscribers created via `subscribe_area`.
    event_system: Rc<RefCell<EventSystem<MCL, MCC, MPL, N, S, PD, AT, StoreSimpleSledError>>>,
}
48
/// Name of the sled tree holding entries.
const ENTRY_TREE_KEY: [u8; 1] = [0b0000_0000];
/// Name of the sled tree holding payload bytes.
const PAYLOAD_TREE_KEY: [u8; 1] = [0b0000_0001];
/// Name of the sled tree holding store metadata (currently the namespace id).
const MISC_TREE_KEY: [u8; 1] = [0b0000_0010];

/// Key inside the misc tree under which the encoded namespace id is stored.
const NAMESPACE_ID_KEY: [u8; 1] = [0b0000_0000];
54
/// Errors returned when creating a brand-new [`StoreSimpleSled`].
#[derive(Debug)]
pub enum NewStoreSimpleSledError {
    /// The misc tree of the supplied database already contains data.
    DbNotClean,
    /// The underlying database reported an error.
    StoreError(StoreSimpleSledError),
}
62
/// Errors returned when opening a [`StoreSimpleSled`] from an existing database.
#[derive(Debug)]
pub enum ExistingStoreSimpleSledError {
    /// The stored namespace id is missing or could not be decoded.
    MalformedDb,
    /// The underlying database reported an error.
    StoreError(StoreSimpleSledError),
}
70
71impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
72 StoreSimpleSled<MCL, MCC, MPL, N, S, PD, AT>
73where
74 N: NamespaceId + EncodableKnownSize + Decodable + DecodableSync,
75 S: SubspaceId,
76 PD: PayloadDigest,
77 AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
78{
79 pub fn new(namespace: &N, db: Db) -> Result<Self, NewStoreSimpleSledError>
81 where
82 N: NamespaceId + EncodableKnownSize + EncodableSync,
83 {
84 Self::new_with_event_queue_capacity(namespace, db, 1024) }
86
87 pub fn new_with_event_queue_capacity(
89 namespace: &N,
90 db: Db,
91 capacity: usize,
92 ) -> Result<Self, NewStoreSimpleSledError>
93 where
94 N: NamespaceId + EncodableKnownSize + EncodableSync,
95 {
96 let store = Self {
97 db,
98 namespace_id: namespace.clone(),
99 event_system: Rc::new(RefCell::new(EventSystem::new(capacity))),
100 };
101
102 let misc_tree = store.misc_tree()?;
103
104 if !misc_tree.is_empty() {
105 return Err(NewStoreSimpleSledError::DbNotClean);
106 }
107
108 let namespace_encoded = namespace.sync_encode_into_vec();
109
110 misc_tree.insert(NAMESPACE_ID_KEY, namespace_encoded)?;
111
112 Ok(store)
113 }
114
115 pub fn from_existing(db: Db) -> Result<Self, ExistingStoreSimpleSledError> {
117 Self::from_existing_with_event_queue_capacity(db, 1024) }
119
120 pub fn from_existing_with_event_queue_capacity(
122 db: Db,
123 capacity: usize,
124 ) -> Result<Self, ExistingStoreSimpleSledError> {
125 let misc_tree = db.open_tree(MISC_TREE_KEY)?;
126
127 let namespace_encoded = misc_tree
128 .get(NAMESPACE_ID_KEY)?
129 .ok_or(ExistingStoreSimpleSledError::MalformedDb)?;
130
131 let namespace_id = N::sync_decode_from_slice(&namespace_encoded)
132 .map_err(|_err| ExistingStoreSimpleSledError::MalformedDb)?;
133
134 Ok(Self {
135 namespace_id,
136 db,
137 event_system: Rc::new(RefCell::new(EventSystem::new(capacity))),
138 })
139 }
140
    /// Opens the sled tree that stores entries.
    fn entry_tree(&self) -> SledResult<Tree> {
        self.db.open_tree(ENTRY_TREE_KEY)
    }
144
    /// Opens the sled tree that stores payload bytes.
    fn payload_tree(&self) -> SledResult<Tree> {
        self.db.open_tree(PAYLOAD_TREE_KEY)
    }
148
    /// Opens the sled tree that stores store metadata.
    fn misc_tree(&self) -> SledResult<Tree> {
        self.db.open_tree(MISC_TREE_KEY)
    }
152
153 async fn is_prefixed_by_newer(
155 &self,
156 entry: &Entry<MCL, MCC, MPL, N, S, PD>,
157 ) -> Result<Option<AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>, StoreSimpleSledError>
158 where
159 S: SubspaceId + EncodableSync + EncodableKnownSize + Decodable,
160 PD: PayloadDigest + Decodable + EncodableSync + EncodableKnownSize,
161 AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + TrustedDecodable,
162 S::ErrorReason: core::fmt::Debug,
163 PD::ErrorReason: core::fmt::Debug,
164 {
165 let tree = self.entry_tree()?;
169
170 let prefix = entry.subspace_id().sync_encode_into_vec();
171
172 for (key, value) in tree.scan_prefix(&prefix).flatten() {
173 let (other_subspace, other_path, other_timestamp) =
176 decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
177 let (payload_length, payload_digest, authorisation_token, _local_length) =
178 decode_entry_values(&value).await;
179
180 let other_entry = Entry::new(
181 self.namespace_id.clone(),
182 other_subspace,
183 other_path,
184 other_timestamp,
185 payload_length,
186 payload_digest,
187 );
188
189 if entry.path().is_prefixed_by(other_entry.path()) && other_entry.is_newer_than(entry) {
190 let authed =
191 unsafe { AuthorisedEntry::new_unchecked(other_entry, authorisation_token) };
192
193 return Ok(Some(authed));
194 }
195 }
196
197 Ok(None)
198 }
199
200 fn flush(&self) -> Result<(), StoreSimpleSledError> {
201 self.db.flush()?;
202
203 Ok(())
204 }
205
206 fn prefix_gt(
208 &self,
209 tree: &Tree,
210 prefix: &[u8],
211 ) -> Result<Option<(IVec, IVec)>, StoreSimpleSledError> {
212 if let Some((key, value)) = tree.scan_prefix(prefix).flatten().next() {
213 return Ok(Some((key, value)));
214 }
215
216 Ok(None)
217 }
218
219 pub fn clear(&self) -> Result<(), StoreSimpleSledError> {
221 self.db.clear()?;
222 self.flush()?;
223 Ok(())
224 }
225}
226
/// Operational errors surfaced by [`StoreSimpleSled`].
#[derive(Debug, PartialEq)]
pub enum StoreSimpleSledError {
    /// An error reported directly by sled.
    Sled(SledError),
    /// A sled transaction failed.
    Transaction(TransactionError<()>),
    /// An error raised inside a sled transaction closure.
    ConflictableTransaction(ConflictableTransactionError<()>),
}
234
235impl core::fmt::Display for StoreSimpleSledError {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 match self {
238 StoreSimpleSledError::Sled(error) => core::fmt::Display::fmt(error, f),
239 StoreSimpleSledError::Transaction(_) => {
240 write!(f, "sled transaction error occurred.")
241 }
242 StoreSimpleSledError::ConflictableTransaction(_) => {
243 write!(f, "sled conflictable transaction error occurred.")
244 }
245 }
246 }
247}
248
// Marker impl: relies on the `Debug` and `Display` impls above; no `source` is exposed.
impl core::error::Error for StoreSimpleSledError {}
250
251impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
252 Store<MCL, MCC, MPL, N, S, PD, AT> for StoreSimpleSled<MCL, MCC, MPL, N, S, PD, AT>
253where
254 N: NamespaceId + EncodableKnownSize + DecodableSync,
255 S: SubspaceId + EncodableSync + EncodableKnownSize + Decodable,
256 PD: PayloadDigest + Encodable + EncodableSync + EncodableKnownSize + Decodable,
257 AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + TrustedDecodable + Encodable,
258 S::ErrorReason: core::fmt::Debug,
259 PD::ErrorReason: core::fmt::Debug,
260{
261 type Error = StoreSimpleSledError;
262
    /// Returns the namespace this store holds entries for.
    fn namespace_id(&self) -> &N {
        &self.namespace_id
    }
266
267 async fn ingest_entry(
268 &self,
269 authorised_entry: AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
270 prevent_pruning: bool,
271 origin: EntryOrigin,
272 ) -> Result<EntryIngestionSuccess<MCL, MCC, MPL, N, S, PD, AT>, EntryIngestionError<Self::Error>>
273 {
274 let (entry, token) = authorised_entry.into_parts();
275
276 if *entry.namespace_id() != self.namespace_id {
277 panic!(
278 "Store for {:?} tried to ingest entry of namespace {:?}",
279 self.namespace_id,
280 entry.namespace_id()
281 )
282 }
283
284 match self.is_prefixed_by_newer(&entry).await {
286 Ok(Some(newer)) => {
287 return Ok(EntryIngestionSuccess::Obsolete {
288 obsolete: unsafe { AuthorisedEntry::new_unchecked(entry, token) },
289 newer,
290 })
291 }
292 Err(err) => return Err(EntryIngestionError::OperationsError(err)),
293 Ok(None) => {
294 }
296 }
297
298 let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
299
300 let same_subspace_path_prefix_trailing_end =
301 encode_subspace_path_key(entry.subspace_id(), entry.path(), false).await;
302
303 let mut keys_to_prune: Vec<IVec> = Vec::new();
304
305 for (key, value) in entry_tree
306 .scan_prefix(&same_subspace_path_prefix_trailing_end)
307 .flatten()
308 {
309 let (other_subspace, other_path, other_timestamp) =
310 decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
311
312 let (
313 other_payload_length,
314 other_payload_digest,
315 _other_authorisation_token,
316 _other_local_length,
317 ) = decode_entry_values::<PD, AT>(&value).await;
318
319 let other_entry = Entry::new(
320 self.namespace_id.clone(),
321 other_subspace,
322 other_path,
323 other_timestamp,
324 other_payload_length,
325 other_payload_digest,
326 );
327
328 if other_entry.is_newer_than(&entry) {
329 continue;
330 }
331
332 if prevent_pruning {
335 return Err(EntryIngestionError::PruningPrevented);
336 }
337
338 keys_to_prune.push(key);
340 }
341
342 let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
343
344 let key = encode_entry_key(entry.subspace_id(), entry.path(), entry.timestamp()).await;
345
346 let value =
347 encode_entry_values(entry.payload_length(), entry.payload_digest(), &token, 0).await;
348
349 let mut entry_batch = sled::Batch::default();
350 let mut payload_batch = sled::Batch::default();
351
352 for key in keys_to_prune {
353 entry_batch.remove(&key);
354 payload_batch.remove(&key);
355 }
356 entry_batch.insert(key.clone(), value);
357
358 (&entry_tree, &payload_tree)
359 .transaction(
360 |(tx_entry, tx_payloads): &(TransactionalTree, TransactionalTree)| -> Result<
361 (),
362 ConflictableTransactionError<()>,
363 > {
364 tx_entry.apply_batch(&entry_batch)?;
365 tx_payloads.apply_batch(&payload_batch)?;
366
367 Ok(())
368 },
369 )
370 .map_err(StoreSimpleSledError::from)?;
371
372 self.event_system
373 .borrow_mut()
374 .ingested_entry(AuthorisedEntry::new(entry, token).unwrap(), origin);
375
376 Ok(EntryIngestionSuccess::Success)
377 }
378
    /// Appends bytes from `payload_source` to the locally stored payload of the
    /// entry at `subspace`/`path`.
    ///
    /// * `NoSuchEntry` if no entry is stored at that subspace and path.
    /// * `WrongEntry` if `expected_digest` is given and differs from the stored digest.
    /// * `TooManyBytes` if more bytes are available than the entry's payload length.
    /// * `DigestMismatch` if the payload is complete but hashes to a different digest;
    ///   nothing is written in that case.
    /// * `SourceError` if the producer fails; the bytes received so far are persisted first.
    ///
    /// On success returns `Completed` when the full payload is now stored,
    /// `Appended` otherwise. An `appended_payload` event is emitted whenever
    /// bytes were persisted.
    async fn append_payload<Producer, PayloadSourceError>(
        &self,
        subspace: &S,
        path: &Path<MCL, MCC, MPL>,
        expected_digest: Option<PD>,
        payload_source: &mut Producer,
    ) -> Result<PayloadAppendSuccess, PayloadAppendError<PayloadSourceError, Self::Error>>
    where
        Producer: BulkProducer<Item = u8, Error = PayloadSourceError>,
    {
        let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
        let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;

        // With the path-end marker, only entries at exactly this path match.
        let exact_key = encode_subspace_path_key(subspace, path, true).await;

        let maybe_entry = self.prefix_gt(&entry_tree, &exact_key)?;

        match maybe_entry {
            Some((entry_key, value)) => {
                let (subspace, path, timestamp) =
                    decode_entry_key::<MCL, MCC, MPL, S>(&entry_key).await;
                let (length, digest, auth_token, _local_length) =
                    decode_entry_values::<PD, AT>(&value).await;

                if let Some(expected) = expected_digest {
                    if expected != digest {
                        return Err(PayloadAppendError::WrongEntry);
                    }
                }

                // Payloads are keyed by subspace + path, without end marker or timestamp.
                let payload_key = encode_subspace_path_key(&subspace, &path, false).await;

                let existing_payload = payload_tree
                    .get(&payload_key)
                    .map_err(StoreSimpleSledError::from)?;

                // Bytes already stored locally; empty if nothing was stored before.
                let prefix = if let Some(payload) = existing_payload {
                    payload
                } else {
                    IVec::from(&[])
                };

                let mut payload: Vec<u8> = Vec::from(prefix.as_ref());
                let mut received_payload_len = payload.len();
                let mut hasher = PD::hasher();

                // Feed the already stored bytes to the hasher so the digest covers the whole payload.
                PD::write(&mut hasher, &prefix);

                loop {
                    if received_payload_len as u64 > length {
                        return Err(PayloadAppendError::TooManyBytes);
                    }

                    match payload_source.produce().await {
                        Ok(Either::Left(byte)) => {
                            payload.push(byte);
                            PD::write(&mut hasher, &[byte]);
                            received_payload_len += 1;
                        }
                        // The producer finished normally.
                        Ok(Either::Right(_)) => break,
                        Err(err) => {
                            // Persist what was received so far before reporting the source error.
                            let new_value = encode_entry_values(
                                length,
                                &digest,
                                &auth_token,
                                received_payload_len as u64,
                            )
                            .await;

                            let mut entry_batch = sled::Batch::default();
                            let mut payload_batch = sled::Batch::default();

                            entry_batch.insert(entry_key, new_value);
                            payload_batch.insert(payload_key, payload);

                            (&entry_tree, &payload_tree)
                                .transaction(
                                    |(tx_entry, tx_payloads): &(
                                        TransactionalTree,
                                        TransactionalTree,
                                    )|
                                     -> Result<
                                        (),
                                        sled::transaction::ConflictableTransactionError<()>,
                                    > {
                                        tx_entry.apply_batch(&entry_batch)?;
                                        tx_payloads.apply_batch(&payload_batch)?;

                                        Ok(())
                                    },
                                )
                                .map_err(StoreSimpleSledError::from)?;

                            let entry = Entry::new(
                                self.namespace_id.clone(),
                                subspace,
                                path,
                                timestamp,
                                length,
                                digest,
                            );

                            // SAFETY: NOTE(review) relies on the stored token having been
                            // verified when the entry was ingested — confirm.
                            let authy_entry =
                                unsafe { AuthorisedEntry::new_unchecked(entry, auth_token) };

                            self.event_system.borrow_mut().appended_payload(
                                LengthyAuthorisedEntry::new(
                                    authy_entry,
                                    received_payload_len as u64,
                                ),
                            );

                            return Err(PayloadAppendError::SourceError {
                                source_error: err,
                                total_length_now_available: received_payload_len as u64,
                            });
                        }
                    }
                }

                // SAFETY: NOTE(review) same assumption as above about stored tokens.
                let authed_entry = unsafe {
                    AuthorisedEntry::new_unchecked(
                        Entry::new(
                            self.namespace_id.clone(),
                            subspace,
                            path,
                            timestamp,
                            length,
                            digest,
                        ),
                        auth_token,
                    )
                };

                let lengthy_entry =
                    LengthyAuthorisedEntry::new(authed_entry, received_payload_len as u64);

                // Same entry value as before, with the updated local length.
                let new_value = encode_entry_values(
                    length,
                    lengthy_entry.entry().entry().payload_digest(),
                    lengthy_entry.entry().token(),
                    received_payload_len as u64,
                )
                .await;

                let mut entry_batch = sled::Batch::default();
                let mut payload_batch = sled::Batch::default();

                entry_batch.insert(entry_key, new_value);
                payload_batch.insert(payload_key, payload);

                if received_payload_len as u64 == length {
                    // The payload is complete: verify it before committing anything.
                    let computed_digest = PD::finish(&hasher);

                    if computed_digest != *lengthy_entry.entry().entry().payload_digest() {
                        return Err(PayloadAppendError::DigestMismatch);
                    }

                    (&entry_tree, &payload_tree)
                        .transaction(
                            |(tx_entry, tx_payloads): &(
                                TransactionalTree,
                                TransactionalTree,
                            )|
                             -> Result<
                                (),
                                sled::transaction::ConflictableTransactionError<
                                    (),
                                >,
                            > {
                                tx_entry.apply_batch(&entry_batch)?;
                                tx_payloads.apply_batch(&payload_batch)?;
                                Ok(())
                            },
                        )
                        .map_err(|err| {
                            StoreSimpleSledError::from(err)
                        })?;

                    self.event_system
                        .borrow_mut()
                        .appended_payload(lengthy_entry);

                    Ok(PayloadAppendSuccess::Completed)
                } else {
                    // Still incomplete: store the partial payload as is.
                    (&entry_tree, &payload_tree)
                        .transaction(
                            |(tx_entry, tx_payloads): &(
                                TransactionalTree,
                                TransactionalTree,
                            )|
                             -> Result<
                                (),
                                sled::transaction::ConflictableTransactionError<()>,
                            > {
                                tx_entry.apply_batch(&entry_batch)?;
                                tx_payloads.apply_batch(&payload_batch)?;
                                Ok(())
                            },
                        )
                        .map_err(|err| {
                            StoreSimpleSledError::from(err)
                        })?;

                    self.event_system
                        .borrow_mut()
                        .appended_payload(lengthy_entry);

                    Ok(PayloadAppendSuccess::Appended)
                }
            }
            None => Err(PayloadAppendError::NoSuchEntry),
        }
    }
597
598 async fn forget_entry(
599 &self,
600 subspace_id: &S,
601 path: &willow_data_model::Path<MCL, MCC, MPL>,
602 expected_digest: Option<PD>,
603 ) -> Result<(), ForgetEntryError<Self::Error>> {
604 let exact_key = encode_subspace_path_key(subspace_id, path, true).await;
605
606 let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
607 let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
608
609 let maybe_entry = self.prefix_gt(&entry_tree, &exact_key)?;
610
611 if let Some((key, value)) = maybe_entry {
612 let (subspace_id, path, timestamp) = decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
613 let (length, digest, auth_token, local_length) =
614 decode_entry_values::<PD, AT>(&value).await;
615
616 if let Some(expected) = expected_digest {
617 if expected != digest {
618 return Err(ForgetEntryError::WrongEntry);
619 }
620 }
621
622 (&entry_tree, &payload_tree)
623 .transaction(
624 |(tx_entry, tx_payloads): &(TransactionalTree, TransactionalTree)| -> Result<
625 (),
626 sled::transaction::ConflictableTransactionError<()>,
627 > {
628 tx_entry.remove(&key)?;
629 tx_payloads.remove(&key)?;
630
631 Ok(())
632 },
633 ) .map_err(StoreSimpleSledError::from)?;
634
635 let entry = Entry::new(
636 self.namespace_id.clone(),
637 subspace_id,
638 path,
639 timestamp,
640 length,
641 digest,
642 );
643
644 let authy_entry = unsafe { AuthorisedEntry::new_unchecked(entry, auth_token) };
646
647 self.event_system
648 .borrow_mut()
649 .forgot_entry(LengthyAuthorisedEntry::new(authy_entry, local_length));
650 }
651
652 Ok(())
653 }
654
655 async fn forget_area(
656 &self,
657 area: &Area<MCL, MCC, MPL, S>,
658 protected: Option<&Area<MCL, MCC, MPL, S>>,
659 ) -> Result<usize, Self::Error> {
660 let entry_tree = self.entry_tree()?;
661 let payload_tree = self.payload_tree()?;
662
663 let mut entry_batch = sled::Batch::default();
664 let mut payload_batch = sled::Batch::default();
665
666 let mut forgotten_count = 0;
667
668 let entry_iterator = match area.subspace() {
669 AreaSubspace::Any => entry_tree.iter(),
670 AreaSubspace::Id(subspace) => {
671 let matching_subspace_path =
672 encode_subspace_path_key(subspace, area.path(), false).await;
673
674 entry_tree.scan_prefix(&matching_subspace_path)
675 }
676 };
677
678 for (key, value) in entry_iterator.flatten() {
679 let (subspace, path, timestamp) = decode_entry_key(&key).await;
680 let (_length, _digest, _token, _local_length) =
681 decode_entry_values::<PD, AT>(&value).await;
682
683 let prefix_matches = if *area.subspace() == AreaSubspace::Any {
684 path.is_prefixed_by(area.path())
685 } else {
686 true
688 };
689
690 let timestamp_included = area.times().includes(×tamp);
691
692 let is_protected = match &protected {
693 Some(protected_area) => {
694 protected_area.subspace().includes(&subspace)
695 && protected_area.path().is_prefix_of(&path)
696 && protected_area.times().includes(×tamp)
697 }
698 None => false,
699 };
700
701 if !is_protected && prefix_matches && timestamp_included {
702 entry_batch.remove(&key);
704 payload_batch.remove(&key);
705
706 forgotten_count += 1;
707 }
708 }
709
710 (&entry_tree, &payload_tree)
711 .transaction(
712 |(tx_entry, tx_payloads): &(TransactionalTree, TransactionalTree)| -> Result<
713 (),
714 sled::transaction::ConflictableTransactionError<()>,
715 > {
716 tx_entry.apply_batch(&entry_batch)?;
717 tx_payloads.apply_batch(&payload_batch)?;
718
719 Ok(())
720 },
721 )?;
722
723 self.event_system
724 .borrow_mut()
725 .forgot_area(area.clone(), protected.cloned());
726
727 Ok(forgotten_count)
728 }
729
730 async fn forget_payload(
731 &self,
732 subspace_id: &S,
733 path: &Path<MCL, MCC, MPL>,
734 expected_digest: Option<PD>,
735 ) -> Result<(), ForgetPayloadError<Self::Error>> {
736 let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
737
738 let payload_key = encode_subspace_path_key(subspace_id, path, false).await;
739
740 let maybe_payload = self.prefix_gt(&payload_tree, &payload_key)?;
741
742 let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
743
744 let entry_key_partial = encode_subspace_path_key(subspace_id, path, true).await;
745 let maybe_entry = self.prefix_gt(&entry_tree, &entry_key_partial)?;
746
747 match (maybe_entry, maybe_payload) {
748 (Some((entry_key, entry_value)), Some((payload_key, _payload_value))) => {
749 let (subspace, path, timestamp) =
750 decode_entry_key::<MCL, MCC, MPL, S>(&entry_key).await;
751 let (length, digest, auth_token, local_length) =
752 decode_entry_values::<PD, AT>(&entry_value).await;
753
754 if let Some(expected) = expected_digest {
755 if expected != digest {
756 return Err(ForgetPayloadError::WrongEntry);
757 }
758 }
759
760 let new_key_value = encode_entry_values(length, &digest, &auth_token, 0).await;
761
762 (&entry_tree, &payload_tree).transaction(
763 |(entry_tx, payload_tx): &(TransactionalTree, TransactionalTree)| -> Result<
764 (),
765 sled::transaction::ConflictableTransactionError<()>,
766 > {
767 payload_tx.remove(&payload_key)?;
768 entry_tx.insert(&entry_key, new_key_value.clone())?;
769
770 Ok(())
771 },
772 ).map_err(StoreSimpleSledError::from)?;
773
774 let entry = Entry::new(
775 self.namespace_id.clone(),
776 subspace,
777 path,
778 timestamp,
779 length,
780 digest,
781 );
782
783 let authy_entry = unsafe { AuthorisedEntry::new_unchecked(entry, auth_token) };
784
785 self.event_system
786 .borrow_mut()
787 .forgot_payload(LengthyAuthorisedEntry::new(authy_entry, local_length));
788
789 Ok(())
790 }
791 (Some((_entry_key, entry_value)), None) => {
792 if let Some(expected) = expected_digest {
793 let (_length, digest, _auth_token, _local_length) =
794 decode_entry_values::<PD, AT>(&entry_value).await;
795
796 if expected != digest {
797 return Err(ForgetPayloadError::WrongEntry);
798 }
799 }
800
801 Ok(())
802 },
803 (None, None) => Err(ForgetPayloadError::NoSuchEntry),
804 (None, Some(_)) => panic!("StoreSimpleSled is storing a payload with no corresponding entry, which indicates an implementation error!"),
805 }
806 }
807
808 async fn forget_area_payloads(
809 &self,
810 area: &Area<MCL, MCC, MPL, S>,
811 protected: Option<&Area<MCL, MCC, MPL, S>>,
812 ) -> Result<usize, Self::Error> {
813 let entry_tree = self.entry_tree()?;
814 let payload_tree = self.payload_tree()?;
815
816 let mut entry_batch = sled::Batch::default();
817 let mut payload_batch = sled::Batch::default();
818
819 let mut forgotten_count = 0;
820
821 let entry_iterator = match area.subspace() {
822 AreaSubspace::Any => entry_tree.iter(),
823 AreaSubspace::Id(subspace) => {
824 let matching_subspace_path =
825 encode_subspace_path_key(subspace, area.path(), false).await;
826
827 entry_tree.scan_prefix(&matching_subspace_path)
828 }
829 };
830
831 for (key, value) in entry_iterator.flatten() {
832 let (subspace, path, timestamp) = decode_entry_key(&key).await;
833 let (length, digest, token, _local_length) =
834 decode_entry_values::<PD, AT>(&value).await;
835
836 let prefix_matches = if *area.subspace() == AreaSubspace::Any {
837 path.is_prefixed_by(area.path())
838 } else {
839 true
841 };
842
843 let timestamp_included = area.times().includes(×tamp);
844
845 let is_protected = match &protected {
846 Some(protected_area) => {
847 protected_area.subspace().includes(&subspace)
848 && protected_area.path().is_prefix_of(&path)
849 && protected_area.times().includes(×tamp)
850 }
851 None => false,
852 };
853
854 if !is_protected && prefix_matches && timestamp_included {
855 let entry_values = encode_entry_values(length, &digest, &token, 0).await;
856
857 entry_batch.insert(&key, entry_values);
858 payload_batch.remove(&key);
859
860 forgotten_count += 1;
861 }
862 }
863
864 (&entry_tree, &payload_tree).transaction(
865 |(tx_entry, tx_payloads): &(TransactionalTree, TransactionalTree)| -> Result<
866 (),
867 sled::transaction::ConflictableTransactionError<()>,
868 > {
869 tx_entry.apply_batch(&entry_batch)?;
870 tx_payloads.apply_batch(&payload_batch)?;
871
872 Ok(())
873 },
874 )?;
875
876 self.event_system
877 .borrow_mut()
878 .forgot_area(area.clone(), protected.cloned());
879
880 Ok(forgotten_count)
881 }
882
    /// Flushes the store by delegating to `self.flush()`.
    // NOTE(review): presumably resolves to the inherent `flush` (inherent methods
    // take precedence over trait methods) rather than recursing — confirm.
    async fn flush(&self) -> Result<(), Self::Error> {
        self.flush()
    }
886
887 async fn payload(
888 &self,
889 subspace: &S,
890 path: &Path<MCL, MCC, MPL>,
891 expected_digest: Option<PD>,
892 ) -> Result<
893 Option<impl BulkProducer<Item = u8, Final = (), Error = Self::Error>>,
894 PayloadError<Self::Error>,
895 > {
896 let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
897 let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
898 let exact_key = encode_subspace_path_key(subspace, path, true).await;
899
900 let maybe_entry = self.prefix_gt(&entry_tree, &exact_key)?;
901 let maybe_payload = self.prefix_gt(&payload_tree, &exact_key)?;
902
903 match (maybe_entry, maybe_payload) {
904 (Some((_entry_key, entry_value)), Some((_payload_key, payload_value))) => {
905 let (_length, digest, _token, _local_length) =
906 decode_entry_values::<PD, AT>(&entry_value).await;
907
908 if let Some(expected) = expected_digest {
909 if expected != digest {
910 return Err(PayloadError::WrongEntry);
911 }
912 }
913
914 Ok(Some(PayloadProducer::new(payload_value)))
915 }
916 (Some((_entry_key, entry_value)), None) => {
917 let (_length, digest, _token, _local_length) =
919 decode_entry_values::<PD, AT>(&entry_value).await;
920
921 if let Some(expected) = expected_digest {
922 if expected != digest {
923 return Err(PayloadError::WrongEntry);
924 }
925 }
926
927 Ok(None)
928 }
929 (None, None) => Ok(None),
930 (None, Some(_)) => {
931 panic!("Holding a payload for which there is no corresponding entry, this is bad!")
932 }
933 }
934 }
935
936 async fn entry(
937 &self,
938 subspace_id: &S,
939 path: &Path<MCL, MCC, MPL>,
940 ignore: QueryIgnoreParams,
941 ) -> Result<
942 Option<willow_data_model::LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>,
943 Self::Error,
944 > {
945 let exact_key = encode_subspace_path_key(subspace_id, path, true).await;
946
947 let entry_tree = self.entry_tree()?;
948
949 let maybe_entry = self.prefix_gt(&entry_tree, &exact_key)?;
950
951 if let Some((key, value)) = maybe_entry {
952 let (subspace, path, timestamp) = decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
953 let (length, digest, token, local_length) = decode_entry_values::<PD, AT>(&value).await;
954
955 let entry = Entry::new(
956 self.namespace_id.clone(),
957 subspace,
958 path,
959 timestamp,
960 length,
961 digest,
962 );
963
964 let authed_entry = unsafe { AuthorisedEntry::new_unchecked(entry, token) };
965
966 let payload_is_empty_string = length == 0;
967 let is_incomplete = local_length < length;
968
969 if (ignore.ignore_incomplete_payloads && is_incomplete)
970 || (ignore.ignore_empty_payloads && payload_is_empty_string)
971 {
972 return Ok(None);
973 } else {
974 return Ok(Some(LengthyAuthorisedEntry::new(
975 authed_entry,
976 local_length,
977 )));
978 }
979 }
980
981 Ok(None)
982 }
983
    /// Returns a producer of the stored entries in `area`, filtered by `ignore`.
    ///
    /// Delegates entirely to `EntryProducer::new`.
    async fn query_area(
        &self,
        area: &Area<MCL, MCC, MPL, S>,
        ignore: QueryIgnoreParams,
    ) -> Result<
        impl Producer<Item = LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>, Final = ()>,
        Self::Error,
    > {
        EntryProducer::new(self, area, ignore).await
    }
994
    /// Subscribes to store events concerning `area`, filtered by `ignore`.
    ///
    /// Registers a subscription on this store's shared event system.
    async fn subscribe_area(
        &self,
        area: &Area<MCL, MCC, MPL, S>,
        ignore: QueryIgnoreParams,
    ) -> impl Producer<Item = StoreEvent<MCL, MCC, MPL, N, S, PD, AT>, Final = (), Error = Self::Error>
    {
        EventSystem::add_subscription(self.event_system.clone(), area.clone(), ignore)
    }
1003}
1004
1005async fn encode_subspace_path_key<
1007 const MCL: usize,
1008 const MCC: usize,
1009 const MPL: usize,
1010 S: SubspaceId + EncodableKnownSize + EncodableSync,
1011>(
1012 subspace: &S,
1013 path: &Path<MCL, MCC, MPL>,
1014 with_path_end: bool,
1015) -> Vec<u8> {
1016 let mut consumer: IntoVec<u8> = IntoVec::new();
1017
1018 subspace.encode(&mut consumer).await.unwrap();
1020
1021 for component in path.components() {
1022 for byte in component.as_ref() {
1023 if *byte == 0 {
1024 consumer.bulk_consume_full_slice(&[0, 2]).await.unwrap();
1026 } else {
1027 consumer.consume(*byte).await.unwrap();
1029 }
1030 }
1031
1032 consumer.bulk_consume_full_slice(&[0, 1]).await.unwrap();
1034 }
1035
1036 if with_path_end {
1038 consumer.bulk_consume_full_slice(&[0, 0]).await.unwrap();
1039 }
1040
1041 consumer.into_vec()
1044}
1045
1046async fn encode_entry_key<
1047 const MCL: usize,
1048 const MCC: usize,
1049 const MPL: usize,
1050 S: SubspaceId + EncodableKnownSize + EncodableSync,
1051>(
1052 subspace: &S,
1053 path: &Path<MCL, MCC, MPL>,
1054 timestamp: u64,
1055) -> Vec<u8> {
1056 let mut consumer: IntoVec<u8> = IntoVec::new();
1057
1058 subspace.encode(&mut consumer).await.unwrap();
1060
1061 for component in path.components() {
1062 for byte in component.as_ref() {
1063 if *byte == 0 {
1064 consumer.bulk_consume_full_slice(&[0, 2]).await.unwrap();
1066 } else {
1067 consumer.consume(*byte).await.unwrap();
1069 }
1070 }
1071
1072 consumer.bulk_consume_full_slice(&[0, 1]).await.unwrap();
1074 }
1075
1076 consumer.bulk_consume_full_slice(&[0, 0]).await.unwrap();
1078
1079 U64BE(timestamp).encode(&mut consumer).await.unwrap();
1081
1082 consumer.into_vec()
1083}
1084
1085async fn decode_entry_key<
1086 const MCL: usize,
1087 const MCC: usize,
1088 const MPL: usize,
1089 S: SubspaceId + Decodable,
1090>(
1091 encoded: &IVec,
1092) -> (S, Path<MCL, MCC, MPL>, u64)
1093where
1094 S::ErrorReason: core::fmt::Debug,
1095{
1096 let mut producer = FromSlice::new(encoded);
1097
1098 let subspace = S::decode(&mut producer).await.unwrap();
1099
1100 let mut components_vecs: Vec<Vec<u8>> = Vec::new();
1101
1102 while let Some(bytes) = component_bytes(&mut producer).await {
1103 components_vecs.push(bytes);
1104 }
1105
1106 let mut components = components_vecs
1107 .iter()
1108 .map(|bytes| Component::new(bytes).expect("Component was unexpectedly longer than MCL."));
1109
1110 let total_len = components.clone().fold(0, |acc, comp| acc + comp.len());
1111
1112 let path: Path<MCL, MCC, MPL> = Path::new_from_iter(total_len, &mut components).unwrap();
1113
1114 let timestamp = U64BE::decode(&mut producer).await.unwrap().0;
1115
1116 (subspace, path, timestamp)
1117}
1118
1119async fn component_bytes<P: Producer<Item = u8>>(producer: &mut P) -> Option<Vec<u8>>
1120where
1121 P::Error: core::fmt::Debug,
1122 P::Final: core::fmt::Debug,
1123{
1124 let mut vec: Vec<u8> = Vec::new();
1125 let mut previous_was_zero = false;
1126
1127 loop {
1128 match producer.produce().await {
1129 Ok(Either::Left(byte)) => {
1130 if !previous_was_zero && byte == 0 {
1131 previous_was_zero = true
1132 } else if previous_was_zero && byte == 2 {
1133 vec.push(0);
1136 previous_was_zero = false;
1137 } else if previous_was_zero && byte == 1 {
1138 return Some(vec);
1140 } else if previous_was_zero && byte == 0 {
1141 return None;
1143 } else {
1144 vec.push(byte);
1146 previous_was_zero = false;
1147 }
1148 }
1149 Ok(Either::Right(_)) => {
1150 if previous_was_zero {
1151 panic!("Unterminated escaped key!")
1152 }
1153
1154 return None;
1155 }
1156 Err(err) => panic!("Unexpected error: {:?}", err),
1157 }
1158 }
1159}
1160
1161async fn encode_entry_values<PD, AT>(
1162 payload_length: u64,
1163 payload_digest: &PD,
1164 auth_token: &AT,
1165 local_length: u64,
1166) -> Vec<u8>
1167where
1168 PD: Encodable,
1169 AT: Encodable,
1170{
1171 let mut consumer: IntoVec<u8> = IntoVec::new();
1172
1173 U64BE(payload_length).encode(&mut consumer).await.unwrap();
1174 payload_digest.encode(&mut consumer).await.unwrap();
1175 auth_token.encode(&mut consumer).await.unwrap();
1176 U64BE(local_length).encode(&mut consumer).await.unwrap();
1177
1178 consumer.into_vec()
1179}
1180
/// Decodes an entry-tree value into `(payload_length, payload_digest,
/// auth_token, local_length)`, reading the fields in that order.
///
/// # Panics
/// Panics if any of the four fields fails to decode.
async fn decode_entry_values<PD, AT>(encoded: &IVec) -> (u64, PD, AT, u64)
where
    AT: TrustedDecodable,
    PD: Decodable,
    PD::ErrorReason: core::fmt::Debug,
{
    let mut producer = FromSlice::new(encoded);

    let payload_length = U64BE::decode(&mut producer).await.unwrap().0;
    let payload_digest = PD::decode(&mut producer).await.unwrap();
    // SAFETY: NOTE(review) presumably sound because the bytes were written by
    // this store from an already authorised token — confirm against `TrustedDecodable`.
    let auth_token = unsafe { AT::trusted_decode(&mut producer).await.unwrap() };
    let local_length = U64BE::decode(&mut producer).await.unwrap().0;

    (payload_length, payload_digest, auth_token, local_length)
}
1196
1197impl From<SledError> for StoreSimpleSledError {
1198 fn from(value: SledError) -> Self {
1199 StoreSimpleSledError::Sled(value)
1200 }
1201}
1202
1203impl From<ConflictableTransactionError<()>> for StoreSimpleSledError {
1204 fn from(value: ConflictableTransactionError<()>) -> Self {
1205 StoreSimpleSledError::ConflictableTransaction(value)
1206 }
1207}
1208
1209impl From<TransactionError<()>> for StoreSimpleSledError {
1210 fn from(value: TransactionError<()>) -> Self {
1211 StoreSimpleSledError::Transaction(value)
1212 }
1213}
1214
1215impl From<SledError> for NewStoreSimpleSledError {
1216 fn from(value: SledError) -> Self {
1217 Self::StoreError(StoreSimpleSledError::from(value))
1218 }
1219}
1220
1221impl From<SledError> for ExistingStoreSimpleSledError {
1222 fn from(value: SledError) -> Self {
1223 Self::StoreError(StoreSimpleSledError::from(value))
1224 }
1225}
1226
1227impl From<StoreSimpleSledError> for EntryIngestionError<StoreSimpleSledError> {
1228 fn from(val: StoreSimpleSledError) -> Self {
1229 EntryIngestionError::OperationsError(val)
1230 }
1231}
1232
1233impl<PSE> From<StoreSimpleSledError> for PayloadAppendError<PSE, StoreSimpleSledError> {
1234 fn from(val: StoreSimpleSledError) -> Self {
1235 PayloadAppendError::OperationError(val)
1236 }
1237}
1238
1239impl From<StoreSimpleSledError> for ForgetEntryError<StoreSimpleSledError> {
1240 fn from(value: StoreSimpleSledError) -> Self {
1241 Self::OperationError(value)
1242 }
1243}
1244
1245impl From<StoreSimpleSledError> for ForgetPayloadError<StoreSimpleSledError> {
1246 fn from(value: StoreSimpleSledError) -> Self {
1247 Self::OperationError(value)
1248 }
1249}
1250
1251impl From<StoreSimpleSledError> for PayloadError<StoreSimpleSledError> {
1252 fn from(value: StoreSimpleSledError) -> Self {
1253 Self::OperationError(value)
1254 }
1255}
1256
1257pub struct PayloadProducer {
1259 produced: usize,
1260 ivec: IVec,
1261}
1262
1263impl PayloadProducer {
1264 fn new(ivec: IVec) -> Self {
1265 Self { produced: 0, ivec }
1266 }
1267}
1268
1269impl Producer for PayloadProducer {
1270 type Item = u8;
1271
1272 type Final = ();
1273
1274 type Error = StoreSimpleSledError;
1275
1276 async fn produce(&mut self) -> Result<Either<Self::Item, Self::Final>, Self::Error> {
1277 match self.produced.cmp(&self.ivec.len()) {
1278 std::cmp::Ordering::Less => {
1279 let byte = self.ivec[self.produced];
1280 Ok(Either::Left(byte))
1281 },
1282 std::cmp::Ordering::Equal => Ok(Either::Right(())),
1283 std::cmp::Ordering::Greater => unreachable!("You tried to produce more bytes than you could, but you claimed infallibity. You traitor. You fool."),
1284 }
1285 }
1286}
1287
1288impl BufferedProducer for PayloadProducer {
1289 async fn slurp(&mut self) -> Result<(), Self::Error> {
1290 Ok(())
1291 }
1292}
1293
1294impl BulkProducer for PayloadProducer {
1295 async fn expose_items<'a>(
1296 &'a mut self,
1297 ) -> Result<Either<&'a [Self::Item], Self::Final>, Self::Error>
1298 where
1299 Self::Item: 'a,
1300 {
1301 let slice = &self.ivec[self.produced..];
1302 if slice.is_empty() {
1303 Ok(Either::Right(()))
1304 } else {
1305 Ok(Either::Left(slice))
1306 }
1307 }
1308
1309 async fn consider_produced(&mut self, amount: usize) -> Result<(), Self::Error> {
1310 self.produced += amount;
1311
1312 Ok(())
1313 }
1314}
1315
1316pub struct EntryProducer<'store, const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
1318where
1319 N: NamespaceId + EncodableKnownSize + Decodable,
1320 S: SubspaceId,
1321 PD: PayloadDigest,
1322 AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
1323{
1324 iter: sled::Iter,
1325 store: &'store StoreSimpleSled<MCL, MCC, MPL, N, S, PD, AT>,
1326 ignore: QueryIgnoreParams,
1327 area: Area<MCL, MCC, MPL, S>,
1328}
1329
1330impl<'store, const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
1331 EntryProducer<'store, MCL, MCC, MPL, N, S, PD, AT>
1332where
1333 N: NamespaceId + EncodableKnownSize + DecodableSync,
1334 S: SubspaceId + EncodableKnownSize + EncodableSync,
1335 PD: PayloadDigest,
1336 AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
1337{
1338 async fn new(
1339 store: &'store StoreSimpleSled<MCL, MCC, MPL, N, S, PD, AT>,
1340 area: &Area<MCL, MCC, MPL, S>,
1341 ignore: QueryIgnoreParams,
1342 ) -> Result<Self, StoreSimpleSledError> {
1343 let entry_tree = store.entry_tree()?;
1344
1345 let entry_iterator = match area.subspace() {
1346 AreaSubspace::Any => entry_tree.iter(),
1347 AreaSubspace::Id(subspace) => {
1348 let matching_subspace_path =
1349 encode_subspace_path_key(subspace, area.path(), false).await;
1350
1351 entry_tree.scan_prefix(&matching_subspace_path)
1352 }
1353 };
1354
1355 Ok(Self {
1356 iter: entry_iterator,
1357 area: area.clone(),
1358 ignore,
1359 store,
1360 })
1361 }
1362}
1363
1364impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT> Producer
1365 for EntryProducer<'_, MCL, MCC, MPL, N, S, PD, AT>
1366where
1367 N: NamespaceId + EncodableKnownSize + Decodable,
1368 S: SubspaceId + Decodable + EncodableKnownSize + EncodableSync,
1369 PD: PayloadDigest + Decodable,
1370 AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + TrustedDecodable,
1371 S::ErrorReason: std::fmt::Debug,
1372 PD::ErrorReason: std::fmt::Debug,
1373{
1374 type Item = LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>;
1375
1376 type Final = ();
1377
1378 type Error = StoreSimpleSledError;
1379
1380 async fn produce(&mut self) -> Result<Either<Self::Item, Self::Final>, Self::Error> {
1381 loop {
1382 let result = self.iter.next();
1383
1384 match result {
1385 Some(Ok((key, value))) => {
1386 let (subspace, path, timestamp) =
1387 decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
1388 let (length, digest, token, local_length) =
1389 decode_entry_values::<PD, AT>(&value).await;
1390
1391 let entry = Entry::new(
1392 self.store.namespace_id.clone(),
1393 subspace,
1394 path,
1395 timestamp,
1396 length,
1397 digest,
1398 );
1399
1400 if !self.area.includes_entry(&entry) {
1401 continue;
1402 }
1403
1404 let authed_entry = unsafe { AuthorisedEntry::new_unchecked(entry, token) };
1405
1406 let is_empty_string = length == 0;
1407 let is_incomplete = local_length < length;
1408
1409 if (self.ignore.ignore_incomplete_payloads && is_incomplete)
1410 || (self.ignore.ignore_empty_payloads && is_empty_string)
1411 {
1412 continue;
1413 }
1414
1415 return Ok(Either::Left(LengthyAuthorisedEntry::new(
1416 authed_entry,
1417 local_length,
1418 )));
1419 }
1420 Some(Err(err)) => return Err(StoreSimpleSledError::from(err)),
1421 None => return Ok(Either::Right(())),
1422 }
1423 }
1424 }
1425}