Skip to main content

iroh_docs/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9};
10
11use anyhow::{anyhow, Result};
12use iroh::{KeyParsingError, PublicKey};
13use iroh_blobs::Hash;
14use n0_future::time::SystemTime;
15use rand::CryptoRng;
16use redb::{Database, ReadableDatabase, ReadableMultimapTable, ReadableTable};
17#[cfg(all(feature = "fs-store", feature = "redb-v2-migration"))]
18use tracing::info;
19use tracing::warn;
20
21use super::{
22    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
23    Query,
24};
25use crate::{
26    actor::MAX_COMMIT_DELAY,
27    keys::Author,
28    ranger::{Fingerprint, Range, RangeEntry},
29    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
30    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
31    ReplicaInfo,
32};
33
34mod bounds;
35#[cfg(all(feature = "fs-store", feature = "redb-v2-migration"))]
36mod migrate_redb_v2_tuples;
37mod migrations;
38mod query;
39mod ranges;
40pub(crate) mod tables;
41
42pub use self::ranges::RecordsRange;
43use self::{
44    bounds::{ByKeyBounds, RecordsBounds},
45    query::QueryIterator,
46    ranges::RangeExt,
47    tables::{
48        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
49        RecordsValue, Tables, TransactionAndTables,
50    },
51};
52
53/// Manages the replicas and authors for an instance.
54#[derive(Debug)]
55pub struct Store {
56    db: Database,
57    transaction: CurrentTransaction,
58    open_replicas: HashSet<NamespaceId>,
59    pubkeys: MemPublicKeyStore,
60}
61
62impl Drop for Store {
63    fn drop(&mut self) {
64        if let Err(err) = self.flush() {
65            warn!("failed to trigger final flush: {:?}", err);
66        }
67    }
68}
69
70impl AsRef<Store> for Store {
71    fn as_ref(&self) -> &Store {
72        self
73    }
74}
75
76impl AsMut<Store> for Store {
77    fn as_mut(&mut self) -> &mut Store {
78        self
79    }
80}
81
82#[derive(derive_more::Debug, Default)]
83#[allow(clippy::large_enum_variant)]
84enum CurrentTransaction {
85    #[default]
86    None,
87    Read(ReadOnlyTables),
88    Write(TransactionAndTables),
89}
90
/// Opens the redb database file at `path`, creating it if it does not exist.
///
/// A database that redb reports as needing a file-format upgrade is rejected with a
/// descriptive error. Any other redb error is passed through unchanged.
#[cfg(feature = "fs-store")]
fn open_database(path: &std::path::Path) -> Result<Database> {
    Database::create(path).map_err(|err| match err {
        redb::DatabaseError::UpgradeRequired(v) => anyhow!(
            "Opening the database failed: Upgrading from redb {v} no longer supported. Use an older redb version first."
        ),
        other => other.into(),
    })
}
101
/// Returns `true` if any error in the cause chain of `err` is a
/// [`redb::TableError::TableTypeMismatch`].
///
/// `Store::persistent` treats this as the signature of a store written with the redb 2.x
/// tuple encoding.
#[cfg(feature = "fs-store")]
fn is_redb_v2_tuple_mismatch(err: &anyhow::Error) -> bool {
    err.chain()
        .filter_map(|cause| cause.downcast_ref::<redb::TableError>())
        .any(|table_err| matches!(table_err, redb::TableError::TableTypeMismatch { .. }))
}
111
112impl Store {
113    /// Create a new store in memory.
114    pub fn memory() -> Self {
115        Self::memory_impl().expect("failed to create memory store")
116    }
117
118    fn memory_impl() -> Result<Self> {
119        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
120        Self::new_impl(db)
121    }
122
123    /// Create or open a store from a `path` to a database file.
124    ///
125    /// The file will be created if it does not exist, otherwise it will be opened.
126    #[cfg(feature = "fs-store")]
127    pub fn persistent(path: impl AsRef<std::path::Path>) -> Result<Self> {
128        let path = path.as_ref();
129        let db = open_database(path)?;
130        match Self::new_impl(db) {
131            Ok(store) => Ok(store),
132            Err(err) if is_redb_v2_tuple_mismatch(&err) => {
133                #[cfg(feature = "redb-v2-migration")]
134                {
135                    info!("redb 2.x tuple format detected, running migration");
136                    migrate_redb_v2_tuples::run(path)?;
137                    Self::new_impl(open_database(path)?)
138                }
139                #[cfg(not(feature = "redb-v2-migration"))]
140                {
141                    let _ = err;
142                    Err(anyhow!(
143                        "Opening the database failed: this store was written by iroh-docs 0.94..=0.98 (redb 2.x) and needs migration. Enable the `redb-v2-migration` feature on iroh-docs and re-open to migrate."
144                    ))
145                }
146            }
147            Err(err) => Err(err),
148        }
149    }
150
151    fn new_impl(db: redb::Database) -> Result<Self> {
152        // Setup all tables
153        let write_tx = db.begin_write()?;
154        let _ = Tables::new(&write_tx)?;
155        write_tx.commit()?;
156
157        // Run database migrations
158        migrations::run_migrations(&db)?;
159
160        Ok(Store {
161            db,
162            transaction: Default::default(),
163            open_replicas: Default::default(),
164            pubkeys: Default::default(),
165        })
166    }
167
168    /// Flush the current transaction, if any.
169    ///
170    /// This is the cheapest way to ensure that the data is persisted.
171    pub fn flush(&mut self) -> Result<()> {
172        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
173            w.commit()?;
174        }
175        Ok(())
176    }
177
178    /// Get a read-only snapshot of the database.
179    ///
180    /// This has the side effect of committing any open write transaction,
181    /// so it can be used as a way to ensure that the data is persisted.
182    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
183        let guard = &mut self.transaction;
184        let tables = match std::mem::take(guard) {
185            CurrentTransaction::None => {
186                let tx = self.db.begin_read()?;
187                ReadOnlyTables::new(tx)?
188            }
189            CurrentTransaction::Write(w) => {
190                w.commit()?;
191                let tx = self.db.begin_read()?;
192                ReadOnlyTables::new(tx)?
193            }
194            CurrentTransaction::Read(tables) => tables,
195        };
196        *guard = CurrentTransaction::Read(tables);
197        match &*guard {
198            CurrentTransaction::Read(ref tables) => Ok(tables),
199            _ => unreachable!(),
200        }
201    }
202
203    /// Get an owned read-only snapshot of the database.
204    ///
205    /// This will open a new read transaction. The read transaction won't be reused for other
206    /// reads.
207    ///
208    /// This has the side effect of committing any open write transaction,
209    /// so it can be used as a way to ensure that the data is persisted.
210    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
211        // make sure the current transaction is committed
212        self.flush()?;
213        assert!(matches!(self.transaction, CurrentTransaction::None));
214        let tx = self.db.begin_read()?;
215        let tables = ReadOnlyTables::new(tx)?;
216        Ok(tables)
217    }
218
219    /// Get access to the tables to read from them.
220    ///
221    /// The underlying transaction is a write transaction, but with a non-mut
222    /// reference to the tables you can not write.
223    ///
224    /// There is no guarantee that this will be an independent transaction.
225    /// You just get readonly access to the current state of the database.
226    ///
227    /// As such, there is also no guarantee that the data you see is
228    /// already persisted.
229    fn tables(&mut self) -> Result<&Tables<'_>> {
230        let guard = &mut self.transaction;
231        let tables = match std::mem::take(guard) {
232            CurrentTransaction::None => {
233                let tx = self.db.begin_write()?;
234                TransactionAndTables::new(tx)?
235            }
236            CurrentTransaction::Write(w) => {
237                if w.since.elapsed() > MAX_COMMIT_DELAY {
238                    tracing::debug!("committing transaction because it's too old");
239                    w.commit()?;
240                    let tx = self.db.begin_write()?;
241                    TransactionAndTables::new(tx)?
242                } else {
243                    w
244                }
245            }
246            CurrentTransaction::Read(_) => {
247                let tx = self.db.begin_write()?;
248                TransactionAndTables::new(tx)?
249            }
250        };
251        *guard = CurrentTransaction::Write(tables);
252        match guard {
253            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
254            _ => unreachable!(),
255        }
256    }
257
258    /// Get exclusive write access to the tables in the current transaction.
259    ///
260    /// There is no guarantee that this will be an independent transaction.
261    /// As such, there is also no guarantee that the data you see or write
262    /// will be persisted.
263    ///
264    /// To ensure that the data is persisted, acquire a snapshot of the database
265    /// or call flush.
266    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
267        let guard = &mut self.transaction;
268        let tables = match std::mem::take(guard) {
269            CurrentTransaction::None => {
270                let tx = self.db.begin_write()?;
271                TransactionAndTables::new(tx)?
272            }
273            CurrentTransaction::Write(w) => {
274                if w.since.elapsed() > MAX_COMMIT_DELAY {
275                    tracing::debug!("committing transaction because it's too old");
276                    w.commit()?;
277                    let tx = self.db.begin_write()?;
278                    TransactionAndTables::new(tx)?
279                } else {
280                    w
281                }
282            }
283            CurrentTransaction::Read(_) => {
284                let tx = self.db.begin_write()?;
285                TransactionAndTables::new(tx)?
286            }
287        };
288        *guard = CurrentTransaction::Write(tables);
289        let res = match &mut *guard {
290            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
291            _ => unreachable!(),
292        };
293        Ok(res)
294    }
295}
296
297type PeersIter = std::vec::IntoIter<PeerIdBytes>;
298
299impl Store {
300    /// Create a new replica for `namespace` and persist in this store.
301    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
302        let id = namespace.id();
303        self.import_namespace(namespace.into())?;
304        self.open_replica(&id).map_err(Into::into)
305    }
306
307    /// Create a new author key and persist it in the store.
308    pub fn new_author<R: CryptoRng>(&mut self, rng: &mut R) -> Result<Author> {
309        let author = Author::new(rng);
310        self.import_author(author.clone())?;
311        Ok(author)
312    }
313
314    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
315    ///
316    /// Returns the number of authors that the other peer has updates for.
317    pub fn has_news_for_us(
318        &mut self,
319        namespace: NamespaceId,
320        heads: &AuthorHeads,
321    ) -> Result<Option<NonZeroU64>> {
322        let our_heads = {
323            let latest = self.get_latest_for_each_author(namespace)?;
324            let mut heads = AuthorHeads::default();
325            for e in latest {
326                let (author, timestamp, _key) = e?;
327                heads.insert(author, timestamp);
328            }
329            heads
330        };
331        let has_news_for_us = heads.has_news_for(&our_heads);
332        Ok(has_news_for_us)
333    }
334
335    /// Open a replica from this store.
336    ///
337    /// This just calls load_replica_info and then creates a new replica with the info.
338    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
339        let info = self.load_replica_info(namespace_id)?;
340        let instance = StoreInstance::new(*namespace_id, self);
341        Ok(Replica::new(instance, Box::new(info)))
342    }
343
344    /// Load the replica info from the store.
345    pub fn load_replica_info(
346        &mut self,
347        namespace_id: &NamespaceId,
348    ) -> Result<ReplicaInfo, OpenError> {
349        let tables = self.tables()?;
350        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
351            Ok(Some(db_value)) => {
352                let (raw_kind, raw_bytes) = db_value.value();
353                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
354                ReplicaInfo::new(namespace)
355            }
356            Ok(None) => return Err(OpenError::NotFound),
357            Err(err) => return Err(OpenError::Other(err.into())),
358        };
359        self.open_replicas.insert(info.capability.id());
360        Ok(info)
361    }
362
363    /// Close a replica.
364    pub fn close_replica(&mut self, id: NamespaceId) {
365        self.open_replicas.remove(&id);
366    }
367
368    /// List all replica namespaces in this store.
369    pub fn list_namespaces(
370        &mut self,
371    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
372        let snapshot = self.snapshot()?;
373        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
374        let iter = iter.map(|res| {
375            let capability = parse_capability(res?.1.value())?;
376            Ok((capability.id(), capability.kind()))
377        });
378        Ok(iter)
379    }
380
381    /// Get an author key from the store.
382    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
383        let tables = self.tables()?;
384        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
385            return Ok(None);
386        };
387        let author = Author::from_bytes(author.value());
388        Ok(Some(author))
389    }
390
391    /// Import an author key pair.
392    pub fn import_author(&mut self, author: Author) -> Result<()> {
393        self.modify(|tables| {
394            tables
395                .authors
396                .insert(author.id().as_bytes(), &author.to_bytes())?;
397            Ok(())
398        })
399    }
400
401    /// Delete an author.
402    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
403        self.modify(|tables| {
404            tables.authors.remove(author.as_bytes())?;
405            Ok(())
406        })
407    }
408
409    /// List all author keys in this store.
410    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
411        let tables = self.snapshot()?;
412        let iter = tables
413            .authors
414            .range::<&'static [u8; 32]>(..)?
415            .map(|res| match res {
416                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
417                Err(err) => Err(err.into()),
418            });
419        Ok(iter)
420    }
421
422    /// Import a new replica namespace.
423    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
424        self.modify(|tables| {
425            let outcome = {
426                let (capability, outcome) = {
427                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
428                    if let Some(existing) = existing {
429                        let mut existing = parse_capability(existing.value())?;
430                        let outcome = if existing.merge(capability)? {
431                            ImportNamespaceOutcome::Upgraded
432                        } else {
433                            ImportNamespaceOutcome::NoChange
434                        };
435                        (existing, outcome)
436                    } else {
437                        (capability, ImportNamespaceOutcome::Inserted)
438                    }
439                };
440                let id = capability.id().to_bytes();
441                let (kind, bytes) = capability.raw();
442                tables.namespaces.insert(&id, (kind, &bytes))?;
443                outcome
444            };
445            Ok(outcome)
446        })
447    }
448
449    /// Remove a replica.
450    ///
451    /// Completely removes a replica and deletes both the namespace private key and all document
452    /// entries.
453    ///
454    /// Note that a replica has to be closed before it can be removed. The store has to enforce
455    /// that a replica cannot be removed while it is still open.
456    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
457        if self.open_replicas.contains(namespace) {
458            return Err(anyhow!("replica is not closed"));
459        }
460        self.modify(|tables| {
461            let bounds = RecordsBounds::namespace(*namespace);
462            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
463            let bounds = ByKeyBounds::namespace(*namespace);
464            let _ = tables
465                .records_by_key
466                .retain_in(bounds.as_ref(), |_k, _v| false);
467            tables.namespaces.remove(namespace.as_bytes())?;
468            tables.namespace_peers.remove_all(namespace.as_bytes())?;
469            tables.download_policy.remove(namespace.as_bytes())?;
470            Ok(())
471        })
472    }
473
474    /// Get an iterator over entries of a replica.
475    pub fn get_many(
476        &mut self,
477        namespace: NamespaceId,
478        query: impl Into<Query>,
479    ) -> Result<QueryIterator> {
480        let tables = self.snapshot_owned()?;
481        QueryIterator::new(tables, namespace, query.into())
482    }
483
484    /// Get an entry by key and author.
485    pub fn get_exact(
486        &mut self,
487        namespace: NamespaceId,
488        author: AuthorId,
489        key: impl AsRef<[u8]>,
490        include_empty: bool,
491    ) -> Result<Option<SignedEntry>> {
492        get_exact(
493            &self.tables()?.records,
494            namespace,
495            author,
496            key,
497            include_empty,
498        )
499    }
500
501    /// Get all content hashes of all replicas in the store.
502    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
503        let tables = self.snapshot_owned()?;
504        ContentHashesIterator::all(&tables.records)
505    }
506
507    /// Get the latest entry for each author in a namespace.
508    pub fn get_latest_for_each_author(
509        &mut self,
510        namespace: NamespaceId,
511    ) -> Result<LatestIterator<'_>> {
512        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
513    }
514
515    /// Register a peer that has been useful to sync a document.
516    pub fn register_useful_peer(
517        &mut self,
518        namespace: NamespaceId,
519        peer: crate::PeerIdBytes,
520    ) -> Result<()> {
521        let peer = &peer;
522        let namespace = namespace.as_bytes();
523        // calculate nanos since UNIX_EPOCH for a time measurement
524        let nanos = SystemTime::UNIX_EPOCH
525            .elapsed()
526            .map(|duration| duration.as_nanos() as u64)?;
527        self.modify(|tables| {
528            // ensure the document exists
529            anyhow::ensure!(
530                tables.namespaces.get(namespace)?.is_some(),
531                "document not created"
532            );
533
534            let mut namespace_peers = tables.namespace_peers.get(namespace)?;
535
536            // get the oldest entry since it's candidate for removal
537            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
538                let (oldest_nanos, &oldest_peer) = guard.value();
539                (oldest_nanos, oldest_peer)
540            });
541            match maybe_oldest {
542                None => {
543                    // the table is empty so the peer can be inserted without further checks since
544                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
545                    drop(namespace_peers);
546                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
547                }
548                Some((oldest_nanos, oldest_peer)) => {
549                    let oldest_peer = &oldest_peer;
550
551                    if oldest_peer == peer {
552                        // oldest peer is the current one, so replacing the entry for the peer will
553                        // maintain the size
554                        drop(namespace_peers);
555                        tables
556                            .namespace_peers
557                            .remove(namespace, (oldest_nanos, oldest_peer))?;
558                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
559                    } else {
560                        // calculate the len in the same loop since calling `len` is another fallible operation
561                        let mut len = 1;
562                        // find any previous entry for the same peer to remove it
563                        let mut prev_peer_nanos = None;
564
565                        for result in namespace_peers {
566                            len += 1;
567                            let guard = result?;
568                            let (peer_nanos, peer_bytes) = guard.value();
569                            if prev_peer_nanos.is_none() && peer_bytes == peer {
570                                prev_peer_nanos = Some(peer_nanos)
571                            }
572                        }
573
574                        match prev_peer_nanos {
575                            Some(prev_nanos) => {
576                                // the peer was already present, so we can remove the old entry and
577                                // insert the new one without checking the size
578                                tables
579                                    .namespace_peers
580                                    .remove(namespace, (prev_nanos, peer))?;
581                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
582                            }
583                            None => {
584                                // the peer is new and the table is non empty, add it and check the
585                                // size to decide if the oldest peer should be evicted
586                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
587                                len += 1;
588                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
589                                    tables
590                                        .namespace_peers
591                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
592                                }
593                            }
594                        }
595                    }
596                }
597            }
598            Ok(())
599        })
600    }
601
602    /// Get the peers that have been useful for a document.
603    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
604        let tables = self.tables()?;
605        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
606        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
607            let (_nanos, &peer) = result?.value();
608            peers.push(peer);
609        }
610        if peers.is_empty() {
611            Ok(None)
612        } else {
613            Ok(Some(peers.into_iter()))
614        }
615    }
616
617    /// Set the download policy for a namespace.
618    pub fn set_download_policy(
619        &mut self,
620        namespace: &NamespaceId,
621        policy: DownloadPolicy,
622    ) -> Result<()> {
623        self.modify(|tables| {
624            let namespace = namespace.as_bytes();
625
626            // ensure the document exists
627            anyhow::ensure!(
628                tables.namespaces.get(&namespace)?.is_some(),
629                "document not created"
630            );
631
632            let value = postcard::to_stdvec(&policy)?;
633            tables.download_policy.insert(namespace, value.as_slice())?;
634            Ok(())
635        })
636    }
637
638    /// Get the download policy for a namespace.
639    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
640        let tables = self.tables()?;
641        let value = tables.download_policy.get(namespace.as_bytes())?;
642        Ok(match value {
643            None => DownloadPolicy::default(),
644            Some(value) => postcard::from_bytes(value.value())?,
645        })
646    }
647}
648
649impl PublicKeyStore for Store {
650    fn public_key(&self, id: &[u8; 32]) -> Result<PublicKey, KeyParsingError> {
651        self.pubkeys.public_key(id)
652    }
653}
654
655fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
656    Capability::from_raw(raw_kind, raw_bytes)
657}
658
659fn get_exact(
660    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
661    namespace: NamespaceId,
662    author: AuthorId,
663    key: impl AsRef<[u8]>,
664    include_empty: bool,
665) -> Result<Option<SignedEntry>> {
666    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
667    let record = record_table.get(id)?;
668    Ok(record
669        .map(|r| into_entry(id, r.value()))
670        .filter(|entry| include_empty || !entry.is_empty()))
671}
672
673/// A wrapper around [`Store`] for a specific [`NamespaceId`]
674#[derive(Debug)]
675pub struct StoreInstance<'a> {
676    namespace: NamespaceId,
677    pub(crate) store: &'a mut Store,
678}
679
680impl<'a> StoreInstance<'a> {
681    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
682        StoreInstance { namespace, store }
683    }
684}
685
686impl PublicKeyStore for StoreInstance<'_> {
687    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<PublicKey, KeyParsingError> {
688        self.store.public_key(id)
689    }
690}
691
692impl super::DownloadPolicyStore for StoreInstance<'_> {
693    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
694        self.store.get_download_policy(namespace)
695    }
696}
697
698impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
699    type Error = anyhow::Error;
700    type RangeIterator<'x>
701        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
702    where
703        'a: 'x;
704    type ParentIterator<'x>
705        = ParentIterator
706    where
707        'a: 'x;
708
709    /// Get a the first key (or the default if none is available).
710    fn get_first(&mut self) -> Result<RecordIdentifier> {
711        let tables = self.store.as_mut().tables()?;
712        // TODO: verify this fetches all keys with this namespace
713        let bounds = RecordsBounds::namespace(self.namespace);
714        let mut records = tables.records.range(bounds.as_ref())?;
715
716        let Some(record) = records.next() else {
717            return Ok(RecordIdentifier::default());
718        };
719        let (compound_key, _value) = record?;
720        let (namespace_id, author_id, key) = compound_key.value();
721        let id = RecordIdentifier::new(namespace_id, author_id, key);
722        Ok(id)
723    }
724
725    #[cfg(test)]
726    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
727        self.store
728            .as_mut()
729            .get_exact(id.namespace(), id.author(), id.key(), true)
730    }
731
732    #[cfg(test)]
733    fn len(&mut self) -> Result<usize> {
734        let tables = self.store.as_mut().tables()?;
735        let bounds = RecordsBounds::namespace(self.namespace);
736        let records = tables.records.range(bounds.as_ref())?;
737        Ok(records.count())
738    }
739
740    #[cfg(test)]
741    fn is_empty(&mut self) -> Result<bool> {
742        use redb::ReadableTableMetadata;
743        let tables = self.store.as_mut().tables()?;
744        Ok(tables.records.is_empty()?)
745    }
746
747    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
748        // TODO: optimize
749        let elements = self.get_range(range.clone())?;
750
751        let mut fp = Fingerprint::empty();
752        for el in elements {
753            let el = el?;
754            fp ^= el.as_fingerprint();
755        }
756
757        Ok(fp)
758    }
759
760    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
761        let id = e.id();
762        self.store.as_mut().modify(|tables| {
763            // insert into record table
764            let key = (
765                &id.namespace().to_bytes(),
766                &id.author().to_bytes(),
767                id.key(),
768            );
769            let hash = e.content_hash(); // let binding is needed
770            let value = (
771                e.timestamp(),
772                &e.signature().namespace().to_bytes(),
773                &e.signature().author().to_bytes(),
774                e.content_len(),
775                hash.as_bytes(),
776            );
777            tables.records.insert(key, value)?;
778
779            // insert into by key index table
780            let key = (
781                &id.namespace().to_bytes(),
782                id.key(),
783                &id.author().to_bytes(),
784            );
785            tables.records_by_key.insert(key, ())?;
786
787            // insert into latest table
788            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
789            let value = (e.timestamp(), e.id().key());
790            tables.latest_per_author.insert(key, value)?;
791            Ok(())
792        })
793    }
794
795    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
796        let tables = self.store.as_mut().tables()?;
797        let iter = match range.x().cmp(range.y()) {
798            // identity range: iter1 = all, iter2 = none
799            Ordering::Equal => {
800                // iterator for all entries in replica
801                let bounds = RecordsBounds::namespace(self.namespace);
802                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
803                chain_none(iter)
804            }
805            // regular range: iter1 = x <= t < y, iter2 = none
806            Ordering::Less => {
807                // iterator for entries from range.x to range.y
808                let start = Bound::Included(range.x().to_byte_tuple());
809                let end = Bound::Excluded(range.y().to_byte_tuple());
810                let bounds = RecordsBounds::new(start, end);
811                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
812                chain_none(iter)
813            }
814            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
815            Ordering::Greater => {
816                // iterator for entries from start to range.y
817                let end = Bound::Excluded(range.y().to_byte_tuple());
818                let bounds = RecordsBounds::from_start(&self.namespace, end);
819                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
820
821                // iterator for entries from range.x to end
822                let start = Bound::Included(range.x().to_byte_tuple());
823                let bounds = RecordsBounds::to_end(&self.namespace, start);
824                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
825
826                iter.chain(Some(iter2).into_iter().flatten())
827            }
828        };
829        Ok(iter)
830    }
831
832    #[cfg(test)]
833    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
834        self.store.as_mut().modify(|tables| {
835            let entry = {
836                let (namespace, author, key) = id.as_byte_tuple();
837                let id = (namespace, key, author);
838                tables.records_by_key.remove(id)?;
839                let id = (namespace, author, key);
840                let value = tables.records.remove(id)?;
841                value.map(|value| into_entry(id, value.value()))
842            };
843            Ok(entry)
844        })
845    }
846
847    #[cfg(test)]
848    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
849        let tables = self.store.as_mut().tables()?;
850        let bounds = RecordsBounds::namespace(self.namespace);
851        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
852        Ok(chain_none(iter))
853    }
854
855    fn prefixes_of(
856        &mut self,
857        id: &RecordIdentifier,
858    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
859        let tables = self.store.as_mut().tables()?;
860        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
861    }
862
863    #[cfg(test)]
864    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
865        let tables = self.store.as_mut().tables()?;
866        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
867        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
868        Ok(chain_none(iter))
869    }
870
871    fn remove_prefix_filtered(
872        &mut self,
873        id: &RecordIdentifier,
874        predicate: impl Fn(&Record) -> bool,
875    ) -> Result<usize> {
876        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
877        self.store.as_mut().modify(|tables| {
878            let cb = |_k: RecordsId, v: RecordsValue| {
879                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
880                let record = Record::new(hash.into(), len, timestamp);
881
882                predicate(&record)
883            };
884            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
885            let count = iter.count();
886            Ok(count)
887        })
888    }
889}
890
/// Chains `iter` with an empty tail of the same iterator type.
///
/// This gives single-span ranges the same `Chain<I, Flatten<option::IntoIter<I>>>` type as
/// wrapping ranges, which chain a second span.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    let empty_tail: Option<I> = None;
    iter.chain(empty_tail.into_iter().flatten())
}
896
/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
/// is a prefix of the key passed to the iterator (the key itself included).
///
/// All lookups happen eagerly when the iterator is created. Iteration only replays the collected
/// results, shortest key first; lookup failures are yielded as `Err` items.
#[derive(Debug)]
pub struct ParentIterator {
    // Pre-collected lookup results, already in output order.
    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
}
903
904impl ParentIterator {
905    fn new(
906        tables: &Tables,
907        namespace: NamespaceId,
908        author: AuthorId,
909        key: Vec<u8>,
910    ) -> anyhow::Result<Self> {
911        let parents = parents(&tables.records, namespace, author, key.clone());
912        Ok(Self {
913            inner: parents.into_iter(),
914        })
915    }
916}
917
918fn parents(
919    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
920    namespace: NamespaceId,
921    author: AuthorId,
922    mut key: Vec<u8>,
923) -> Vec<anyhow::Result<SignedEntry>> {
924    let mut res = Vec::new();
925
926    while !key.is_empty() {
927        let entry = get_exact(table, namespace, author, &key, false);
928        key.pop();
929        match entry {
930            Err(err) => res.push(Err(err)),
931            Ok(Some(entry)) => res.push(Ok(entry)),
932            Ok(None) => continue,
933        }
934    }
935    res.reverse();
936    res
937}
938
939impl Iterator for ParentIterator {
940    type Item = Result<SignedEntry>;
941
942    fn next(&mut self) -> Option<Self::Item> {
943        self.inner.next()
944    }
945}
946
/// Iterator over the content hashes of all records in a records table.
///
/// The same hash may be yielded more than once, because distinct entries can refer to the same
/// content. The iterator reads from a snapshot of the database taken when it was created, and
/// keeps that snapshot open until it is dropped.
///
/// To be self-contained it holds a `'static` [`RecordsRange`] rather than borrowing a table.
/// NOTE(review): this presumably relies on `RecordsTable` being an owned read-only table handle
/// (`redb::ReadOnlyTable`); confirm in `tables`.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator {
    // Not printed in `Debug` output.
    #[debug(skip)]
    range: RecordsRange<'static>,
}
959
960impl ContentHashesIterator {
961    /// Create a new iterator over all content hashes.
962    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
963        let range = RecordsRange::all_static(table)?;
964        Ok(Self { range })
965    }
966}
967
968impl Iterator for ContentHashesIterator {
969    type Item = Result<Hash>;
970
971    fn next(&mut self) -> Option<Self::Item> {
972        let v = self.range.next()?;
973        Some(v.map(|e| e.content_hash()))
974    }
975}
976
/// Iterator over the latest entry per author within one namespace.
///
/// Yields `(author, timestamp, key)` for each of the namespace's rows in the latest-per-author
/// table.
#[derive(derive_more::Debug)]
// `Debug` prints only the type name; the underlying redb range is not shown.
#[debug("LatestIterator")]
pub struct LatestIterator<'a>(
    // Range over the namespace's rows of the latest-per-author table.
    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
);
983
984impl<'a> LatestIterator<'a> {
985    fn new(
986        latest_per_author: &'a impl ReadableTable<
987            LatestPerAuthorKey<'static>,
988            LatestPerAuthorValue<'static>,
989        >,
990        namespace: NamespaceId,
991    ) -> anyhow::Result<Self> {
992        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
993        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
994        let range = latest_per_author.range(start..=end)?;
995        Ok(Self(range))
996    }
997}
998
999impl Iterator for LatestIterator<'_> {
1000    type Item = Result<(AuthorId, u64, Vec<u8>)>;
1001
1002    fn next(&mut self) -> Option<Self::Item> {
1003        self.0.next_map(|key, value| {
1004            let (_namespace, author) = key;
1005            let (timestamp, key) = value;
1006            (author.into(), timestamp, key.to_vec())
1007        })
1008    }
1009}
1010
1011fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
1012    let (namespace, author, key) = key;
1013    let (timestamp, namespace_sig, author_sig, len, hash) = value;
1014    let id = RecordIdentifier::new(namespace, author, key);
1015    let record = Record::new(hash.into(), len, timestamp);
1016    let entry = Entry::new(id, record);
1017    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
1018    SignedEntry::new(entry_signature, entry)
1019}
1020
// Tests for the redb-backed store. They need a real database file, so the module is gated on the
// `fs-store` feature as well as `test`.
#[cfg(all(test, feature = "fs-store"))]
mod tests {
    use super::*;
    // Anonymous trait import: its methods become callable (presumably on `StoreInstance`, the
    // `wrapper` below) without clashing with this module's `Store` type.
    use crate::ranger::Store as _;

    /// Key-prefix queries must match keys made only of `0xff` bytes. NOTE(review): presumably
    /// this is an edge case when deriving the end bound of a prefix range.
    #[tokio::test]
    async fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::rng())?;
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // Both all-255 keys start with the prefix `[255]`, so both must be returned, in key order.
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1").await?;
        replica.hash_and_insert(&key2, &author, b"v2").await?;
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    /// End-to-end check of authors, replica info, and entry insert / overwrite / query / remove
    /// through `StoreInstance`.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        // A fresh store has no authors.
        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::rng())?;
        let namespace = NamespaceSecret::new(&mut rand::rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes(),);

        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // all
        let all: Vec<_> = wrapper.all()?.collect();
        assert_eq!(all.len(), 5);

        // Write a second version under the same five ids. Each replaces the first, so the
        // queries below still see five entries.
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // get all
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // get all prefix
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // Remove each entry, checking it exists before and is gone after.
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // With every entry removed, a query over the whole namespace returns nothing.
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    /// Copies the database file at `source` into a new temp file, runs `modify` in one write
    /// transaction on the copy, commits it, and returns the copy.
    ///
    /// The database handle is dropped before returning, so the caller can reopen the file.
    fn copy_and_modify(
        source: &std::path::Path,
        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        drop(db);
        Ok(dbfile)
    }

    /// Migration 001: opening a database whose latest-per-author table was deleted must rebuild
    /// that table with the same contents.
    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn test_migration_001_populate_latest_table() -> Result<()> {
        use super::tables::LATEST_PER_AUTHOR_TABLE;
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::rng());

        // create a store and add some data
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::rng())?;
            let author2 = store.new_author(&mut rand::rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1").await?;
            replica.hash_and_insert(b"k2", &author2, b"v1").await?;
            replica.hash_and_insert(b"k3", &author1, b"v1").await?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            // drop everything to clear file locks.
            store.close_replica(namespace.id());
            // flush the store to disk
            store.flush()?;
            drop(store);
            expected
        };
        // Two authors wrote, so there is one latest entry per author.
        assert_eq!(expected.len(), 2);

        // create a copy of our db file with the latest table deleted.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // open the copied db file, which will run the migration.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }

    /// Migration 004: the by-key index table must exist on a freshly created store.
    #[test]
    #[cfg(feature = "fs-store")]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        use redb::ReadableTableMetadata;
        let dbfile = tempfile::NamedTempFile::new()?;

        let mut store = Store::persistent(dbfile.path())?;

        // check that the new table is there, even if empty
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }

        // TODO: write test checking that the indexing is done correctly
        Ok(())
    }

    /// Migration of tables written with the old tuple encoding (created here through `redb_v3`).
    /// The current redb must reject them. `Store::persistent` must then migrate them in place,
    /// leave a `.backup-redb-v2-tuples` copy of the original file, and keep every row.
    #[test]
    #[cfg(all(feature = "fs-store", feature = "redb-v2-migration"))]
    fn test_migration_redb_v2_tuples() -> Result<()> {
        use migrate_redb_v2_tuples::old;

        let dbfile = tempfile::NamedTempFile::new()?;
        let path = dbfile.path().to_path_buf();

        // Fixed, recognisable byte patterns so the migrated rows can be compared exactly.
        let ns = [1u8; 32];
        let author = [2u8; 32];
        let key: &[u8] = b"hello";
        let ns_sig = [3u8; 64];
        let auth_sig = [4u8; 64];
        let hash = [5u8; 32];

        // Write one row into each affected table using the old table definitions.
        {
            let db = redb_v3::Database::create(&path)?;
            let tx = db.begin_write()?;
            {
                let mut records = tx.open_table(old::RECORDS_TABLE)?;
                let mut latest = tx.open_table(old::LATEST_PER_AUTHOR_TABLE)?;
                let mut by_key = tx.open_table(old::RECORDS_BY_KEY_TABLE)?;
                records.insert(
                    (&ns, &author, key),
                    (42u64, &ns_sig, &auth_sig, 7u64, &hash),
                )?;
                latest.insert((&ns, &author), (42u64, key))?;
                by_key.insert((&ns, key, &author), ())?;
            }
            tx.commit()?;
        }

        // Confirm redb 4 rejects the file before migration.
        // The write transaction is dropped without commit, so the file is left unchanged.
        {
            let db = redb::Database::create(&path)?;
            let tx = db.begin_write()?;
            let err = Tables::new(&tx).unwrap_err();
            assert!(
                matches!(err, redb::TableError::TableTypeMismatch { .. }),
                "expected TableTypeMismatch, got {err:?}",
            );
        }

        // Opening the store performs the migration; drop it to release the file.
        let store = Store::persistent(&path)?;
        drop(store);

        let backup: std::path::PathBuf = {
            let mut p = path.clone().into_os_string();
            p.push(".backup-redb-v2-tuples");
            p.into()
        };
        assert!(
            backup.exists(),
            "missing backup file at {}",
            backup.display()
        );

        // After migration redb 4 can open the affected tables and the rows survive.
        {
            let db = redb::Database::create(&path)?;
            let tx = db.begin_read()?;
            let records = tx.open_table(tables::RECORDS_TABLE)?;
            let entries: Vec<_> = records.iter()?.collect::<std::result::Result<_, _>>()?;
            assert_eq!(entries.len(), 1);
            let (k, v) = &entries[0];
            assert_eq!(k.value(), (&ns, &author, key));
            assert_eq!(v.value(), (42u64, &ns_sig, &auth_sig, 7u64, &hash));

            let latest = tx.open_table(tables::LATEST_PER_AUTHOR_TABLE)?;
            let entries: Vec<_> = latest.iter()?.collect::<std::result::Result<_, _>>()?;
            assert_eq!(entries.len(), 1);
            assert_eq!(entries[0].0.value(), (&ns, &author));
            assert_eq!(entries[0].1.value(), (42u64, key));

            let by_key = tx.open_table(tables::RECORDS_BY_KEY_TABLE)?;
            let entries: Vec<_> = by_key.iter()?.collect::<std::result::Result<_, _>>()?;
            assert_eq!(entries.len(), 1);
            assert_eq!(entries[0].0.value(), (&ns, key, &author));
        }

        Ok(())
    }

    /// A freshly created store must not run the tuple migration. No backup file may appear,
    /// either on creation or on reopening.
    #[test]
    #[cfg(feature = "fs-store")]
    fn test_no_migration_on_fresh_store() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let path = dbfile.path().to_path_buf();
        let store = Store::persistent(&path)?;
        drop(store);

        let backup: std::path::PathBuf = {
            let mut p = path.clone().into_os_string();
            p.push(".backup-redb-v2-tuples");
            p.into()
        };
        assert!(
            !backup.exists(),
            "unexpected backup file at {}",
            backup.display()
        );

        // Reopening an already-current store must not trigger the migration either.
        let _store = Store::persistent(&path)?;
        assert!(!backup.exists());
        Ok(())
    }
}