iroh_docs/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9    path::Path,
10};
11
12use anyhow::{anyhow, Result};
13use ed25519_dalek::{SignatureError, VerifyingKey};
14use iroh_blobs::Hash;
15use rand_core::CryptoRngCore;
16use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable, ReadableTableMetadata};
17use tracing::warn;
18
19use super::{
20    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21    Query,
22};
23use crate::{
24    actor::MAX_COMMIT_DELAY,
25    keys::Author,
26    ranger::{Fingerprint, Range, RangeEntry},
27    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29    ReplicaInfo,
30};
31
32mod bounds;
33mod migrate_v1_v2;
34mod migrations;
35mod query;
36mod ranges;
37pub(crate) mod tables;
38
39pub use self::ranges::RecordsRange;
40use self::{
41    bounds::{ByKeyBounds, RecordsBounds},
42    query::QueryIterator,
43    ranges::RangeExt,
44    tables::{
45        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
46        RecordsValue, Tables, TransactionAndTables,
47    },
48};
49
/// Manages the replicas and authors for an instance.
///
/// All table access goes through a single transaction that the store keeps open
/// between calls and reuses. A pending write transaction is committed by
/// [`Store::flush`], by taking a snapshot, when it has been open longer than
/// `MAX_COMMIT_DELAY`, or when the store is dropped.
#[derive(Debug)]
pub struct Store {
    /// The underlying redb database.
    db: Database,
    /// The transaction currently held open by the store (none, read or write).
    transaction: CurrentTransaction,
    /// Namespaces of replicas that are currently open; `remove_replica` refuses
    /// to delete any namespace contained in this set.
    open_replicas: HashSet<NamespaceId>,
    /// Public key store used to answer `PublicKeyStore::public_key` lookups.
    pubkeys: MemPublicKeyStore,
}
58
59impl Drop for Store {
60    fn drop(&mut self) {
61        if let Err(err) = self.flush() {
62            warn!("failed to trigger final flush: {:?}", err);
63        }
64    }
65}
66
67impl AsRef<Store> for Store {
68    fn as_ref(&self) -> &Store {
69        self
70    }
71}
72
73impl AsMut<Store> for Store {
74    fn as_mut(&mut self) -> &mut Store {
75        self
76    }
77}
78
/// The transaction a [`Store`] is currently holding open.
#[derive(derive_more::Debug, Default)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// A read-only transaction with its opened tables, as created by `Store::snapshot`.
    Read(ReadOnlyTables),
    /// A write transaction with its opened tables; used for both reads
    /// (`Store::tables`) and writes (`Store::modify`).
    Write(TransactionAndTables),
}
86
87impl Store {
88    /// Create a new store in memory.
89    pub fn memory() -> Self {
90        Self::memory_impl().expect("failed to create memory store")
91    }
92
93    fn memory_impl() -> Result<Self> {
94        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
95        Self::new_impl(db)
96    }
97
98    /// Create or open a store from a `path` to a database file.
99    ///
100    /// The file will be created if it does not exist, otherwise it will be opened.
101    pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
102        let db = match Database::create(&path) {
103            Ok(db) => db,
104            Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
105            Err(err) => return Err(err.into()),
106        };
107        Self::new_impl(db)
108    }
109
110    fn new_impl(db: redb::Database) -> Result<Self> {
111        // Setup all tables
112        let write_tx = db.begin_write()?;
113        let _ = Tables::new(&write_tx)?;
114        write_tx.commit()?;
115
116        // Run database migrations
117        migrations::run_migrations(&db)?;
118
119        Ok(Store {
120            db,
121            transaction: Default::default(),
122            open_replicas: Default::default(),
123            pubkeys: Default::default(),
124        })
125    }
126
127    /// Flush the current transaction, if any.
128    ///
129    /// This is the cheapest way to ensure that the data is persisted.
130    pub fn flush(&mut self) -> Result<()> {
131        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
132            w.commit()?;
133        }
134        Ok(())
135    }
136
137    /// Get a read-only snapshot of the database.
138    ///
139    /// This has the side effect of committing any open write transaction,
140    /// so it can be used as a way to ensure that the data is persisted.
141    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
142        let guard = &mut self.transaction;
143        let tables = match std::mem::take(guard) {
144            CurrentTransaction::None => {
145                let tx = self.db.begin_read()?;
146                ReadOnlyTables::new(tx)?
147            }
148            CurrentTransaction::Write(w) => {
149                w.commit()?;
150                let tx = self.db.begin_read()?;
151                ReadOnlyTables::new(tx)?
152            }
153            CurrentTransaction::Read(tables) => tables,
154        };
155        *guard = CurrentTransaction::Read(tables);
156        match &*guard {
157            CurrentTransaction::Read(ref tables) => Ok(tables),
158            _ => unreachable!(),
159        }
160    }
161
162    /// Get an owned read-only snapshot of the database.
163    ///
164    /// This will open a new read transaction. The read transaction won't be reused for other
165    /// reads.
166    ///
167    /// This has the side effect of committing any open write transaction,
168    /// so it can be used as a way to ensure that the data is persisted.
169    pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
170        // make sure the current transaction is committed
171        self.flush()?;
172        assert!(matches!(self.transaction, CurrentTransaction::None));
173        let tx = self.db.begin_read()?;
174        let tables = ReadOnlyTables::new(tx)?;
175        Ok(tables)
176    }
177
178    /// Get access to the tables to read from them.
179    ///
180    /// The underlying transaction is a write transaction, but with a non-mut
181    /// reference to the tables you can not write.
182    ///
183    /// There is no guarantee that this will be an independent transaction.
184    /// You just get readonly access to the current state of the database.
185    ///
186    /// As such, there is also no guarantee that the data you see is
187    /// already persisted.
188    fn tables(&mut self) -> Result<&Tables> {
189        let guard = &mut self.transaction;
190        let tables = match std::mem::take(guard) {
191            CurrentTransaction::None => {
192                let tx = self.db.begin_write()?;
193                TransactionAndTables::new(tx)?
194            }
195            CurrentTransaction::Write(w) => {
196                if w.since.elapsed() > MAX_COMMIT_DELAY {
197                    tracing::debug!("committing transaction because it's too old");
198                    w.commit()?;
199                    let tx = self.db.begin_write()?;
200                    TransactionAndTables::new(tx)?
201                } else {
202                    w
203                }
204            }
205            CurrentTransaction::Read(_) => {
206                let tx = self.db.begin_write()?;
207                TransactionAndTables::new(tx)?
208            }
209        };
210        *guard = CurrentTransaction::Write(tables);
211        match guard {
212            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
213            _ => unreachable!(),
214        }
215    }
216
217    /// Get exclusive write access to the tables in the current transaction.
218    ///
219    /// There is no guarantee that this will be an independent transaction.
220    /// As such, there is also no guarantee that the data you see or write
221    /// will be persisted.
222    ///
223    /// To ensure that the data is persisted, acquire a snapshot of the database
224    /// or call flush.
225    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
226        let guard = &mut self.transaction;
227        let tables = match std::mem::take(guard) {
228            CurrentTransaction::None => {
229                let tx = self.db.begin_write()?;
230                TransactionAndTables::new(tx)?
231            }
232            CurrentTransaction::Write(w) => {
233                if w.since.elapsed() > MAX_COMMIT_DELAY {
234                    tracing::debug!("committing transaction because it's too old");
235                    w.commit()?;
236                    let tx = self.db.begin_write()?;
237                    TransactionAndTables::new(tx)?
238                } else {
239                    w
240                }
241            }
242            CurrentTransaction::Read(_) => {
243                let tx = self.db.begin_write()?;
244                TransactionAndTables::new(tx)?
245            }
246        };
247        *guard = CurrentTransaction::Write(tables);
248        let res = match &mut *guard {
249            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
250            _ => unreachable!(),
251        };
252        Ok(res)
253    }
254}
255
/// Iterator over the peer ids returned by [`Store::get_sync_peers`].
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
257
258impl Store {
259    /// Create a new replica for `namespace` and persist in this store.
260    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica> {
261        let id = namespace.id();
262        self.import_namespace(namespace.into())?;
263        self.open_replica(&id).map_err(Into::into)
264    }
265
266    /// Create a new author key and persist it in the store.
267    pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
268        let author = Author::new(rng);
269        self.import_author(author.clone())?;
270        Ok(author)
271    }
272
273    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
274    ///
275    /// Returns the number of authors that the other peer has updates for.
276    pub fn has_news_for_us(
277        &mut self,
278        namespace: NamespaceId,
279        heads: &AuthorHeads,
280    ) -> Result<Option<NonZeroU64>> {
281        let our_heads = {
282            let latest = self.get_latest_for_each_author(namespace)?;
283            let mut heads = AuthorHeads::default();
284            for e in latest {
285                let (author, timestamp, _key) = e?;
286                heads.insert(author, timestamp);
287            }
288            heads
289        };
290        let has_news_for_us = heads.has_news_for(&our_heads);
291        Ok(has_news_for_us)
292    }
293
294    /// Open a replica from this store.
295    ///
296    /// This just calls load_replica_info and then creates a new replica with the info.
297    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica, OpenError> {
298        let info = self.load_replica_info(namespace_id)?;
299        let instance = StoreInstance::new(*namespace_id, self);
300        Ok(Replica::new(instance, Box::new(info)))
301    }
302
303    /// Load the replica info from the store.
304    pub fn load_replica_info(
305        &mut self,
306        namespace_id: &NamespaceId,
307    ) -> Result<ReplicaInfo, OpenError> {
308        let tables = self.tables()?;
309        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
310            Ok(Some(db_value)) => {
311                let (raw_kind, raw_bytes) = db_value.value();
312                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
313                ReplicaInfo::new(namespace)
314            }
315            Ok(None) => return Err(OpenError::NotFound),
316            Err(err) => return Err(OpenError::Other(err.into())),
317        };
318        self.open_replicas.insert(info.capability.id());
319        Ok(info)
320    }
321
322    /// Close a replica.
323    pub fn close_replica(&mut self, id: NamespaceId) {
324        self.open_replicas.remove(&id);
325    }
326
327    /// List all replica namespaces in this store.
328    pub fn list_namespaces(
329        &mut self,
330    ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
331        let snapshot = self.snapshot()?;
332        let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
333        let iter = iter.map(|res| {
334            let capability = parse_capability(res?.1.value())?;
335            Ok((capability.id(), capability.kind()))
336        });
337        Ok(iter)
338    }
339
340    /// Get an author key from the store.
341    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
342        let tables = self.tables()?;
343        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
344            return Ok(None);
345        };
346        let author = Author::from_bytes(author.value());
347        Ok(Some(author))
348    }
349
350    /// Import an author key pair.
351    pub fn import_author(&mut self, author: Author) -> Result<()> {
352        self.modify(|tables| {
353            tables
354                .authors
355                .insert(author.id().as_bytes(), &author.to_bytes())?;
356            Ok(())
357        })
358    }
359
360    /// Delete an author.
361    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
362        self.modify(|tables| {
363            tables.authors.remove(author.as_bytes())?;
364            Ok(())
365        })
366    }
367
368    /// List all author keys in this store.
369    pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
370        let tables = self.snapshot()?;
371        let iter = tables
372            .authors
373            .range::<&'static [u8; 32]>(..)?
374            .map(|res| match res {
375                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
376                Err(err) => Err(err.into()),
377            });
378        Ok(iter)
379    }
380
381    /// Import a new replica namespace.
382    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
383        self.modify(|tables| {
384            let outcome = {
385                let (capability, outcome) = {
386                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
387                    if let Some(existing) = existing {
388                        let mut existing = parse_capability(existing.value())?;
389                        let outcome = if existing.merge(capability)? {
390                            ImportNamespaceOutcome::Upgraded
391                        } else {
392                            ImportNamespaceOutcome::NoChange
393                        };
394                        (existing, outcome)
395                    } else {
396                        (capability, ImportNamespaceOutcome::Inserted)
397                    }
398                };
399                let id = capability.id().to_bytes();
400                let (kind, bytes) = capability.raw();
401                tables.namespaces.insert(&id, (kind, &bytes))?;
402                outcome
403            };
404            Ok(outcome)
405        })
406    }
407
408    /// Remove a replica.
409    ///
410    /// Completely removes a replica and deletes both the namespace private key and all document
411    /// entries.
412    ///
413    /// Note that a replica has to be closed before it can be removed. The store has to enforce
414    /// that a replica cannot be removed while it is still open.
415    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
416        if self.open_replicas.contains(namespace) {
417            return Err(anyhow!("replica is not closed"));
418        }
419        self.modify(|tables| {
420            let bounds = RecordsBounds::namespace(*namespace);
421            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
422            let bounds = ByKeyBounds::namespace(*namespace);
423            let _ = tables
424                .records_by_key
425                .retain_in(bounds.as_ref(), |_k, _v| false);
426            tables.namespaces.remove(namespace.as_bytes())?;
427            tables.namespace_peers.remove_all(namespace.as_bytes())?;
428            tables.download_policy.remove(namespace.as_bytes())?;
429            Ok(())
430        })
431    }
432
433    /// Get an iterator over entries of a replica.
434    pub fn get_many(
435        &mut self,
436        namespace: NamespaceId,
437        query: impl Into<Query>,
438    ) -> Result<QueryIterator> {
439        let tables = self.snapshot_owned()?;
440        QueryIterator::new(tables, namespace, query.into())
441    }
442
443    /// Get an entry by key and author.
444    pub fn get_exact(
445        &mut self,
446        namespace: NamespaceId,
447        author: AuthorId,
448        key: impl AsRef<[u8]>,
449        include_empty: bool,
450    ) -> Result<Option<SignedEntry>> {
451        get_exact(
452            &self.tables()?.records,
453            namespace,
454            author,
455            key,
456            include_empty,
457        )
458    }
459
460    /// Get all content hashes of all replicas in the store.
461    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
462        let tables = self.snapshot_owned()?;
463        ContentHashesIterator::all(&tables.records)
464    }
465
466    /// Get the latest entry for each author in a namespace.
467    pub fn get_latest_for_each_author(&mut self, namespace: NamespaceId) -> Result<LatestIterator> {
468        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
469    }
470
471    /// Register a peer that has been useful to sync a document.
472    pub fn register_useful_peer(
473        &mut self,
474        namespace: NamespaceId,
475        peer: crate::PeerIdBytes,
476    ) -> Result<()> {
477        let peer = &peer;
478        let namespace = namespace.as_bytes();
479        // calculate nanos since UNIX_EPOCH for a time measurement
480        let nanos = std::time::UNIX_EPOCH
481            .elapsed()
482            .map(|duration| duration.as_nanos() as u64)?;
483        self.modify(|tables| {
484            // ensure the document exists
485            anyhow::ensure!(
486                tables.namespaces.get(namespace)?.is_some(),
487                "document not created"
488            );
489
490            let mut namespace_peers = tables.namespace_peers.get(namespace)?;
491
492            // get the oldest entry since it's candidate for removal
493            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
494                let (oldest_nanos, &oldest_peer) = guard.value();
495                (oldest_nanos, oldest_peer)
496            });
497            match maybe_oldest {
498                None => {
499                    // the table is empty so the peer can be inserted without further checks since
500                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
501                    drop(namespace_peers);
502                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
503                }
504                Some((oldest_nanos, oldest_peer)) => {
505                    let oldest_peer = &oldest_peer;
506
507                    if oldest_peer == peer {
508                        // oldest peer is the current one, so replacing the entry for the peer will
509                        // maintain the size
510                        drop(namespace_peers);
511                        tables
512                            .namespace_peers
513                            .remove(namespace, (oldest_nanos, oldest_peer))?;
514                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
515                    } else {
516                        // calculate the len in the same loop since calling `len` is another fallible operation
517                        let mut len = 1;
518                        // find any previous entry for the same peer to remove it
519                        let mut prev_peer_nanos = None;
520
521                        for result in namespace_peers {
522                            len += 1;
523                            let guard = result?;
524                            let (peer_nanos, peer_bytes) = guard.value();
525                            if prev_peer_nanos.is_none() && peer_bytes == peer {
526                                prev_peer_nanos = Some(peer_nanos)
527                            }
528                        }
529
530                        match prev_peer_nanos {
531                            Some(prev_nanos) => {
532                                // the peer was already present, so we can remove the old entry and
533                                // insert the new one without checking the size
534                                tables
535                                    .namespace_peers
536                                    .remove(namespace, (prev_nanos, peer))?;
537                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
538                            }
539                            None => {
540                                // the peer is new and the table is non empty, add it and check the
541                                // size to decide if the oldest peer should be evicted
542                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
543                                len += 1;
544                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
545                                    tables
546                                        .namespace_peers
547                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
548                                }
549                            }
550                        }
551                    }
552                }
553            }
554            Ok(())
555        })
556    }
557
558    /// Get the peers that have been useful for a document.
559    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
560        let tables = self.tables()?;
561        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
562        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
563            let (_nanos, &peer) = result?.value();
564            peers.push(peer);
565        }
566        if peers.is_empty() {
567            Ok(None)
568        } else {
569            Ok(Some(peers.into_iter()))
570        }
571    }
572
573    /// Set the download policy for a namespace.
574    pub fn set_download_policy(
575        &mut self,
576        namespace: &NamespaceId,
577        policy: DownloadPolicy,
578    ) -> Result<()> {
579        self.modify(|tables| {
580            let namespace = namespace.as_bytes();
581
582            // ensure the document exists
583            anyhow::ensure!(
584                tables.namespaces.get(&namespace)?.is_some(),
585                "document not created"
586            );
587
588            let value = postcard::to_stdvec(&policy)?;
589            tables.download_policy.insert(namespace, value.as_slice())?;
590            Ok(())
591        })
592    }
593
594    /// Get the download policy for a namespace.
595    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
596        let tables = self.tables()?;
597        let value = tables.download_policy.get(namespace.as_bytes())?;
598        Ok(match value {
599            None => DownloadPolicy::default(),
600            Some(value) => postcard::from_bytes(value.value())?,
601        })
602    }
603}
604
605impl PublicKeyStore for Store {
606    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
607        self.pubkeys.public_key(id)
608    }
609}
610
611fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
612    Capability::from_raw(raw_kind, raw_bytes)
613}
614
615fn get_exact(
616    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
617    namespace: NamespaceId,
618    author: AuthorId,
619    key: impl AsRef<[u8]>,
620    include_empty: bool,
621) -> Result<Option<SignedEntry>> {
622    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
623    let record = record_table.get(id)?;
624    Ok(record
625        .map(|r| into_entry(id, r.value()))
626        .filter(|entry| include_empty || !entry.is_empty()))
627}
628
/// A wrapper around [`Store`] for a specific [`NamespaceId`].
#[derive(Debug)]
pub struct StoreInstance<'a> {
    /// Namespace of the replica this instance operates on.
    namespace: NamespaceId,
    /// The underlying store, borrowed mutably for the lifetime of the instance.
    pub(crate) store: &'a mut Store,
}
635
636impl<'a> StoreInstance<'a> {
637    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
638        StoreInstance { namespace, store }
639    }
640}
641
642impl PublicKeyStore for StoreInstance<'_> {
643    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
644        self.store.public_key(id)
645    }
646}
647
648impl super::DownloadPolicyStore for StoreInstance<'_> {
649    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
650        self.store.get_download_policy(namespace)
651    }
652}
653
654impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
655    type Error = anyhow::Error;
656    type RangeIterator<'x>
657        = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
658    where
659        'a: 'x;
660    type ParentIterator<'x>
661        = ParentIterator
662    where
663        'a: 'x;
664
665    /// Get a the first key (or the default if none is available).
666    fn get_first(&mut self) -> Result<RecordIdentifier> {
667        let tables = self.store.as_mut().tables()?;
668        // TODO: verify this fetches all keys with this namespace
669        let bounds = RecordsBounds::namespace(self.namespace);
670        let mut records = tables.records.range(bounds.as_ref())?;
671
672        let Some(record) = records.next() else {
673            return Ok(RecordIdentifier::default());
674        };
675        let (compound_key, _value) = record?;
676        let (namespace_id, author_id, key) = compound_key.value();
677        let id = RecordIdentifier::new(namespace_id, author_id, key);
678        Ok(id)
679    }
680
681    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
682        self.store
683            .as_mut()
684            .get_exact(id.namespace(), id.author(), id.key(), true)
685    }
686
687    fn len(&mut self) -> Result<usize> {
688        let tables = self.store.as_mut().tables()?;
689        let bounds = RecordsBounds::namespace(self.namespace);
690        let records = tables.records.range(bounds.as_ref())?;
691        Ok(records.count())
692    }
693
694    fn is_empty(&mut self) -> Result<bool> {
695        let tables = self.store.as_mut().tables()?;
696        Ok(tables.records.is_empty()?)
697    }
698
699    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
700        // TODO: optimize
701        let elements = self.get_range(range.clone())?;
702
703        let mut fp = Fingerprint::empty();
704        for el in elements {
705            let el = el?;
706            fp ^= el.as_fingerprint();
707        }
708
709        Ok(fp)
710    }
711
712    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
713        let id = e.id();
714        self.store.as_mut().modify(|tables| {
715            // insert into record table
716            let key = (
717                &id.namespace().to_bytes(),
718                &id.author().to_bytes(),
719                id.key(),
720            );
721            let hash = e.content_hash(); // let binding is needed
722            let value = (
723                e.timestamp(),
724                &e.signature().namespace().to_bytes(),
725                &e.signature().author().to_bytes(),
726                e.content_len(),
727                hash.as_bytes(),
728            );
729            tables.records.insert(key, value)?;
730
731            // insert into by key index table
732            let key = (
733                &id.namespace().to_bytes(),
734                id.key(),
735                &id.author().to_bytes(),
736            );
737            tables.records_by_key.insert(key, ())?;
738
739            // insert into latest table
740            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
741            let value = (e.timestamp(), e.id().key());
742            tables.latest_per_author.insert(key, value)?;
743            Ok(())
744        })
745    }
746
747    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
748        let tables = self.store.as_mut().tables()?;
749        let iter = match range.x().cmp(range.y()) {
750            // identity range: iter1 = all, iter2 = none
751            Ordering::Equal => {
752                // iterator for all entries in replica
753                let bounds = RecordsBounds::namespace(self.namespace);
754                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
755                chain_none(iter)
756            }
757            // regular range: iter1 = x <= t < y, iter2 = none
758            Ordering::Less => {
759                // iterator for entries from range.x to range.y
760                let start = Bound::Included(range.x().to_byte_tuple());
761                let end = Bound::Excluded(range.y().to_byte_tuple());
762                let bounds = RecordsBounds::new(start, end);
763                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
764                chain_none(iter)
765            }
766            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
767            Ordering::Greater => {
768                // iterator for entries from start to range.y
769                let end = Bound::Excluded(range.y().to_byte_tuple());
770                let bounds = RecordsBounds::from_start(&self.namespace, end);
771                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
772
773                // iterator for entries from range.x to end
774                let start = Bound::Included(range.x().to_byte_tuple());
775                let bounds = RecordsBounds::to_end(&self.namespace, start);
776                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
777
778                iter.chain(Some(iter2).into_iter().flatten())
779            }
780        };
781        Ok(iter)
782    }
783
784    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
785        self.store.as_mut().modify(|tables| {
786            let entry = {
787                let (namespace, author, key) = id.as_byte_tuple();
788                let id = (namespace, key, author);
789                tables.records_by_key.remove(id)?;
790                let id = (namespace, author, key);
791                let value = tables.records.remove(id)?;
792                value.map(|value| into_entry(id, value.value()))
793            };
794            Ok(entry)
795        })
796    }
797
798    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
799        let tables = self.store.as_mut().tables()?;
800        let bounds = RecordsBounds::namespace(self.namespace);
801        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
802        Ok(chain_none(iter))
803    }
804
805    fn prefixes_of(
806        &mut self,
807        id: &RecordIdentifier,
808    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
809        let tables = self.store.as_mut().tables()?;
810        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
811    }
812
813    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
814        let tables = self.store.as_mut().tables()?;
815        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
816        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
817        Ok(chain_none(iter))
818    }
819
820    fn remove_prefix_filtered(
821        &mut self,
822        id: &RecordIdentifier,
823        predicate: impl Fn(&Record) -> bool,
824    ) -> Result<usize> {
825        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
826        self.store.as_mut().modify(|tables| {
827            let cb = |_k: RecordsId, v: RecordsValue| {
828                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
829                let record = Record::new(hash.into(), len, timestamp);
830
831                predicate(&record)
832            };
833            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
834            let count = iter.count();
835            Ok(count)
836        })
837    }
838}
839
/// Wraps `iter` in the `Chain<_, Flatten<option::IntoIter<_>>>` shape used for range iterators,
/// with an empty second half.
///
/// This lets a single-range result share its type with ranges that chain two sub-ranges
/// (`iter.chain(Some(iter2).into_iter().flatten())`).
fn chain_none<I: Iterator>(iter: I) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    iter.chain(None.into_iter().flatten())
}
845
/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
/// is a prefix of the key passed to the iterator.
///
/// The prefixes include the full key itself. All lookups happen eagerly when the iterator is
/// created, and iteration only drains the buffered results.
#[derive(Debug)]
pub struct ParentIterator {
    // Buffered lookup results, ordered from shortest to longest key; lookup failures are kept
    // as `Err` items.
    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
}
852
853impl ParentIterator {
854    fn new(
855        tables: &Tables,
856        namespace: NamespaceId,
857        author: AuthorId,
858        key: Vec<u8>,
859    ) -> anyhow::Result<Self> {
860        let parents = parents(&tables.records, namespace, author, key.clone());
861        Ok(Self {
862            inner: parents.into_iter(),
863        })
864    }
865}
866
867fn parents(
868    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
869    namespace: NamespaceId,
870    author: AuthorId,
871    mut key: Vec<u8>,
872) -> Vec<anyhow::Result<SignedEntry>> {
873    let mut res = Vec::new();
874
875    while !key.is_empty() {
876        let entry = get_exact(table, namespace, author, &key, false);
877        key.pop();
878        match entry {
879            Err(err) => res.push(Err(err)),
880            Ok(Some(entry)) => res.push(Ok(entry)),
881            Ok(None) => continue,
882        }
883    }
884    res.reverse();
885    res
886}
887
888impl Iterator for ParentIterator {
889    type Item = Result<SignedEntry>;
890
891    fn next(&mut self) -> Option<Self::Item> {
892        self.inner.next()
893    }
894}
895
/// Iterator over the content hash of every record in the records table.
///
/// The same hash may be yielded more than once: one hash is produced per record, not per
/// distinct piece of content.
///
/// The iterator owns a `'static` [`RecordsRange`], so it is self-contained. It reflects a
/// snapshot of the database taken at creation time, and it keeps that snapshot open until it
/// is dropped.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator {
    // Owned range over all records; skipped in `Debug` output.
    #[debug(skip)]
    range: RecordsRange<'static>,
}
908
909impl ContentHashesIterator {
910    /// Create a new iterator over all content hashes.
911    pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
912        let range = RecordsRange::all_static(table)?;
913        Ok(Self { range })
914    }
915}
916
917impl Iterator for ContentHashesIterator {
918    type Item = Result<Hash>;
919
920    fn next(&mut self) -> Option<Self::Item> {
921        let v = self.range.next()?;
922        Some(v.map(|e| e.content_hash()))
923    }
924}
925
/// Iterator over the latest entry per author.
///
/// Created for a single namespace. Each item is `(author, timestamp, key)` as read from the
/// latest-per-author table.
#[derive(derive_more::Debug)]
#[debug("LatestIterator")]
pub struct LatestIterator<'a>(
    /// Range over the `(namespace, author)` keys of one namespace in the latest-per-author table.
    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
);
932
933impl<'a> LatestIterator<'a> {
934    fn new(
935        latest_per_author: &'a impl ReadableTable<
936            LatestPerAuthorKey<'static>,
937            LatestPerAuthorValue<'static>,
938        >,
939        namespace: NamespaceId,
940    ) -> anyhow::Result<Self> {
941        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
942        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
943        let range = latest_per_author.range(start..=end)?;
944        Ok(Self(range))
945    }
946}
947
948impl Iterator for LatestIterator<'_> {
949    type Item = Result<(AuthorId, u64, Vec<u8>)>;
950
951    fn next(&mut self) -> Option<Self::Item> {
952        self.0.next_map(|key, value| {
953            let (_namespace, author) = key;
954            let (timestamp, key) = value;
955            (author.into(), timestamp, key.to_vec())
956        })
957    }
958}
959
960fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
961    let (namespace, author, key) = key;
962    let (timestamp, namespace_sig, author_sig, len, hash) = value;
963    let id = RecordIdentifier::new(namespace, author, key);
964    let record = Record::new(hash.into(), len, timestamp);
965    let entry = Entry::new(id, record);
966    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
967    SignedEntry::new(entry_signature, entry)
968}
969
#[cfg(test)]
mod tests {
    use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
    use crate::ranger::Store as _;

    /// Checks that an author + key-prefix query matches keys made up entirely of `0xff` bytes.
    #[test]
    fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // test author prefix relation for all-255 keys
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1")?;
        replica.hash_and_insert(&key2, &author, b"v2")?;
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    /// Exercises author/replica bookkeeping plus put, overwrite, query and remove of entries.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes());

        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // all: collect into `Result` so that iteration errors fail the test instead of
        // being counted as entries.
        let all = wrapper.all()?.collect::<Result<Vec<_>>>()?;
        assert_eq!(all.len(), 5);

        // add a second version
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // get all
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // get all prefix
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // delete and get
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // after removing every entry, nothing is left in the namespace
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    /// Copies the database at `source` into a fresh temp file and applies `modify` to the copy
    /// inside a single committed write transaction.
    fn copy_and_modify(
        source: &Path,
        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        drop(db);
        Ok(dbfile)
    }

    #[test]
    fn test_migration_001_populate_latest_table() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());

        // create a store and add some data
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::thread_rng())?;
            let author2 = store.new_author(&mut rand::thread_rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1")?;
            replica.hash_and_insert(b"k2", &author2, b"v1")?;
            replica.hash_and_insert(b"k3", &author1, b"v1")?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            // drop everything to clear file locks.
            store.close_replica(namespace.id());
            // flush the store to disk
            store.flush()?;
            drop(store);
            expected
        };
        assert_eq!(expected.len(), 2);

        // create a copy of our db file with the latest table deleted.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // open the copied db file, which will run the migration.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;

        let mut store = Store::persistent(dbfile.path())?;

        // check that the new table is there, even if empty
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }

        // TODO: write test checking that the indexing is done correctly
        Ok(())
    }
}