iroh_sync/store/
fs.rs

1//! On disk storage for replicas.
2
3use std::{
4    cmp::Ordering,
5    collections::HashSet,
6    iter::{Chain, Flatten},
7    num::NonZeroU64,
8    ops::Bound,
9    path::Path,
10    time::Duration,
11};
12
13use anyhow::{anyhow, Result};
14use ed25519_dalek::{SignatureError, VerifyingKey};
15use iroh_base::hash::Hash;
16use rand_core::CryptoRngCore;
17use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable, ReadableTableMetadata};
18
19use crate::{
20    keys::Author,
21    ranger::{Fingerprint, Range, RangeEntry},
22    sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
23    AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
24    ReplicaInfo,
25};
26
27use super::{
28    pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
29    Query,
30};
31
32mod bounds;
33mod migrate_v1_v2;
34mod migrations;
35mod query;
36mod ranges;
37pub(crate) mod tables;
38
39use self::{
40    bounds::{ByKeyBounds, RecordsBounds},
41    ranges::RangeExt,
42    tables::{RecordsTable, TransactionAndTables},
43};
44use self::{
45    query::QueryIterator,
46    tables::{
47        LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsValue, Tables,
48    },
49};
50
51pub use self::ranges::RecordsRange;
52
/// Manages the replicas and authors for an instance.
///
/// All table access goes through a single cached redb transaction (see
/// `CurrentTransaction`) that is reused across calls and committed lazily, e.g.
/// by `flush`, `snapshot`, or when it has been open for too long.
#[derive(Debug)]
pub struct Store {
    /// The underlying redb database.
    db: Database,
    /// The transaction currently held open by this store, if any.
    transaction: CurrentTransaction,
    /// Namespaces that are currently open; `remove_replica` refuses to remove these.
    open_replicas: HashSet<NamespaceId>,
    /// Store used to resolve public keys (see the `PublicKeyStore` impl below).
    pubkeys: MemPublicKeyStore,
}
61
62impl AsRef<Store> for Store {
63    fn as_ref(&self) -> &Store {
64        self
65    }
66}
67
68impl AsMut<Store> for Store {
69    fn as_mut(&mut self) -> &mut Store {
70        self
71    }
72}
73
/// The transaction currently cached by a [`Store`].
#[derive(derive_more::Debug, Default)]
enum CurrentTransaction {
    /// No transaction is open.
    #[default]
    None,
    /// A read-only snapshot, as handed out by `Store::snapshot`.
    Read(ReadOnlyTables),
    /// A write transaction together with its opened tables.
    Write(TransactionAndTables),
}
81
82impl Store {
83    /// Create a new store in memory.
84    pub fn memory() -> Self {
85        Self::memory_impl().expect("failed to create memory store")
86    }
87
88    fn memory_impl() -> Result<Self> {
89        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
90        Self::new_impl(db)
91    }
92
93    /// Create or open a store from a `path` to a database file.
94    ///
95    /// The file will be created if it does not exist, otherwise it will be opened.
96    pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
97        let db = match Database::create(&path) {
98            Ok(db) => db,
99            Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
100            Err(err) => return Err(err.into()),
101        };
102        Self::new_impl(db)
103    }
104
105    fn new_impl(db: redb::Database) -> Result<Self> {
106        // Setup all tables
107        let write_tx = db.begin_write()?;
108        let _ = Tables::new(&write_tx)?;
109        write_tx.commit()?;
110
111        // Run database migrations
112        migrations::run_migrations(&db)?;
113
114        Ok(Store {
115            db,
116            transaction: Default::default(),
117            open_replicas: Default::default(),
118            pubkeys: Default::default(),
119        })
120    }
121
122    /// Flush the current transaction, if any.
123    ///
124    /// This is the cheapest way to ensure that the data is persisted.
125    pub fn flush(&mut self) -> Result<()> {
126        if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
127            w.commit()?;
128        }
129        Ok(())
130    }
131
132    /// Get a read-only snapshot of the database.
133    ///
134    /// This has the side effect of committing any open write transaction,
135    /// so it can be used as a way to ensure that the data is persisted.
136    pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
137        let guard = &mut self.transaction;
138        let tables = match std::mem::take(guard) {
139            CurrentTransaction::None => {
140                let tx = self.db.begin_read()?;
141                ReadOnlyTables::new(tx)?
142            }
143            CurrentTransaction::Write(w) => {
144                w.commit()?;
145                let tx = self.db.begin_read()?;
146                ReadOnlyTables::new(tx)?
147            }
148            CurrentTransaction::Read(tables) => tables,
149        };
150        *guard = CurrentTransaction::Read(tables);
151        match &*guard {
152            CurrentTransaction::Read(ref tables) => Ok(tables),
153            _ => unreachable!(),
154        }
155    }
156
157    /// Get access to the tables to read from them.
158    ///
159    /// The underlying transaction is a write transaction, but with a non-mut
160    /// reference to the tables you can not write.
161    ///
162    /// There is no guarantee that this will be an independent transaction.
163    /// You just get readonly access to the current state of the database.
164    ///
165    /// As such, there is also no guarantee that the data you see is
166    /// already persisted.
167    fn tables(&mut self) -> Result<&Tables> {
168        let guard = &mut self.transaction;
169        let tables = match std::mem::take(guard) {
170            CurrentTransaction::None => {
171                let tx = self.db.begin_write()?;
172                TransactionAndTables::new(tx)?
173            }
174            CurrentTransaction::Write(w) => {
175                if w.since.elapsed() > Duration::from_millis(500) {
176                    tracing::debug!("committing transaction because it's too old");
177                    w.commit()?;
178                    let tx = self.db.begin_write()?;
179                    TransactionAndTables::new(tx)?
180                } else {
181                    w
182                }
183            }
184            CurrentTransaction::Read(_) => {
185                let tx = self.db.begin_write()?;
186                TransactionAndTables::new(tx)?
187            }
188        };
189        *guard = CurrentTransaction::Write(tables);
190        match guard {
191            CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
192            _ => unreachable!(),
193        }
194    }
195
196    /// Get exclusive write access to the tables in the current transaction.
197    ///
198    /// There is no guarantee that this will be an independent transaction.
199    /// As such, there is also no guarantee that the data you see or write
200    /// will be persisted.
201    ///
202    /// To ensure that the data is persisted, acquire a snapshot of the database
203    /// or call flush.
204    fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
205        let guard = &mut self.transaction;
206        let tables = match std::mem::take(guard) {
207            CurrentTransaction::None => {
208                let tx = self.db.begin_write()?;
209                TransactionAndTables::new(tx)?
210            }
211            CurrentTransaction::Write(w) => w,
212            CurrentTransaction::Read(_) => {
213                let tx = self.db.begin_write()?;
214                TransactionAndTables::new(tx)?
215            }
216        };
217        *guard = CurrentTransaction::Write(tables);
218        let res = match &mut *guard {
219            CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
220            _ => unreachable!(),
221        };
222        Ok(res)
223    }
224}
225
/// Iterator returned by `Store::list_authors`.
type AuthorsIter = std::vec::IntoIter<Result<Author>>;
/// Iterator returned by `Store::list_namespaces`: namespace id and capability kind.
type NamespaceIter = std::vec::IntoIter<Result<(NamespaceId, CapabilityKind)>>;
/// Iterator returned by `Store::get_sync_peers`.
type PeersIter = std::vec::IntoIter<PeerIdBytes>;
229
230impl Store {
231    /// Create a new replica for `namespace` and persist in this store.
232    pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica> {
233        let id = namespace.id();
234        self.import_namespace(namespace.into())?;
235        self.open_replica(&id).map_err(Into::into)
236    }
237
238    /// Create a new author key and persist it in the store.
239    pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
240        let author = Author::new(rng);
241        self.import_author(author.clone())?;
242        Ok(author)
243    }
244
245    /// Check if a [`AuthorHeads`] contains entry timestamps that we do not have locally.
246    ///
247    /// Returns the number of authors that the other peer has updates for.
248    pub fn has_news_for_us(
249        &mut self,
250        namespace: NamespaceId,
251        heads: &AuthorHeads,
252    ) -> Result<Option<NonZeroU64>> {
253        let our_heads = {
254            let latest = self.get_latest_for_each_author(namespace)?;
255            let mut heads = AuthorHeads::default();
256            for e in latest {
257                let (author, timestamp, _key) = e?;
258                heads.insert(author, timestamp);
259            }
260            heads
261        };
262        let has_news_for_us = heads.has_news_for(&our_heads);
263        Ok(has_news_for_us)
264    }
265
266    /// Open a replica from this store.
267    ///
268    /// This just calls load_replica_info and then creates a new replica with the info.
269    pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica, OpenError> {
270        let info = self.load_replica_info(namespace_id)?;
271        let instance = StoreInstance::new(*namespace_id, self);
272        Ok(Replica::new(instance, Box::new(info)))
273    }
274
275    /// Load the replica info from the store.
276    pub fn load_replica_info(
277        &mut self,
278        namespace_id: &NamespaceId,
279    ) -> Result<ReplicaInfo, OpenError> {
280        let tables = self.tables()?;
281        let info = match tables.namespaces.get(namespace_id.as_bytes()) {
282            Ok(Some(db_value)) => {
283                let (raw_kind, raw_bytes) = db_value.value();
284                let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
285                ReplicaInfo::new(namespace)
286            }
287            Ok(None) => return Err(OpenError::NotFound),
288            Err(err) => return Err(OpenError::Other(err.into())),
289        };
290        self.open_replicas.insert(info.capability.id());
291        Ok(info)
292    }
293
294    /// Close a replica.
295    pub fn close_replica(&mut self, id: NamespaceId) {
296        self.open_replicas.remove(&id);
297    }
298
299    /// List all replica namespaces in this store.
300    pub fn list_namespaces(&mut self) -> Result<NamespaceIter> {
301        // TODO: avoid collect
302        let tables = self.tables()?;
303        let namespaces: Vec<_> = tables
304            .namespaces
305            .iter()?
306            .map(|res| {
307                let capability = parse_capability(res?.1.value())?;
308                Ok((capability.id(), capability.kind()))
309            })
310            .collect();
311        Ok(namespaces.into_iter())
312    }
313
314    /// Get an author key from the store.
315    pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
316        let tables = self.tables()?;
317        let Some(author) = tables.authors.get(author_id.as_bytes())? else {
318            return Ok(None);
319        };
320        let author = Author::from_bytes(author.value());
321        Ok(Some(author))
322    }
323
324    /// Import an author key pair.
325    pub fn import_author(&mut self, author: Author) -> Result<()> {
326        self.modify(|tables| {
327            tables
328                .authors
329                .insert(author.id().as_bytes(), &author.to_bytes())?;
330            Ok(())
331        })
332    }
333
334    /// Delte an author.
335    pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
336        self.modify(|tables| {
337            tables.authors.remove(author.as_bytes())?;
338            Ok(())
339        })
340    }
341
342    /// List all author keys in this store.
343    pub fn list_authors(&mut self) -> Result<AuthorsIter> {
344        // TODO: avoid collect
345        let tables = self.tables()?;
346        let authors: Vec<_> = tables
347            .authors
348            .iter()?
349            .map(|res| match res {
350                Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
351                Err(err) => Err(err.into()),
352            })
353            .collect();
354
355        Ok(authors.into_iter())
356    }
357
358    /// Import a new replica namespace.
359    pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
360        self.modify(|tables| {
361            let outcome = {
362                let (capability, outcome) = {
363                    let existing = tables.namespaces.get(capability.id().as_bytes())?;
364                    if let Some(existing) = existing {
365                        let mut existing = parse_capability(existing.value())?;
366                        let outcome = if existing.merge(capability)? {
367                            ImportNamespaceOutcome::Upgraded
368                        } else {
369                            ImportNamespaceOutcome::NoChange
370                        };
371                        (existing, outcome)
372                    } else {
373                        (capability, ImportNamespaceOutcome::Inserted)
374                    }
375                };
376                let id = capability.id().to_bytes();
377                let (kind, bytes) = capability.raw();
378                tables.namespaces.insert(&id, (kind, &bytes))?;
379                outcome
380            };
381            Ok(outcome)
382        })
383    }
384
385    /// Remove a replica.
386    ///
387    /// Completely removes a replica and deletes both the namespace private key and all document
388    /// entries.
389    ///
390    /// Note that a replica has to be closed before it can be removed. The store has to enforce
391    /// that a replica cannot be removed while it is still open.
392    pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
393        if self.open_replicas.contains(namespace) {
394            return Err(anyhow!("replica is not closed"));
395        }
396        self.modify(|tables| {
397            let bounds = RecordsBounds::namespace(*namespace);
398            tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
399            let bounds = ByKeyBounds::namespace(*namespace);
400            let _ = tables
401                .records_by_key
402                .retain_in(bounds.as_ref(), |_k, _v| false);
403            tables.namespaces.remove(namespace.as_bytes())?;
404            tables.namespace_peers.remove_all(namespace.as_bytes())?;
405            tables.download_policy.remove(namespace.as_bytes())?;
406            Ok(())
407        })
408    }
409
410    /// Get an iterator over entries of a replica.
411    pub fn get_many(
412        &mut self,
413        namespace: NamespaceId,
414        query: impl Into<Query>,
415    ) -> Result<QueryIterator> {
416        QueryIterator::new(self.tables()?, namespace, query.into())
417    }
418
419    /// Get an entry by key and author.
420    pub fn get_exact(
421        &mut self,
422        namespace: NamespaceId,
423        author: AuthorId,
424        key: impl AsRef<[u8]>,
425        include_empty: bool,
426    ) -> Result<Option<SignedEntry>> {
427        get_exact(
428            &self.tables()?.records,
429            namespace,
430            author,
431            key,
432            include_empty,
433        )
434    }
435
436    /// Get all content hashes of all replicas in the store.
437    pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
438        // make sure the current transaction is committed
439        self.flush()?;
440        assert!(matches!(self.transaction, CurrentTransaction::None));
441        let tx = self.db.begin_read()?;
442        let tables = ReadOnlyTables::new(tx)?;
443        let records = tables.records;
444        ContentHashesIterator::all(records)
445    }
446
447    /// Get the latest entry for each author in a namespace.
448    pub fn get_latest_for_each_author(&mut self, namespace: NamespaceId) -> Result<LatestIterator> {
449        LatestIterator::new(&self.tables()?.latest_per_author, namespace)
450    }
451
452    /// Register a peer that has been useful to sync a document.
453    pub fn register_useful_peer(
454        &mut self,
455        namespace: NamespaceId,
456        peer: crate::PeerIdBytes,
457    ) -> Result<()> {
458        let peer = &peer;
459        let namespace = namespace.as_bytes();
460        // calculate nanos since UNIX_EPOCH for a time measurement
461        let nanos = std::time::UNIX_EPOCH
462            .elapsed()
463            .map(|duration| duration.as_nanos() as u64)?;
464        self.modify(|tables| {
465            // ensure the document exists
466            anyhow::ensure!(
467                tables.namespaces.get(namespace)?.is_some(),
468                "document not created"
469            );
470
471            let mut namespace_peers = tables.namespace_peers.get(namespace)?;
472
473            // get the oldest entry since it's candidate for removal
474            let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
475                let (oldest_nanos, &oldest_peer) = guard.value();
476                (oldest_nanos, oldest_peer)
477            });
478            match maybe_oldest {
479                None => {
480                    // the table is empty so the peer can be inserted without further checks since
481                    // super::PEERS_PER_DOC_CACHE_SIZE is non zero
482                    drop(namespace_peers);
483                    tables.namespace_peers.insert(namespace, (nanos, peer))?;
484                }
485                Some((oldest_nanos, oldest_peer)) => {
486                    let oldest_peer = &oldest_peer;
487
488                    if oldest_peer == peer {
489                        // oldest peer is the current one, so replacing the entry for the peer will
490                        // maintain the size
491                        drop(namespace_peers);
492                        tables
493                            .namespace_peers
494                            .remove(namespace, (oldest_nanos, oldest_peer))?;
495                        tables.namespace_peers.insert(namespace, (nanos, peer))?;
496                    } else {
497                        // calculate the len in the same loop since calling `len` is another fallible operation
498                        let mut len = 1;
499                        // find any previous entry for the same peer to remove it
500                        let mut prev_peer_nanos = None;
501
502                        for result in namespace_peers {
503                            len += 1;
504                            let guard = result?;
505                            let (peer_nanos, peer_bytes) = guard.value();
506                            if prev_peer_nanos.is_none() && peer_bytes == peer {
507                                prev_peer_nanos = Some(peer_nanos)
508                            }
509                        }
510
511                        match prev_peer_nanos {
512                            Some(prev_nanos) => {
513                                // the peer was already present, so we can remove the old entry and
514                                // insert the new one without checking the size
515                                tables
516                                    .namespace_peers
517                                    .remove(namespace, (prev_nanos, peer))?;
518                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
519                            }
520                            None => {
521                                // the peer is new and the table is non empty, add it and check the
522                                // size to decide if the oldest peer should be evicted
523                                tables.namespace_peers.insert(namespace, (nanos, peer))?;
524                                len += 1;
525                                if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
526                                    tables
527                                        .namespace_peers
528                                        .remove(namespace, (oldest_nanos, oldest_peer))?;
529                                }
530                            }
531                        }
532                    }
533                }
534            }
535            Ok(())
536        })
537    }
538
539    /// Get the peers that have been useful for a document.
540    pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
541        let tables = self.tables()?;
542        let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
543        for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
544            let (_nanos, &peer) = result?.value();
545            peers.push(peer);
546        }
547        if peers.is_empty() {
548            Ok(None)
549        } else {
550            Ok(Some(peers.into_iter()))
551        }
552    }
553
554    /// Set the download policy for a namespace.
555    pub fn set_download_policy(
556        &mut self,
557        namespace: &NamespaceId,
558        policy: DownloadPolicy,
559    ) -> Result<()> {
560        self.modify(|tables| {
561            let namespace = namespace.as_bytes();
562
563            // ensure the document exists
564            anyhow::ensure!(
565                tables.namespaces.get(&namespace)?.is_some(),
566                "document not created"
567            );
568
569            let value = postcard::to_stdvec(&policy)?;
570            tables.download_policy.insert(namespace, value.as_slice())?;
571            Ok(())
572        })
573    }
574
575    /// Get the download policy for a namespace.
576    pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
577        let tables = self.tables()?;
578        let value = tables.download_policy.get(namespace.as_bytes())?;
579        Ok(match value {
580            None => DownloadPolicy::default(),
581            Some(value) => postcard::from_bytes(value.value())?,
582        })
583    }
584}
585
586impl PublicKeyStore for Store {
587    fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
588        self.pubkeys.public_key(id)
589    }
590}
591
592fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
593    Capability::from_raw(raw_kind, raw_bytes)
594}
595
596fn get_exact(
597    record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
598    namespace: NamespaceId,
599    author: AuthorId,
600    key: impl AsRef<[u8]>,
601    include_empty: bool,
602) -> Result<Option<SignedEntry>> {
603    let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
604    let record = record_table.get(id)?;
605    Ok(record
606        .map(|r| into_entry(id, r.value()))
607        .filter(|entry| include_empty || !entry.is_empty()))
608}
609
/// A wrapper around [`Store`] for a specific [`NamespaceId`]
#[derive(Debug)]
pub struct StoreInstance<'a> {
    /// The namespace this instance operates on.
    namespace: NamespaceId,
    /// The store this instance borrows exclusively for its lifetime.
    pub(crate) store: &'a mut Store,
}
616
617impl<'a> StoreInstance<'a> {
618    pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
619        StoreInstance { namespace, store }
620    }
621}
622
623impl<'a> PublicKeyStore for StoreInstance<'a> {
624    fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
625        self.store.public_key(id)
626    }
627}
628
629impl<'a> super::DownloadPolicyStore for StoreInstance<'a> {
630    fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
631        self.store.get_download_policy(namespace)
632    }
633}
634
635impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
636    type Error = anyhow::Error;
637    type RangeIterator<'x> = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
638        where 'a: 'x;
639    type ParentIterator<'x> = ParentIterator
640        where 'a: 'x;
641
642    /// Get a the first key (or the default if none is available).
643    fn get_first(&mut self) -> Result<RecordIdentifier> {
644        let tables = self.store.as_mut().tables()?;
645        // TODO: verify this fetches all keys with this namespace
646        let bounds = RecordsBounds::namespace(self.namespace);
647        let mut records = tables.records.range(bounds.as_ref())?;
648
649        let Some(record) = records.next() else {
650            return Ok(RecordIdentifier::default());
651        };
652        let (compound_key, _value) = record?;
653        let (namespace_id, author_id, key) = compound_key.value();
654        let id = RecordIdentifier::new(namespace_id, author_id, key);
655        Ok(id)
656    }
657
658    fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
659        self.store
660            .as_mut()
661            .get_exact(id.namespace(), id.author(), id.key(), true)
662    }
663
664    fn len(&mut self) -> Result<usize> {
665        let tables = self.store.as_mut().tables()?;
666        let bounds = RecordsBounds::namespace(self.namespace);
667        let records = tables.records.range(bounds.as_ref())?;
668        Ok(records.count())
669    }
670
671    fn is_empty(&mut self) -> Result<bool> {
672        let tables = self.store.as_mut().tables()?;
673        Ok(tables.records.is_empty()?)
674    }
675
676    fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
677        // TODO: optimize
678        let elements = self.get_range(range.clone())?;
679
680        let mut fp = Fingerprint::empty();
681        for el in elements {
682            let el = el?;
683            fp ^= el.as_fingerprint();
684        }
685
686        Ok(fp)
687    }
688
689    fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
690        let id = e.id();
691        self.store.as_mut().modify(|tables| {
692            // insert into record table
693            let key = (
694                &id.namespace().to_bytes(),
695                &id.author().to_bytes(),
696                id.key(),
697            );
698            let hash = e.content_hash(); // let binding is needed
699            let value = (
700                e.timestamp(),
701                &e.signature().namespace().to_bytes(),
702                &e.signature().author().to_bytes(),
703                e.content_len(),
704                hash.as_bytes(),
705            );
706            tables.records.insert(key, value)?;
707
708            // insert into by key index table
709            let key = (
710                &id.namespace().to_bytes(),
711                id.key(),
712                &id.author().to_bytes(),
713            );
714            tables.records_by_key.insert(key, ())?;
715
716            // insert into latest table
717            let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
718            let value = (e.timestamp(), e.id().key());
719            tables.latest_per_author.insert(key, value)?;
720            Ok(())
721        })
722    }
723
724    fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
725        let tables = self.store.as_mut().tables()?;
726        let iter = match range.x().cmp(range.y()) {
727            // identity range: iter1 = all, iter2 = none
728            Ordering::Equal => {
729                // iterator for all entries in replica
730                let bounds = RecordsBounds::namespace(self.namespace);
731                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
732                chain_none(iter)
733            }
734            // regular range: iter1 = x <= t < y, iter2 = none
735            Ordering::Less => {
736                // iterator for entries from range.x to range.y
737                let start = Bound::Included(range.x().to_byte_tuple());
738                let end = Bound::Excluded(range.y().to_byte_tuple());
739                let bounds = RecordsBounds::new(start, end);
740                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
741                chain_none(iter)
742            }
743            // split range: iter1 = start <= t < y, iter2 = x <= t <= end
744            Ordering::Greater => {
745                // iterator for entries from start to range.y
746                let end = Bound::Excluded(range.y().to_byte_tuple());
747                let bounds = RecordsBounds::from_start(&self.namespace, end);
748                let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
749
750                // iterator for entries from range.x to end
751                let start = Bound::Included(range.x().to_byte_tuple());
752                let bounds = RecordsBounds::to_end(&self.namespace, start);
753                let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
754
755                iter.chain(Some(iter2).into_iter().flatten())
756            }
757        };
758        Ok(iter)
759    }
760
761    fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
762        self.store.as_mut().modify(|tables| {
763            let entry = {
764                let (namespace, author, key) = id.as_byte_tuple();
765                let id = (namespace, key, author);
766                tables.records_by_key.remove(id)?;
767                let id = (namespace, author, key);
768                let value = tables.records.remove(id)?;
769                value.map(|value| into_entry(id, value.value()))
770            };
771            Ok(entry)
772        })
773    }
774
775    fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
776        let tables = self.store.as_mut().tables()?;
777        let bounds = RecordsBounds::namespace(self.namespace);
778        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
779        Ok(chain_none(iter))
780    }
781
782    fn prefixes_of(
783        &mut self,
784        id: &RecordIdentifier,
785    ) -> Result<Self::ParentIterator<'_>, Self::Error> {
786        let tables = self.store.as_mut().tables()?;
787        ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
788    }
789
790    fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
791        let tables = self.store.as_mut().tables()?;
792        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
793        let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
794        Ok(chain_none(iter))
795    }
796
797    fn remove_prefix_filtered(
798        &mut self,
799        id: &RecordIdentifier,
800        predicate: impl Fn(&Record) -> bool,
801    ) -> Result<usize> {
802        let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
803        self.store.as_mut().modify(|tables| {
804            let cb = |_k: RecordsId, v: RecordsValue| {
805                let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
806                let record = Record::new(hash.into(), len, timestamp);
807
808                predicate(&record)
809            };
810            let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
811            let count = iter.count();
812            Ok(count)
813        })
814    }
815}
816
/// Chain `iter` with an empty second iterator of the same type.
///
/// This gives single-range iterators the same `Chain<I, Flatten<…>>` shape as the
/// split-range case, so both fit one associated iterator type.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    let nothing: Option<I> = None;
    iter.chain(nothing.into_iter().flatten())
}
822
823/// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which
824/// is a prefix of the key passed to the iterator.
825#[derive(Debug)]
826pub struct ParentIterator {
827    inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
828}
829
830impl ParentIterator {
831    fn new(
832        tables: &Tables,
833        namespace: NamespaceId,
834        author: AuthorId,
835        key: Vec<u8>,
836    ) -> anyhow::Result<Self> {
837        let parents = parents(&tables.records, namespace, author, key.clone());
838        Ok(Self {
839            inner: parents.into_iter(),
840        })
841    }
842}
843
844fn parents(
845    table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
846    namespace: NamespaceId,
847    author: AuthorId,
848    mut key: Vec<u8>,
849) -> Vec<anyhow::Result<SignedEntry>> {
850    let mut res = Vec::new();
851
852    while !key.is_empty() {
853        let entry = get_exact(table, namespace, author, &key, false);
854        key.pop();
855        match entry {
856            Err(err) => res.push(Err(err)),
857            Ok(Some(entry)) => res.push(Ok(entry)),
858            Ok(None) => continue,
859        }
860    }
861    res.reverse();
862    res
863}
864
865impl Iterator for ParentIterator {
866    type Item = Result<SignedEntry>;
867
868    fn next(&mut self) -> Option<Self::Item> {
869        self.inner.next()
870    }
871}
872
// Self-referential pair backing `ContentHashesIterator`: it owns a `RecordsTable` and a
// `RecordsRange` that borrows from that table, so both can be stored in a single value.
self_cell::self_cell!(
    struct ContentHashesIteratorInner {
        owner: RecordsTable,
        // self_cell requires declaring the dependent's variance over the owner's lifetime.
        #[covariant]
        dependent: RecordsRange,
    }
);
880
881/// Iterator for all content hashes
882///
883/// Note that you might get duplicate hashes. Also, the iterator will keep
884/// a database snapshot open until it is dropped.
885///
886/// Also, this represents a snapshot of the database at the time of creation.
887/// It nees a copy of a redb::ReadOnlyTable to be self-contained.
888#[derive(derive_more::Debug)]
889pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner);
890
891impl ContentHashesIterator {
892    /// Create a new iterator over all content hashes.
893    pub fn all(owner: RecordsTable) -> anyhow::Result<Self> {
894        let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?;
895        Ok(Self(inner))
896    }
897}
898
899impl Iterator for ContentHashesIterator {
900    type Item = Result<Hash>;
901
902    fn next(&mut self) -> Option<Self::Item> {
903        let v = self.0.with_dependent_mut(|_, d| d.next())?;
904        Some(v.map(|e| e.content_hash()))
905    }
906}
907
908/// Iterator over the latest entry per author.
909#[derive(derive_more::Debug)]
910#[debug("LatestIterator")]
911pub struct LatestIterator<'a>(
912    redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
913);
914
915impl<'a> LatestIterator<'a> {
916    fn new(
917        latest_per_author: &'a impl ReadableTable<
918            LatestPerAuthorKey<'static>,
919            LatestPerAuthorValue<'static>,
920        >,
921        namespace: NamespaceId,
922    ) -> anyhow::Result<Self> {
923        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
924        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
925        let range = latest_per_author.range(start..=end)?;
926        Ok(Self(range))
927    }
928}
929
930impl<'a> Iterator for LatestIterator<'a> {
931    type Item = Result<(AuthorId, u64, Vec<u8>)>;
932
933    fn next(&mut self) -> Option<Self::Item> {
934        self.0.next_map(|key, value| {
935            let (_namespace, author) = key;
936            let (timestamp, key) = value;
937            (author.into(), timestamp, key.to_vec())
938        })
939    }
940}
941
942fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
943    let (namespace, author, key) = key;
944    let (timestamp, namespace_sig, author_sig, len, hash) = value;
945    let id = RecordIdentifier::new(namespace, author, key);
946    let record = Record::new(hash.into(), len, timestamp);
947    let entry = Entry::new(id, record);
948    let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
949    SignedEntry::new(entry_signature, entry)
950}
951
#[cfg(test)]
mod tests {
    use super::tables::LATEST_PER_AUTHOR_TABLE;

    use crate::ranger::Store as _;

    use super::*;

    /// An author + key-prefix query must find entries whose keys consist only of `255` bytes.
    #[test]
    fn test_ranges() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let mut replica = store.new_replica(namespace.clone())?;

        // test author prefix relation for all-255 keys
        let key1 = vec![255, 255];
        let key2 = vec![255, 255, 255];
        replica.hash_and_insert(&key1, &author, b"v1")?;
        replica.hash_and_insert(&key2, &author, b"v2")?;
        let res = store
            .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(res.len(), 2);
        assert_eq!(
            res.into_iter()
                .map(|entry| entry.key().to_vec())
                .collect::<Vec<_>>(),
            vec![key1, key2]
        );
        Ok(())
    }

    /// Round-trips authors, replicas and entries: insert, overwrite, query, and remove.
    #[test]
    fn test_basics() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = Store::persistent(dbfile.path())?;

        let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
        assert!(authors.is_empty());

        let author = store.new_author(&mut rand::thread_rng())?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());
        let _replica = store.new_replica(namespace.clone())?;
        store.close_replica(namespace.id());
        let replica = store.load_replica_info(&namespace.id())?;
        assert_eq!(replica.capability.id(), namespace.id());

        let author_back = store.get_author(&author.id())?.unwrap();
        assert_eq!(author.to_bytes(), author_back.to_bytes(),);

        let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
        }

        // all. Collect into `Result` so iteration errors fail the test instead of being
        // counted as entries.
        let all = wrapper.all()?.collect::<Result<Vec<_>>>()?;
        assert_eq!(all.len(), 5);

        // add a second version
        let mut ids = Vec::new();
        for i in 0..5 {
            let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
            let entry = Entry::new(
                id.clone(),
                Record::current_from_data(format!("world-{i}-2")),
            );
            let entry = SignedEntry::from_entry(entry, &namespace, &author);
            wrapper.entry_put(entry)?;
            ids.push(id);
        }

        // get all
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // get all prefix
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::key_prefix("hello-"))?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 5);

        // delete and get
        for id in ids {
            let res = wrapper.get(&id)?;
            assert!(res.is_some());
            let out = wrapper.entry_remove(&id)?.unwrap();
            assert_eq!(out.entry().id(), &id);
            let res = wrapper.get(&id)?;
            assert!(res.is_none());
        }

        // get latest
        let entries = wrapper
            .store
            .get_many(namespace.id(), Query::all())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(entries.len(), 0);

        Ok(())
    }

    /// Copies the database file at `source` into a fresh temp file, applies `modify` inside a
    /// single committed write transaction, and returns the modified copy.
    fn copy_and_modify(
        source: &Path,
        modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
    ) -> Result<tempfile::NamedTempFile> {
        let dbfile = tempfile::NamedTempFile::new()?;
        std::fs::copy(source, dbfile.path())?;
        let db = Database::create(dbfile.path())?;
        let write_tx = db.begin_write()?;
        modify(&write_tx)?;
        write_tx.commit()?;
        drop(db);
        Ok(dbfile)
    }

    /// Deleting the latest-per-author table and reopening the store must rebuild the same
    /// per-author latest entries.
    #[test]
    fn test_migration_001_populate_latest_table() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let namespace = NamespaceSecret::new(&mut rand::thread_rng());

        // create a store and add some data
        let expected = {
            let mut store = Store::persistent(dbfile.path())?;
            let author1 = store.new_author(&mut rand::thread_rng())?;
            let author2 = store.new_author(&mut rand::thread_rng())?;
            let mut replica = store.new_replica(namespace.clone())?;
            replica.hash_and_insert(b"k1", &author1, b"v1")?;
            replica.hash_and_insert(b"k2", &author2, b"v1")?;
            replica.hash_and_insert(b"k3", &author1, b"v1")?;

            let expected = store
                .get_latest_for_each_author(namespace.id())?
                .collect::<Result<Vec<_>>>()?;
            // drop everything to clear file locks.
            store.close_replica(namespace.id());
            // flush the store to disk
            store.flush()?;
            drop(store);
            expected
        };
        assert_eq!(expected.len(), 2);

        // create a copy of our db file with the latest table deleted.
        let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
            tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
            Ok(())
        })?;

        // open the copied db file, which will run the migration.
        let mut store = Store::persistent(dbfile_before_migration.path())?;
        let actual = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;

        assert_eq!(expected, actual);

        Ok(())
    }

    /// A freshly created store must contain the (empty) by-key index table.
    #[test]
    fn test_migration_004_populate_by_key_index() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;

        let mut store = Store::persistent(dbfile.path())?;

        // check that the new table is there, even if empty
        {
            let tables = store.tables()?;
            assert_eq!(tables.records_by_key.len()?, 0);
        }

        // TODO: write test checking that the indexing is done correctly

        Ok(())
    }
}