willow_store_simple_sled/
lib.rs

1//! # willow-store-simple-sled
2//!
3//! Simple persistent storage for Willow data.
4//!
5//! - Implements [`willow_data_model::Store`].
6//! - *Simple*, hence it has a straightforward implementation without the use of fancy data structures.
7//! - Uses [sled](https://docs.rs/sled/latest/sled/) under the hood.
8//!
9//! # Performance considerations
10//!
11//! - Read and write performance should be adequate.
12//! - Loads entire payloads into memory all at once.
13
14use either::Either;
15use std::{cell::RefCell, rc::Rc};
16use ufotofu::BufferedProducer;
17use willow_data_model::{ForgetEntryError, ForgetPayloadError, PayloadError, TrustedDecodable};
18
19use sled::{
20    transaction::{ConflictableTransactionError, TransactionError, TransactionalTree},
21    Db, Error as SledError, IVec, Result as SledResult, Transactional, Tree,
22};
23use ufotofu::{
24    consumer::IntoVec, producer::FromSlice, BulkConsumer, BulkProducer, Consumer, Producer,
25};
26use ufotofu_codec::{Decodable, DecodableSync, Encodable, EncodableKnownSize, EncodableSync};
27use ufotofu_codec_endian::U64BE;
28use willow_data_model::{
29    grouping::{Area, AreaSubspace},
30    AuthorisationToken, AuthorisedEntry, Component, Entry, EntryIngestionError,
31    EntryIngestionSuccess, EntryOrigin, EventSystem, LengthyAuthorisedEntry, NamespaceId, Path,
32    PayloadAppendError, PayloadAppendSuccess, PayloadDigest, QueryIgnoreParams, Store, StoreEvent,
33    SubspaceId,
34};
35
/// A simple, [sled](https://docs.rs/sled/latest/sled/)-powered Willow data [store](https://willowprotocol.org/specs/data-model/index.html#store) implementing the [willow_data_model::Store] trait.
///
/// A store holds the data of exactly one namespace. Entries, payloads and store metadata are kept
/// in separate named trees of a single [`sled::Db`].
pub struct StoreSimpleSled<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
where
    N: NamespaceId + EncodableKnownSize + Decodable,
    S: SubspaceId,
    PD: PayloadDigest,
    AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
    /// The namespace all entries of this store belong to.
    namespace_id: N,
    /// The backing sled database.
    db: Db,
    /// Event system that is informed about mutations after they have been written to the database.
    event_system: Rc<RefCell<EventSystem<MCL, MCC, MPL, N, S, PD, AT, StoreSimpleSledError>>>,
}
48
/// Name of the sled tree holding entries.
const ENTRY_TREE_KEY: [u8; 1] = [0b0000_0000];
/// Name of the sled tree holding payload bytes.
const PAYLOAD_TREE_KEY: [u8; 1] = [0b0000_0001];
/// Name of the sled tree holding store metadata (currently the namespace ID).
const MISC_TREE_KEY: [u8; 1] = [0b0000_0010];

/// Key within the misc tree under which the encoded namespace ID is stored.
const NAMESPACE_ID_KEY: [u8; 1] = [0b0000_0000];
54
/// Returned when a store could not be instantiated from an empty [`sled::Db`].
#[derive(Debug)]
pub enum NewStoreSimpleSledError {
    /// The DB has already been configured for another namespace.
    DbNotClean,
    /// The underlying database reported an error.
    StoreError(StoreSimpleSledError),
}
62
/// Returned when a store could not be instantiated from an existing [`sled::Db`].
#[derive(Debug)]
pub enum ExistingStoreSimpleSledError {
    /// The DB is not correctly configured for use: the namespace ID is missing or cannot be decoded.
    MalformedDb,
    /// The underlying database reported an error.
    StoreError(StoreSimpleSledError),
}
70
71impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
72    StoreSimpleSled<MCL, MCC, MPL, N, S, PD, AT>
73where
74    N: NamespaceId + EncodableKnownSize + Decodable + DecodableSync,
75    S: SubspaceId,
76    PD: PayloadDigest,
77    AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
78{
79    /// Returns an empty [`StoreSimpleSled`], or an error if the database is already found to have data in it.
80    pub fn new(namespace: &N, db: Db) -> Result<Self, NewStoreSimpleSledError>
81    where
82        N: NamespaceId + EncodableKnownSize + EncodableSync,
83    {
84        Self::new_with_event_queue_capacity(namespace, db, 1024) // the 1024 is arbitrary, really
85    }
86
87    /// Returns an empty [`StoreSimpleSled`] with a given event queue capacity, or an error if the database is already found to have data in it.
88    pub fn new_with_event_queue_capacity(
89        namespace: &N,
90        db: Db,
91        capacity: usize,
92    ) -> Result<Self, NewStoreSimpleSledError>
93    where
94        N: NamespaceId + EncodableKnownSize + EncodableSync,
95    {
96        let store = Self {
97            db,
98            namespace_id: namespace.clone(),
99            event_system: Rc::new(RefCell::new(EventSystem::new(capacity))),
100        };
101
102        let misc_tree = store.misc_tree()?;
103
104        if !misc_tree.is_empty() {
105            return Err(NewStoreSimpleSledError::DbNotClean);
106        }
107
108        let namespace_encoded = namespace.sync_encode_into_vec();
109
110        misc_tree.insert(NAMESPACE_ID_KEY, namespace_encoded)?;
111
112        Ok(store)
113    }
114
115    /// Returns a [`StoreSimpleSled`] from a [`sled::Db`] already containing Willow data, or an error if the data is found to be malformed.
116    pub fn from_existing(db: Db) -> Result<Self, ExistingStoreSimpleSledError> {
117        Self::from_existing_with_event_queue_capacity(db, 1024) // the 1024 is arbitrary, really
118    }
119
120    /// Returns a [`StoreSimpleSled`] from a [`sled::Db`] already containing Willow data with a given event queue capacity, or an error if the data is found to be malformed.
121    pub fn from_existing_with_event_queue_capacity(
122        db: Db,
123        capacity: usize,
124    ) -> Result<Self, ExistingStoreSimpleSledError> {
125        let misc_tree = db.open_tree(MISC_TREE_KEY)?;
126
127        let namespace_encoded = misc_tree
128            .get(NAMESPACE_ID_KEY)?
129            .ok_or(ExistingStoreSimpleSledError::MalformedDb)?;
130
131        let namespace_id = N::sync_decode_from_slice(&namespace_encoded)
132            .map_err(|_err| ExistingStoreSimpleSledError::MalformedDb)?;
133
134        Ok(Self {
135            namespace_id,
136            db,
137            event_system: Rc::new(RefCell::new(EventSystem::new(capacity))),
138        })
139    }
140
141    fn entry_tree(&self) -> SledResult<Tree> {
142        self.db.open_tree(ENTRY_TREE_KEY)
143    }
144
145    fn payload_tree(&self) -> SledResult<Tree> {
146        self.db.open_tree(PAYLOAD_TREE_KEY)
147    }
148
149    fn misc_tree(&self) -> SledResult<Tree> {
150        self.db.open_tree(MISC_TREE_KEY)
151    }
152
153    /// Return whether this store contains entries with paths that are prefixes of the given path and newer than the given timestamp
154    async fn is_prefixed_by_newer(
155        &self,
156        entry: &Entry<MCL, MCC, MPL, N, S, PD>,
157    ) -> Result<Option<AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>, StoreSimpleSledError>
158    where
159        S: SubspaceId + EncodableSync + EncodableKnownSize + Decodable,
160        PD: PayloadDigest + Decodable + EncodableSync + EncodableKnownSize,
161        AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + TrustedDecodable,
162        S::ErrorReason: core::fmt::Debug,
163        PD::ErrorReason: core::fmt::Debug,
164    {
165        // Iterate from subspace, just linearly
166        // Create all prefixes of given path
167
168        let tree = self.entry_tree()?;
169
170        let prefix = entry.subspace_id().sync_encode_into_vec();
171
172        for (key, value) in tree.scan_prefix(&prefix).flatten() {
173            // println!("key: {:?}", key);
174
175            let (other_subspace, other_path, other_timestamp) =
176                decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
177            let (payload_length, payload_digest, authorisation_token, _local_length) =
178                decode_entry_values(&value).await;
179
180            let other_entry = Entry::new(
181                self.namespace_id.clone(),
182                other_subspace,
183                other_path,
184                other_timestamp,
185                payload_length,
186                payload_digest,
187            );
188
189            if entry.path().is_prefixed_by(other_entry.path()) && other_entry.is_newer_than(entry) {
190                let authed =
191                    unsafe { AuthorisedEntry::new_unchecked(other_entry, authorisation_token) };
192
193                return Ok(Some(authed));
194            }
195        }
196
197        Ok(None)
198    }
199
200    fn flush(&self) -> Result<(), StoreSimpleSledError> {
201        self.db.flush()?;
202
203        Ok(())
204    }
205
206    /// Returns the next key and value from the given tree after the provided key AND which is prefixed by the given key.
207    fn prefix_gt(
208        &self,
209        tree: &Tree,
210        prefix: &[u8],
211    ) -> Result<Option<(IVec, IVec)>, StoreSimpleSledError> {
212        if let Some((key, value)) = tree.scan_prefix(prefix).flatten().next() {
213            return Ok(Some((key, value)));
214        }
215
216        Ok(None)
217    }
218
219    /// Clear all data from the internal `sled::Db`
220    pub fn clear(&self) -> Result<(), StoreSimpleSledError> {
221        self.db.clear()?;
222        self.flush()?;
223        Ok(())
224    }
225}
226
/// Returned when something goes wrong with the internal [`sled::Db`].
#[derive(Debug, PartialEq)]
pub enum StoreSimpleSledError {
    /// A plain sled operation failed.
    Sled(SledError),
    /// A sled transaction failed.
    Transaction(TransactionError<()>),
    /// An operation inside a sled transaction failed.
    ConflictableTransaction(ConflictableTransactionError<()>),
}
234
235impl core::fmt::Display for StoreSimpleSledError {
236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        match self {
238            StoreSimpleSledError::Sled(error) => core::fmt::Display::fmt(error, f),
239            StoreSimpleSledError::Transaction(_) => {
240                write!(f, "sled transaction error occurred.")
241            }
242            StoreSimpleSledError::ConflictableTransaction(_) => {
243                write!(f, "sled conflictable transaction error occurred.")
244            }
245        }
246    }
247}
248
// Marker impl: all provided methods of `Error` keep their defaults.
impl core::error::Error for StoreSimpleSledError {}
250
251impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
252    Store<MCL, MCC, MPL, N, S, PD, AT> for StoreSimpleSled<MCL, MCC, MPL, N, S, PD, AT>
253where
254    N: NamespaceId + EncodableKnownSize + DecodableSync,
255    S: SubspaceId + EncodableSync + EncodableKnownSize + Decodable,
256    PD: PayloadDigest + Encodable + EncodableSync + EncodableKnownSize + Decodable,
257    AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + TrustedDecodable + Encodable,
258    S::ErrorReason: core::fmt::Debug,
259    PD::ErrorReason: core::fmt::Debug,
260{
261    type Error = StoreSimpleSledError;
262
263    fn namespace_id(&self) -> &N {
264        &self.namespace_id
265    }
266
267    async fn ingest_entry(
268        &self,
269        authorised_entry: AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
270        prevent_pruning: bool,
271        origin: EntryOrigin,
272    ) -> Result<EntryIngestionSuccess<MCL, MCC, MPL, N, S, PD, AT>, EntryIngestionError<Self::Error>>
273    {
274        let (entry, token) = authorised_entry.into_parts();
275
276        if *entry.namespace_id() != self.namespace_id {
277            panic!(
278                "Store for {:?} tried to ingest entry of namespace {:?}",
279                self.namespace_id,
280                entry.namespace_id()
281            )
282        }
283
284        // Check if we have any newer entries with this prefix.
285        match self.is_prefixed_by_newer(&entry).await {
286            Ok(Some(newer)) => {
287                return Ok(EntryIngestionSuccess::Obsolete {
288                    obsolete: unsafe { AuthorisedEntry::new_unchecked(entry, token) },
289                    newer,
290                })
291            }
292            Err(err) => return Err(EntryIngestionError::OperationsError(err)),
293            Ok(None) => {
294                // It's fine, continue.
295            }
296        }
297
298        let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
299
300        let same_subspace_path_prefix_trailing_end =
301            encode_subspace_path_key(entry.subspace_id(), entry.path(), false).await;
302
303        let mut keys_to_prune: Vec<IVec> = Vec::new();
304
305        for (key, value) in entry_tree
306            .scan_prefix(&same_subspace_path_prefix_trailing_end)
307            .flatten()
308        {
309            let (other_subspace, other_path, other_timestamp) =
310                decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
311
312            let (
313                other_payload_length,
314                other_payload_digest,
315                _other_authorisation_token,
316                _other_local_length,
317            ) = decode_entry_values::<PD, AT>(&value).await;
318
319            let other_entry = Entry::new(
320                self.namespace_id.clone(),
321                other_subspace,
322                other_path,
323                other_timestamp,
324                other_payload_length,
325                other_payload_digest,
326            );
327
328            if other_entry.is_newer_than(&entry) {
329                continue;
330            }
331
332            // This should be pruned!
333
334            if prevent_pruning {
335                return Err(EntryIngestionError::PruningPrevented);
336            }
337
338            // Prune it!
339            keys_to_prune.push(key);
340        }
341
342        let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
343
344        let key = encode_entry_key(entry.subspace_id(), entry.path(), entry.timestamp()).await;
345
346        let value =
347            encode_entry_values(entry.payload_length(), entry.payload_digest(), &token, 0).await;
348
349        let mut entry_batch = sled::Batch::default();
350        let mut payload_batch = sled::Batch::default();
351
352        for key in keys_to_prune {
353            entry_batch.remove(&key);
354            payload_batch.remove(&key);
355        }
356        entry_batch.insert(key.clone(), value);
357
358        (&entry_tree, &payload_tree)
359            .transaction(
360                |(tx_entry, tx_payloads): &(TransactionalTree, TransactionalTree)| -> Result<
361                    (),
362                    ConflictableTransactionError<()>,
363                > {
364                    tx_entry.apply_batch(&entry_batch)?;
365                    tx_payloads.apply_batch(&payload_batch)?;
366
367                    Ok(())
368                },
369            )
370            .map_err(StoreSimpleSledError::from)?;
371
372        self.event_system
373            .borrow_mut()
374            .ingested_entry(AuthorisedEntry::new(entry, token).unwrap(), origin);
375
376        Ok(EntryIngestionSuccess::Success)
377    }
378
379    async fn append_payload<Producer, PayloadSourceError>(
380        &self,
381        subspace: &S,
382        path: &Path<MCL, MCC, MPL>,
383        expected_digest: Option<PD>,
384        payload_source: &mut Producer,
385    ) -> Result<PayloadAppendSuccess, PayloadAppendError<PayloadSourceError, Self::Error>>
386    where
387        Producer: BulkProducer<Item = u8, Error = PayloadSourceError>,
388    {
389        let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
390        let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
391
392        let exact_key = encode_subspace_path_key(subspace, path, true).await;
393
394        let maybe_entry = self.prefix_gt(&entry_tree, &exact_key)?;
395
396        match maybe_entry {
397            Some((entry_key, value)) => {
398                let (subspace, path, timestamp) =
399                    decode_entry_key::<MCL, MCC, MPL, S>(&entry_key).await;
400                let (length, digest, auth_token, _local_length) =
401                    decode_entry_values::<PD, AT>(&value).await;
402
403                if let Some(expected) = expected_digest {
404                    if expected != digest {
405                        return Err(PayloadAppendError::WrongEntry);
406                    }
407                }
408
409                let payload_key = encode_subspace_path_key(&subspace, &path, false).await;
410
411                let existing_payload = payload_tree
412                    .get(&payload_key)
413                    .map_err(StoreSimpleSledError::from)?;
414
415                let prefix = if let Some(payload) = existing_payload {
416                    payload
417                } else {
418                    IVec::from(&[])
419                };
420
421                // Append the payload
422
423                let mut payload: Vec<u8> = Vec::from(prefix.as_ref());
424                let mut received_payload_len = payload.len();
425                let mut hasher = PD::hasher();
426
427                // Make sure the prefix is hashed too.
428                PD::write(&mut hasher, &prefix);
429
430                loop {
431                    // 3. Too many bytes ingested? Error.
432                    if received_payload_len as u64 > length {
433                        return Err(PayloadAppendError::TooManyBytes);
434                    }
435
436                    match payload_source.produce().await {
437                        Ok(Either::Left(byte)) => {
438                            payload.push(byte);
439                            PD::write(&mut hasher, &[byte]);
440                            received_payload_len += 1;
441                        }
442                        Ok(Either::Right(_)) => break,
443                        Err(err) => {
444                            let new_value = encode_entry_values(
445                                length,
446                                &digest,
447                                &auth_token,
448                                received_payload_len as u64,
449                            )
450                            .await;
451
452                            let mut entry_batch = sled::Batch::default();
453                            let mut payload_batch = sled::Batch::default();
454
455                            entry_batch.insert(entry_key, new_value);
456                            payload_batch.insert(payload_key, payload);
457
458                            (&entry_tree, &payload_tree)
459                                .transaction(
460                                    |(tx_entry, tx_payloads): &(
461                                        TransactionalTree,
462                                        TransactionalTree,
463                                    )|
464                                     -> Result<
465                                        (),
466                                        sled::transaction::ConflictableTransactionError<()>,
467                                    > {
468                                        tx_entry.apply_batch(&entry_batch)?;
469                                        tx_payloads.apply_batch(&payload_batch)?;
470
471                                        Ok(())
472                                    },
473                                )
474                                .map_err(StoreSimpleSledError::from)?;
475
476                            let entry = Entry::new(
477                                self.namespace_id.clone(),
478                                subspace,
479                                path,
480                                timestamp,
481                                length,
482                                digest,
483                            );
484
485                            let authy_entry =
486                                unsafe { AuthorisedEntry::new_unchecked(entry, auth_token) };
487
488                            self.event_system.borrow_mut().appended_payload(
489                                LengthyAuthorisedEntry::new(
490                                    authy_entry,
491                                    received_payload_len as u64,
492                                ),
493                            );
494
495                            return Err(PayloadAppendError::SourceError {
496                                source_error: err,
497                                total_length_now_available: received_payload_len as u64,
498                            });
499                        }
500                    }
501                }
502
503                let authed_entry = unsafe {
504                    AuthorisedEntry::new_unchecked(
505                        Entry::new(
506                            self.namespace_id.clone(),
507                            subspace,
508                            path,
509                            timestamp,
510                            length,
511                            digest,
512                        ),
513                        auth_token,
514                    )
515                };
516
517                let lengthy_entry =
518                    LengthyAuthorisedEntry::new(authed_entry, received_payload_len as u64);
519
520                let new_value = encode_entry_values(
521                    length,
522                    lengthy_entry.entry().entry().payload_digest(),
523                    lengthy_entry.entry().token(),
524                    received_payload_len as u64,
525                )
526                .await;
527
528                let mut entry_batch = sled::Batch::default();
529                let mut payload_batch = sled::Batch::default();
530
531                entry_batch.insert(entry_key, new_value);
532                payload_batch.insert(payload_key, payload);
533
534                if received_payload_len as u64 == length {
535                    let computed_digest = PD::finish(&hasher);
536
537                    if computed_digest != *lengthy_entry.entry().entry().payload_digest() {
538                        return Err(PayloadAppendError::DigestMismatch);
539                    }
540
541                    (&entry_tree, &payload_tree)
542                    .transaction(
543                        |(tx_entry, tx_payloads): &(
544                            TransactionalTree,
545                            TransactionalTree,
546                        )|
547                         -> Result<
548                            (),
549                            sled::transaction::ConflictableTransactionError<
550                                (),
551                            >,
552                        > {
553                            tx_entry.apply_batch(&entry_batch)?;
554                            tx_payloads.apply_batch(&payload_batch)?;
555                            Ok(())
556                        },
557                    )
558                    .map_err(|err| {
559                        StoreSimpleSledError::from(err)
560                    })?;
561
562                    self.event_system
563                        .borrow_mut()
564                        .appended_payload(lengthy_entry);
565
566                    Ok(PayloadAppendSuccess::Completed)
567                } else {
568                    (&entry_tree, &payload_tree)
569                    .transaction(
570                        |(tx_entry, tx_payloads): &(
571                            TransactionalTree,
572                            TransactionalTree,
573                        )|
574                         -> Result<
575                            (),
576                            sled::transaction::ConflictableTransactionError<()>,
577                        > {
578                            tx_entry.apply_batch(&entry_batch)?;
579                            tx_payloads.apply_batch(&payload_batch)?;
580                            Ok(())
581                        },
582                    )
583                    .map_err(|err| {
584                        StoreSimpleSledError::from(err)
585                    })?;
586
587                    self.event_system
588                        .borrow_mut()
589                        .appended_payload(lengthy_entry);
590
591                    Ok(PayloadAppendSuccess::Appended)
592                }
593            }
594            None => Err(PayloadAppendError::NoSuchEntry),
595        }
596    }
597
598    async fn forget_entry(
599        &self,
600        subspace_id: &S,
601        path: &willow_data_model::Path<MCL, MCC, MPL>,
602        expected_digest: Option<PD>,
603    ) -> Result<(), ForgetEntryError<Self::Error>> {
604        let exact_key = encode_subspace_path_key(subspace_id, path, true).await;
605
606        let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
607        let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
608
609        let maybe_entry = self.prefix_gt(&entry_tree, &exact_key)?;
610
611        if let Some((key, value)) = maybe_entry {
612            let (subspace_id, path, timestamp) = decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
613            let (length, digest, auth_token, local_length) =
614                decode_entry_values::<PD, AT>(&value).await;
615
616            if let Some(expected) = expected_digest {
617                if expected != digest {
618                    return Err(ForgetEntryError::WrongEntry);
619                }
620            }
621
622            (&entry_tree, &payload_tree)
623                .transaction(
624                    |(tx_entry, tx_payloads): &(TransactionalTree, TransactionalTree)| -> Result<
625                        (),
626                        sled::transaction::ConflictableTransactionError<()>,
627                    > {
628                        tx_entry.remove(&key)?;
629                        tx_payloads.remove(&key)?;
630
631                        Ok(())
632                    },
633                ) .map_err(StoreSimpleSledError::from)?;
634
635            let entry = Entry::new(
636                self.namespace_id.clone(),
637                subspace_id,
638                path,
639                timestamp,
640                length,
641                digest,
642            );
643
644            // We can do this because the token comes from within our store (where it was vetted prior to ingestion)
645            let authy_entry = unsafe { AuthorisedEntry::new_unchecked(entry, auth_token) };
646
647            self.event_system
648                .borrow_mut()
649                .forgot_entry(LengthyAuthorisedEntry::new(authy_entry, local_length));
650        }
651
652        Ok(())
653    }
654
655    async fn forget_area(
656        &self,
657        area: &Area<MCL, MCC, MPL, S>,
658        protected: Option<&Area<MCL, MCC, MPL, S>>,
659    ) -> Result<usize, Self::Error> {
660        let entry_tree = self.entry_tree()?;
661        let payload_tree = self.payload_tree()?;
662
663        let mut entry_batch = sled::Batch::default();
664        let mut payload_batch = sled::Batch::default();
665
666        let mut forgotten_count = 0;
667
668        let entry_iterator = match area.subspace() {
669            AreaSubspace::Any => entry_tree.iter(),
670            AreaSubspace::Id(subspace) => {
671                let matching_subspace_path =
672                    encode_subspace_path_key(subspace, area.path(), false).await;
673
674                entry_tree.scan_prefix(&matching_subspace_path)
675            }
676        };
677
678        for (key, value) in entry_iterator.flatten() {
679            let (subspace, path, timestamp) = decode_entry_key(&key).await;
680            let (_length, _digest, _token, _local_length) =
681                decode_entry_values::<PD, AT>(&value).await;
682
683            let prefix_matches = if *area.subspace() == AreaSubspace::Any {
684                path.is_prefixed_by(area.path())
685            } else {
686                // We know the path is a prefix because the iterator we used guarantees it.
687                true
688            };
689
690            let timestamp_included = area.times().includes(&timestamp);
691
692            let is_protected = match &protected {
693                Some(protected_area) => {
694                    protected_area.subspace().includes(&subspace)
695                        && protected_area.path().is_prefix_of(&path)
696                        && protected_area.times().includes(&timestamp)
697                }
698                None => false,
699            };
700
701            if !is_protected && prefix_matches && timestamp_included {
702                // FORGET IT
703                entry_batch.remove(&key);
704                payload_batch.remove(&key);
705
706                forgotten_count += 1;
707            }
708        }
709
710        (&entry_tree, &payload_tree)
711            .transaction(
712                |(tx_entry, tx_payloads): &(TransactionalTree, TransactionalTree)| -> Result<
713                    (),
714                    sled::transaction::ConflictableTransactionError<()>,
715                > {
716                    tx_entry.apply_batch(&entry_batch)?;
717                    tx_payloads.apply_batch(&payload_batch)?;
718
719                    Ok(())
720                },
721            )?;
722
723        self.event_system
724            .borrow_mut()
725            .forgot_area(area.clone(), protected.cloned());
726
727        Ok(forgotten_count)
728    }
729
730    async fn forget_payload(
731        &self,
732        subspace_id: &S,
733        path: &Path<MCL, MCC, MPL>,
734        expected_digest: Option<PD>,
735    ) -> Result<(), ForgetPayloadError<Self::Error>> {
736        let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
737
738        let payload_key = encode_subspace_path_key(subspace_id, path, false).await;
739
740        let maybe_payload = self.prefix_gt(&payload_tree, &payload_key)?;
741
742        let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
743
744        let entry_key_partial = encode_subspace_path_key(subspace_id, path, true).await;
745        let maybe_entry = self.prefix_gt(&entry_tree, &entry_key_partial)?;
746
747        match (maybe_entry, maybe_payload) {
748            (Some((entry_key, entry_value)), Some((payload_key, _payload_value))) => {
749                let (subspace, path, timestamp) =
750                    decode_entry_key::<MCL, MCC, MPL, S>(&entry_key).await;
751                let (length, digest, auth_token, local_length) =
752                    decode_entry_values::<PD, AT>(&entry_value).await;
753
754                if let Some(expected) = expected_digest {
755                    if expected != digest {
756                        return Err(ForgetPayloadError::WrongEntry);
757                    }
758                }
759
760                let new_key_value = encode_entry_values(length, &digest, &auth_token, 0).await;
761
762                (&entry_tree, &payload_tree).transaction(
763                    |(entry_tx, payload_tx): &(TransactionalTree, TransactionalTree)| -> Result<
764                        (),
765                        sled::transaction::ConflictableTransactionError<()>,
766                    > {
767                        payload_tx.remove(&payload_key)?;
768                        entry_tx.insert(&entry_key, new_key_value.clone())?;
769
770                        Ok(())
771                    },
772                ).map_err(StoreSimpleSledError::from)?;
773
774                let entry = Entry::new(
775                    self.namespace_id.clone(),
776                    subspace,
777                    path,
778                    timestamp,
779                    length,
780                    digest,
781                );
782
783                let authy_entry = unsafe { AuthorisedEntry::new_unchecked(entry, auth_token) };
784
785                self.event_system
786                    .borrow_mut()
787                    .forgot_payload(LengthyAuthorisedEntry::new(authy_entry, local_length));
788
789                Ok(())
790            }
791            (Some((_entry_key, entry_value)), None) => {
792                if let Some(expected) = expected_digest {
793                    let (_length, digest, _auth_token, _local_length) =
794                    decode_entry_values::<PD, AT>(&entry_value).await;
795
796                    if expected != digest {
797                        return Err(ForgetPayloadError::WrongEntry);
798                    }
799                }
800
801                Ok(())
802            },
803            (None, None) => Err(ForgetPayloadError::NoSuchEntry),
804            (None, Some(_)) => panic!("StoreSimpleSled is storing a payload with no corresponding entry, which indicates an implementation error!"),
805        }
806    }
807
808    async fn forget_area_payloads(
809        &self,
810        area: &Area<MCL, MCC, MPL, S>,
811        protected: Option<&Area<MCL, MCC, MPL, S>>,
812    ) -> Result<usize, Self::Error> {
813        let entry_tree = self.entry_tree()?;
814        let payload_tree = self.payload_tree()?;
815
816        let mut entry_batch = sled::Batch::default();
817        let mut payload_batch = sled::Batch::default();
818
819        let mut forgotten_count = 0;
820
821        let entry_iterator = match area.subspace() {
822            AreaSubspace::Any => entry_tree.iter(),
823            AreaSubspace::Id(subspace) => {
824                let matching_subspace_path =
825                    encode_subspace_path_key(subspace, area.path(), false).await;
826
827                entry_tree.scan_prefix(&matching_subspace_path)
828            }
829        };
830
831        for (key, value) in entry_iterator.flatten() {
832            let (subspace, path, timestamp) = decode_entry_key(&key).await;
833            let (length, digest, token, _local_length) =
834                decode_entry_values::<PD, AT>(&value).await;
835
836            let prefix_matches = if *area.subspace() == AreaSubspace::Any {
837                path.is_prefixed_by(area.path())
838            } else {
839                // We know the path is a prefix because the iterator we used guarantees it.
840                true
841            };
842
843            let timestamp_included = area.times().includes(&timestamp);
844
845            let is_protected = match &protected {
846                Some(protected_area) => {
847                    protected_area.subspace().includes(&subspace)
848                        && protected_area.path().is_prefix_of(&path)
849                        && protected_area.times().includes(&timestamp)
850                }
851                None => false,
852            };
853
854            if !is_protected && prefix_matches && timestamp_included {
855                let entry_values = encode_entry_values(length, &digest, &token, 0).await;
856
857                entry_batch.insert(&key, entry_values);
858                payload_batch.remove(&key);
859
860                forgotten_count += 1;
861            }
862        }
863
864        (&entry_tree, &payload_tree).transaction(
865            |(tx_entry, tx_payloads): &(TransactionalTree, TransactionalTree)| -> Result<
866                (),
867                sled::transaction::ConflictableTransactionError<()>,
868            > {
869                tx_entry.apply_batch(&entry_batch)?;
870                tx_payloads.apply_batch(&payload_batch)?;
871
872                Ok(())
873            },
874        )?;
875
876        self.event_system
877            .borrow_mut()
878            .forgot_area(area.clone(), protected.cloned());
879
880        Ok(forgotten_count)
881    }
882
    /// Flushes the store by delegating to `self.flush()`.
    // NOTE(review): method-call syntax prefers an inherent method over this trait method, so this
    // presumably resolves to an inherent `flush` defined elsewhere in the file rather than
    // recursing — confirm.
    async fn flush(&self) -> Result<(), Self::Error> {
        self.flush()
    }
886
887    async fn payload(
888        &self,
889        subspace: &S,
890        path: &Path<MCL, MCC, MPL>,
891        expected_digest: Option<PD>,
892    ) -> Result<
893        Option<impl BulkProducer<Item = u8, Final = (), Error = Self::Error>>,
894        PayloadError<Self::Error>,
895    > {
896        let entry_tree = self.entry_tree().map_err(StoreSimpleSledError::from)?;
897        let payload_tree = self.payload_tree().map_err(StoreSimpleSledError::from)?;
898        let exact_key = encode_subspace_path_key(subspace, path, true).await;
899
900        let maybe_entry = self.prefix_gt(&entry_tree, &exact_key)?;
901        let maybe_payload = self.prefix_gt(&payload_tree, &exact_key)?;
902
903        match (maybe_entry, maybe_payload) {
904            (Some((_entry_key, entry_value)), Some((_payload_key, payload_value))) => {
905                let (_length, digest, _token, _local_length) =
906                    decode_entry_values::<PD, AT>(&entry_value).await;
907
908                if let Some(expected) = expected_digest {
909                    if expected != digest {
910                        return Err(PayloadError::WrongEntry);
911                    }
912                }
913
914                Ok(Some(PayloadProducer::new(payload_value)))
915            }
916            (Some((_entry_key, entry_value)), None) => {
917                // check expected digest.
918                let (_length, digest, _token, _local_length) =
919                    decode_entry_values::<PD, AT>(&entry_value).await;
920
921                if let Some(expected) = expected_digest {
922                    if expected != digest {
923                        return Err(PayloadError::WrongEntry);
924                    }
925                }
926
927                Ok(None)
928            }
929            (None, None) => Ok(None),
930            (None, Some(_)) => {
931                panic!("Holding a payload for which there is no corresponding entry, this is bad!")
932            }
933        }
934    }
935
936    async fn entry(
937        &self,
938        subspace_id: &S,
939        path: &Path<MCL, MCC, MPL>,
940        ignore: QueryIgnoreParams,
941    ) -> Result<
942        Option<willow_data_model::LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>,
943        Self::Error,
944    > {
945        let exact_key = encode_subspace_path_key(subspace_id, path, true).await;
946
947        let entry_tree = self.entry_tree()?;
948
949        let maybe_entry = self.prefix_gt(&entry_tree, &exact_key)?;
950
951        if let Some((key, value)) = maybe_entry {
952            let (subspace, path, timestamp) = decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
953            let (length, digest, token, local_length) = decode_entry_values::<PD, AT>(&value).await;
954
955            let entry = Entry::new(
956                self.namespace_id.clone(),
957                subspace,
958                path,
959                timestamp,
960                length,
961                digest,
962            );
963
964            let authed_entry = unsafe { AuthorisedEntry::new_unchecked(entry, token) };
965
966            let payload_is_empty_string = length == 0;
967            let is_incomplete = local_length < length;
968
969            if (ignore.ignore_incomplete_payloads && is_incomplete)
970                || (ignore.ignore_empty_payloads && payload_is_empty_string)
971            {
972                return Ok(None);
973            } else {
974                return Ok(Some(LengthyAuthorisedEntry::new(
975                    authed_entry,
976                    local_length,
977                )));
978            }
979        }
980
981        Ok(None)
982    }
983
    /// Returns a producer of the stored entries for `area`, filtered according to `ignore`.
    ///
    /// Delegates to `EntryProducer::new`; an error reported by it is returned unchanged.
    async fn query_area(
        &self,
        area: &Area<MCL, MCC, MPL, S>,
        ignore: QueryIgnoreParams,
    ) -> Result<
        impl Producer<Item = LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>, Final = ()>,
        Self::Error,
    > {
        EntryProducer::new(self, area, ignore).await
    }
994
    /// Registers a subscription for `area` with the store's event system and returns the
    /// resulting producer of [`StoreEvent`]s.
    ///
    /// The event system handle and the area are cloned for the subscription; `ignore` is passed
    /// along unchanged.
    async fn subscribe_area(
        &self,
        area: &Area<MCL, MCC, MPL, S>,
        ignore: QueryIgnoreParams,
    ) -> impl Producer<Item = StoreEvent<MCL, MCC, MPL, N, S, PD, AT>, Final = (), Error = Self::Error>
    {
        EventSystem::add_subscription(self.event_system.clone(), area.clone(), ignore)
    }
1003}
1004
1005/** Encode the key for a subspace and path **without** the timestamp. */
1006async fn encode_subspace_path_key<
1007    const MCL: usize,
1008    const MCC: usize,
1009    const MPL: usize,
1010    S: SubspaceId + EncodableKnownSize + EncodableSync,
1011>(
1012    subspace: &S,
1013    path: &Path<MCL, MCC, MPL>,
1014    with_path_end: bool,
1015) -> Vec<u8> {
1016    let mut consumer: IntoVec<u8> = IntoVec::new();
1017
1018    // Unwrap because IntoVec should not fail.
1019    subspace.encode(&mut consumer).await.unwrap();
1020
1021    for component in path.components() {
1022        for byte in component.as_ref() {
1023            if *byte == 0 {
1024                // Unwrap because IntoVec should not fail.
1025                consumer.bulk_consume_full_slice(&[0, 2]).await.unwrap();
1026            } else {
1027                // Unwrap because IntoVec should not fail.
1028                consumer.consume(*byte).await.unwrap();
1029            }
1030        }
1031
1032        // Unwrap because IntoVec should not fail.
1033        consumer.bulk_consume_full_slice(&[0, 1]).await.unwrap();
1034    }
1035
1036    // Unwrap because IntoVec should not fail.
1037    if with_path_end {
1038        consumer.bulk_consume_full_slice(&[0, 0]).await.unwrap();
1039    }
1040
1041    // No timestamp here!
1042
1043    consumer.into_vec()
1044}
1045
1046async fn encode_entry_key<
1047    const MCL: usize,
1048    const MCC: usize,
1049    const MPL: usize,
1050    S: SubspaceId + EncodableKnownSize + EncodableSync,
1051>(
1052    subspace: &S,
1053    path: &Path<MCL, MCC, MPL>,
1054    timestamp: u64,
1055) -> Vec<u8> {
1056    let mut consumer: IntoVec<u8> = IntoVec::new();
1057
1058    // Unwrap because IntoVec should not fail.
1059    subspace.encode(&mut consumer).await.unwrap();
1060
1061    for component in path.components() {
1062        for byte in component.as_ref() {
1063            if *byte == 0 {
1064                // Unwrap because IntoVec should not fail.
1065                consumer.bulk_consume_full_slice(&[0, 2]).await.unwrap();
1066            } else {
1067                // Unwrap because IntoVec should not fail.
1068                consumer.consume(*byte).await.unwrap();
1069            }
1070        }
1071
1072        // Unwrap because IntoVec should not fail.
1073        consumer.bulk_consume_full_slice(&[0, 1]).await.unwrap();
1074    }
1075
1076    // Unwrap because IntoVec should not fail.
1077    consumer.bulk_consume_full_slice(&[0, 0]).await.unwrap();
1078
1079    // Unwrap because IntoVec should not fail.
1080    U64BE(timestamp).encode(&mut consumer).await.unwrap();
1081
1082    consumer.into_vec()
1083}
1084
1085async fn decode_entry_key<
1086    const MCL: usize,
1087    const MCC: usize,
1088    const MPL: usize,
1089    S: SubspaceId + Decodable,
1090>(
1091    encoded: &IVec,
1092) -> (S, Path<MCL, MCC, MPL>, u64)
1093where
1094    S::ErrorReason: core::fmt::Debug,
1095{
1096    let mut producer = FromSlice::new(encoded);
1097
1098    let subspace = S::decode(&mut producer).await.unwrap();
1099
1100    let mut components_vecs: Vec<Vec<u8>> = Vec::new();
1101
1102    while let Some(bytes) = component_bytes(&mut producer).await {
1103        components_vecs.push(bytes);
1104    }
1105
1106    let mut components = components_vecs
1107        .iter()
1108        .map(|bytes| Component::new(bytes).expect("Component was unexpectedly longer than MCL."));
1109
1110    let total_len = components.clone().fold(0, |acc, comp| acc + comp.len());
1111
1112    let path: Path<MCL, MCC, MPL> = Path::new_from_iter(total_len, &mut components).unwrap();
1113
1114    let timestamp = U64BE::decode(&mut producer).await.unwrap().0;
1115
1116    (subspace, path, timestamp)
1117}
1118
1119async fn component_bytes<P: Producer<Item = u8>>(producer: &mut P) -> Option<Vec<u8>>
1120where
1121    P::Error: core::fmt::Debug,
1122    P::Final: core::fmt::Debug,
1123{
1124    let mut vec: Vec<u8> = Vec::new();
1125    let mut previous_was_zero = false;
1126
1127    loop {
1128        match producer.produce().await {
1129            Ok(Either::Left(byte)) => {
1130                if !previous_was_zero && byte == 0 {
1131                    previous_was_zero = true
1132                } else if previous_was_zero && byte == 2 {
1133                    // Append a zero.
1134
1135                    vec.push(0);
1136                    previous_was_zero = false;
1137                } else if previous_was_zero && byte == 1 {
1138                    // That's the end of this component..
1139                    return Some(vec);
1140                } else if previous_was_zero && byte == 0 {
1141                    // That's the end of the path.
1142                    return None;
1143                } else {
1144                    // Append to the component.
1145                    vec.push(byte);
1146                    previous_was_zero = false;
1147                }
1148            }
1149            Ok(Either::Right(_)) => {
1150                if previous_was_zero {
1151                    panic!("Unterminated escaped key!")
1152                }
1153
1154                return None;
1155            }
1156            Err(err) => panic!("Unexpected error: {:?}", err),
1157        }
1158    }
1159}
1160
1161async fn encode_entry_values<PD, AT>(
1162    payload_length: u64,
1163    payload_digest: &PD,
1164    auth_token: &AT,
1165    local_length: u64,
1166) -> Vec<u8>
1167where
1168    PD: Encodable,
1169    AT: Encodable,
1170{
1171    let mut consumer: IntoVec<u8> = IntoVec::new();
1172
1173    U64BE(payload_length).encode(&mut consumer).await.unwrap();
1174    payload_digest.encode(&mut consumer).await.unwrap();
1175    auth_token.encode(&mut consumer).await.unwrap();
1176    U64BE(local_length).encode(&mut consumer).await.unwrap();
1177
1178    consumer.into_vec()
1179}
1180
1181async fn decode_entry_values<PD, AT>(encoded: &IVec) -> (u64, PD, AT, u64)
1182where
1183    AT: TrustedDecodable,
1184    PD: Decodable,
1185    PD::ErrorReason: core::fmt::Debug,
1186{
1187    let mut producer = FromSlice::new(encoded);
1188
1189    let payload_length = U64BE::decode(&mut producer).await.unwrap().0;
1190    let payload_digest = PD::decode(&mut producer).await.unwrap();
1191    let auth_token = unsafe { AT::trusted_decode(&mut producer).await.unwrap() };
1192    let local_length = U64BE::decode(&mut producer).await.unwrap().0;
1193
1194    (payload_length, payload_digest, auth_token, local_length)
1195}
1196
1197impl From<SledError> for StoreSimpleSledError {
1198    fn from(value: SledError) -> Self {
1199        StoreSimpleSledError::Sled(value)
1200    }
1201}
1202
1203impl From<ConflictableTransactionError<()>> for StoreSimpleSledError {
1204    fn from(value: ConflictableTransactionError<()>) -> Self {
1205        StoreSimpleSledError::ConflictableTransaction(value)
1206    }
1207}
1208
1209impl From<TransactionError<()>> for StoreSimpleSledError {
1210    fn from(value: TransactionError<()>) -> Self {
1211        StoreSimpleSledError::Transaction(value)
1212    }
1213}
1214
1215impl From<SledError> for NewStoreSimpleSledError {
1216    fn from(value: SledError) -> Self {
1217        Self::StoreError(StoreSimpleSledError::from(value))
1218    }
1219}
1220
1221impl From<SledError> for ExistingStoreSimpleSledError {
1222    fn from(value: SledError) -> Self {
1223        Self::StoreError(StoreSimpleSledError::from(value))
1224    }
1225}
1226
1227impl From<StoreSimpleSledError> for EntryIngestionError<StoreSimpleSledError> {
1228    fn from(val: StoreSimpleSledError) -> Self {
1229        EntryIngestionError::OperationsError(val)
1230    }
1231}
1232
1233impl<PSE> From<StoreSimpleSledError> for PayloadAppendError<PSE, StoreSimpleSledError> {
1234    fn from(val: StoreSimpleSledError) -> Self {
1235        PayloadAppendError::OperationError(val)
1236    }
1237}
1238
1239impl From<StoreSimpleSledError> for ForgetEntryError<StoreSimpleSledError> {
1240    fn from(value: StoreSimpleSledError) -> Self {
1241        Self::OperationError(value)
1242    }
1243}
1244
1245impl From<StoreSimpleSledError> for ForgetPayloadError<StoreSimpleSledError> {
1246    fn from(value: StoreSimpleSledError) -> Self {
1247        Self::OperationError(value)
1248    }
1249}
1250
1251impl From<StoreSimpleSledError> for PayloadError<StoreSimpleSledError> {
1252    fn from(value: StoreSimpleSledError) -> Self {
1253        Self::OperationError(value)
1254    }
1255}
1256
1257/// Produces bytes of a [payload](https://willowprotocol.org/specs/data-model/index.html#Payload).
1258pub struct PayloadProducer {
1259    produced: usize,
1260    ivec: IVec,
1261}
1262
1263impl PayloadProducer {
1264    fn new(ivec: IVec) -> Self {
1265        Self { produced: 0, ivec }
1266    }
1267}
1268
1269impl Producer for PayloadProducer {
1270    type Item = u8;
1271
1272    type Final = ();
1273
1274    type Error = StoreSimpleSledError;
1275
1276    async fn produce(&mut self) -> Result<Either<Self::Item, Self::Final>, Self::Error> {
1277        match self.produced.cmp(&self.ivec.len()) {
1278            std::cmp::Ordering::Less => {
1279                let byte = self.ivec[self.produced];
1280                Ok(Either::Left(byte))
1281            },
1282            std::cmp::Ordering::Equal =>  Ok(Either::Right(())),
1283            std::cmp::Ordering::Greater => unreachable!("You tried to produce more bytes than you could, but you claimed infallibity. You traitor. You fool."),
1284        }
1285    }
1286}
1287
impl BufferedProducer for PayloadProducer {
    /// Does nothing and always succeeds.
    // NOTE(review): presumably a no-op because the producer already holds all payload bytes in
    // `ivec`, so there is nothing left to fetch — confirm against the `BufferedProducer` contract.
    async fn slurp(&mut self) -> Result<(), Self::Error> {
        Ok(())
    }
}
1293
1294impl BulkProducer for PayloadProducer {
1295    async fn expose_items<'a>(
1296        &'a mut self,
1297    ) -> Result<Either<&'a [Self::Item], Self::Final>, Self::Error>
1298    where
1299        Self::Item: 'a,
1300    {
1301        let slice = &self.ivec[self.produced..];
1302        if slice.is_empty() {
1303            Ok(Either::Right(()))
1304        } else {
1305            Ok(Either::Left(slice))
1306        }
1307    }
1308
1309    async fn consider_produced(&mut self, amount: usize) -> Result<(), Self::Error> {
1310        self.produced += amount;
1311
1312        Ok(())
1313    }
1314}
1315
/// Produces [`willow_data_model::LengthyAuthorisedEntry`] for a given [`willow_data_model::grouping::Area`] and [`willow_data_model::QueryIgnoreParams`].
///
/// Borrows the store for `'store`; the store's namespace id is used to reconstruct entries from
/// the stored keys and values.
pub struct EntryProducer<'store, const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
where
    N: NamespaceId + EncodableKnownSize + Decodable,
    S: SubspaceId,
    PD: PayloadDigest,
    AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
    // Iterator over the candidate key/value pairs of the store's entry tree.
    iter: sled::Iter,
    // The store being queried.
    store: &'store StoreSimpleSled<MCL, MCC, MPL, N, S, PD, AT>,
    // Which entries to skip based on their payload state.
    ignore: QueryIgnoreParams,
    // Only entries included in this area are produced.
    area: Area<MCL, MCC, MPL, S>,
}
1329
1330impl<'store, const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
1331    EntryProducer<'store, MCL, MCC, MPL, N, S, PD, AT>
1332where
1333    N: NamespaceId + EncodableKnownSize + DecodableSync,
1334    S: SubspaceId + EncodableKnownSize + EncodableSync,
1335    PD: PayloadDigest,
1336    AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
1337{
1338    async fn new(
1339        store: &'store StoreSimpleSled<MCL, MCC, MPL, N, S, PD, AT>,
1340        area: &Area<MCL, MCC, MPL, S>,
1341        ignore: QueryIgnoreParams,
1342    ) -> Result<Self, StoreSimpleSledError> {
1343        let entry_tree = store.entry_tree()?;
1344
1345        let entry_iterator = match area.subspace() {
1346            AreaSubspace::Any => entry_tree.iter(),
1347            AreaSubspace::Id(subspace) => {
1348                let matching_subspace_path =
1349                    encode_subspace_path_key(subspace, area.path(), false).await;
1350
1351                entry_tree.scan_prefix(&matching_subspace_path)
1352            }
1353        };
1354
1355        Ok(Self {
1356            iter: entry_iterator,
1357            area: area.clone(),
1358            ignore,
1359            store,
1360        })
1361    }
1362}
1363
1364impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT> Producer
1365    for EntryProducer<'_, MCL, MCC, MPL, N, S, PD, AT>
1366where
1367    N: NamespaceId + EncodableKnownSize + Decodable,
1368    S: SubspaceId + Decodable + EncodableKnownSize + EncodableSync,
1369    PD: PayloadDigest + Decodable,
1370    AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + TrustedDecodable,
1371    S::ErrorReason: std::fmt::Debug,
1372    PD::ErrorReason: std::fmt::Debug,
1373{
1374    type Item = LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>;
1375
1376    type Final = ();
1377
1378    type Error = StoreSimpleSledError;
1379
1380    async fn produce(&mut self) -> Result<Either<Self::Item, Self::Final>, Self::Error> {
1381        loop {
1382            let result = self.iter.next();
1383
1384            match result {
1385                Some(Ok((key, value))) => {
1386                    let (subspace, path, timestamp) =
1387                        decode_entry_key::<MCL, MCC, MPL, S>(&key).await;
1388                    let (length, digest, token, local_length) =
1389                        decode_entry_values::<PD, AT>(&value).await;
1390
1391                    let entry = Entry::new(
1392                        self.store.namespace_id.clone(),
1393                        subspace,
1394                        path,
1395                        timestamp,
1396                        length,
1397                        digest,
1398                    );
1399
1400                    if !self.area.includes_entry(&entry) {
1401                        continue;
1402                    }
1403
1404                    let authed_entry = unsafe { AuthorisedEntry::new_unchecked(entry, token) };
1405
1406                    let is_empty_string = length == 0;
1407                    let is_incomplete = local_length < length;
1408
1409                    if (self.ignore.ignore_incomplete_payloads && is_incomplete)
1410                        || (self.ignore.ignore_empty_payloads && is_empty_string)
1411                    {
1412                        continue;
1413                    }
1414
1415                    return Ok(Either::Left(LengthyAuthorisedEntry::new(
1416                        authed_entry,
1417                        local_length,
1418                    )));
1419                }
1420                Some(Err(err)) => return Err(StoreSimpleSledError::from(err)),
1421                None => return Ok(Either::Right(())),
1422            }
1423        }
1424    }
1425}