1use std::{
4 cmp::Ordering,
5 collections::HashSet,
6 iter::{Chain, Flatten},
7 num::NonZeroU64,
8 ops::Bound,
9 path::Path,
10};
11
12use anyhow::{anyhow, Result};
13use ed25519_dalek::{SignatureError, VerifyingKey};
14use iroh_blobs::Hash;
15use rand_core::CryptoRngCore;
16use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable, ReadableTableMetadata};
17use tracing::warn;
18
19use super::{
20 pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
21 Query,
22};
23use crate::{
24 actor::MAX_COMMIT_DELAY,
25 keys::Author,
26 ranger::{Fingerprint, Range, RangeEntry},
27 sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
28 AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
29 ReplicaInfo,
30};
31
32mod bounds;
33mod migrate_v1_v2;
34mod migrations;
35mod query;
36mod ranges;
37pub(crate) mod tables;
38
39pub use self::ranges::RecordsRange;
40use self::{
41 bounds::{ByKeyBounds, RecordsBounds},
42 query::QueryIterator,
43 ranges::RangeExt,
44 tables::{
45 LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
46 RecordsValue, Tables, TransactionAndTables,
47 },
48};
49
50#[derive(Debug)]
52pub struct Store {
53 db: Database,
54 transaction: CurrentTransaction,
55 open_replicas: HashSet<NamespaceId>,
56 pubkeys: MemPublicKeyStore,
57}
58
59impl Drop for Store {
60 fn drop(&mut self) {
61 if let Err(err) = self.flush() {
62 warn!("failed to trigger final flush: {:?}", err);
63 }
64 }
65}
66
67impl AsRef<Store> for Store {
68 fn as_ref(&self) -> &Store {
69 self
70 }
71}
72
73impl AsMut<Store> for Store {
74 fn as_mut(&mut self) -> &mut Store {
75 self
76 }
77}
78
79#[derive(derive_more::Debug, Default)]
80enum CurrentTransaction {
81 #[default]
82 None,
83 Read(ReadOnlyTables),
84 Write(TransactionAndTables),
85}
86
87impl Store {
88 pub fn memory() -> Self {
90 Self::memory_impl().expect("failed to create memory store")
91 }
92
93 fn memory_impl() -> Result<Self> {
94 let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
95 Self::new_impl(db)
96 }
97
98 pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
102 let db = match Database::create(&path) {
103 Ok(db) => db,
104 Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
105 Err(err) => return Err(err.into()),
106 };
107 Self::new_impl(db)
108 }
109
110 fn new_impl(db: redb::Database) -> Result<Self> {
111 let write_tx = db.begin_write()?;
113 let _ = Tables::new(&write_tx)?;
114 write_tx.commit()?;
115
116 migrations::run_migrations(&db)?;
118
119 Ok(Store {
120 db,
121 transaction: Default::default(),
122 open_replicas: Default::default(),
123 pubkeys: Default::default(),
124 })
125 }
126
127 pub fn flush(&mut self) -> Result<()> {
131 if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
132 w.commit()?;
133 }
134 Ok(())
135 }
136
137 pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
142 let guard = &mut self.transaction;
143 let tables = match std::mem::take(guard) {
144 CurrentTransaction::None => {
145 let tx = self.db.begin_read()?;
146 ReadOnlyTables::new(tx)?
147 }
148 CurrentTransaction::Write(w) => {
149 w.commit()?;
150 let tx = self.db.begin_read()?;
151 ReadOnlyTables::new(tx)?
152 }
153 CurrentTransaction::Read(tables) => tables,
154 };
155 *guard = CurrentTransaction::Read(tables);
156 match &*guard {
157 CurrentTransaction::Read(ref tables) => Ok(tables),
158 _ => unreachable!(),
159 }
160 }
161
162 pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
170 self.flush()?;
172 assert!(matches!(self.transaction, CurrentTransaction::None));
173 let tx = self.db.begin_read()?;
174 let tables = ReadOnlyTables::new(tx)?;
175 Ok(tables)
176 }
177
178 fn tables(&mut self) -> Result<&Tables> {
189 let guard = &mut self.transaction;
190 let tables = match std::mem::take(guard) {
191 CurrentTransaction::None => {
192 let tx = self.db.begin_write()?;
193 TransactionAndTables::new(tx)?
194 }
195 CurrentTransaction::Write(w) => {
196 if w.since.elapsed() > MAX_COMMIT_DELAY {
197 tracing::debug!("committing transaction because it's too old");
198 w.commit()?;
199 let tx = self.db.begin_write()?;
200 TransactionAndTables::new(tx)?
201 } else {
202 w
203 }
204 }
205 CurrentTransaction::Read(_) => {
206 let tx = self.db.begin_write()?;
207 TransactionAndTables::new(tx)?
208 }
209 };
210 *guard = CurrentTransaction::Write(tables);
211 match guard {
212 CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
213 _ => unreachable!(),
214 }
215 }
216
217 fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
226 let guard = &mut self.transaction;
227 let tables = match std::mem::take(guard) {
228 CurrentTransaction::None => {
229 let tx = self.db.begin_write()?;
230 TransactionAndTables::new(tx)?
231 }
232 CurrentTransaction::Write(w) => {
233 if w.since.elapsed() > MAX_COMMIT_DELAY {
234 tracing::debug!("committing transaction because it's too old");
235 w.commit()?;
236 let tx = self.db.begin_write()?;
237 TransactionAndTables::new(tx)?
238 } else {
239 w
240 }
241 }
242 CurrentTransaction::Read(_) => {
243 let tx = self.db.begin_write()?;
244 TransactionAndTables::new(tx)?
245 }
246 };
247 *guard = CurrentTransaction::Write(tables);
248 let res = match &mut *guard {
249 CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
250 _ => unreachable!(),
251 };
252 Ok(res)
253 }
254}
255
256type PeersIter = std::vec::IntoIter<PeerIdBytes>;
257
258impl Store {
259 pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica> {
261 let id = namespace.id();
262 self.import_namespace(namespace.into())?;
263 self.open_replica(&id).map_err(Into::into)
264 }
265
266 pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
268 let author = Author::new(rng);
269 self.import_author(author.clone())?;
270 Ok(author)
271 }
272
273 pub fn has_news_for_us(
277 &mut self,
278 namespace: NamespaceId,
279 heads: &AuthorHeads,
280 ) -> Result<Option<NonZeroU64>> {
281 let our_heads = {
282 let latest = self.get_latest_for_each_author(namespace)?;
283 let mut heads = AuthorHeads::default();
284 for e in latest {
285 let (author, timestamp, _key) = e?;
286 heads.insert(author, timestamp);
287 }
288 heads
289 };
290 let has_news_for_us = heads.has_news_for(&our_heads);
291 Ok(has_news_for_us)
292 }
293
294 pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica, OpenError> {
298 let info = self.load_replica_info(namespace_id)?;
299 let instance = StoreInstance::new(*namespace_id, self);
300 Ok(Replica::new(instance, Box::new(info)))
301 }
302
303 pub fn load_replica_info(
305 &mut self,
306 namespace_id: &NamespaceId,
307 ) -> Result<ReplicaInfo, OpenError> {
308 let tables = self.tables()?;
309 let info = match tables.namespaces.get(namespace_id.as_bytes()) {
310 Ok(Some(db_value)) => {
311 let (raw_kind, raw_bytes) = db_value.value();
312 let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
313 ReplicaInfo::new(namespace)
314 }
315 Ok(None) => return Err(OpenError::NotFound),
316 Err(err) => return Err(OpenError::Other(err.into())),
317 };
318 self.open_replicas.insert(info.capability.id());
319 Ok(info)
320 }
321
322 pub fn close_replica(&mut self, id: NamespaceId) {
324 self.open_replicas.remove(&id);
325 }
326
327 pub fn list_namespaces(
329 &mut self,
330 ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
331 let snapshot = self.snapshot()?;
332 let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
333 let iter = iter.map(|res| {
334 let capability = parse_capability(res?.1.value())?;
335 Ok((capability.id(), capability.kind()))
336 });
337 Ok(iter)
338 }
339
340 pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
342 let tables = self.tables()?;
343 let Some(author) = tables.authors.get(author_id.as_bytes())? else {
344 return Ok(None);
345 };
346 let author = Author::from_bytes(author.value());
347 Ok(Some(author))
348 }
349
350 pub fn import_author(&mut self, author: Author) -> Result<()> {
352 self.modify(|tables| {
353 tables
354 .authors
355 .insert(author.id().as_bytes(), &author.to_bytes())?;
356 Ok(())
357 })
358 }
359
360 pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
362 self.modify(|tables| {
363 tables.authors.remove(author.as_bytes())?;
364 Ok(())
365 })
366 }
367
368 pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
370 let tables = self.snapshot()?;
371 let iter = tables
372 .authors
373 .range::<&'static [u8; 32]>(..)?
374 .map(|res| match res {
375 Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
376 Err(err) => Err(err.into()),
377 });
378 Ok(iter)
379 }
380
381 pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
383 self.modify(|tables| {
384 let outcome = {
385 let (capability, outcome) = {
386 let existing = tables.namespaces.get(capability.id().as_bytes())?;
387 if let Some(existing) = existing {
388 let mut existing = parse_capability(existing.value())?;
389 let outcome = if existing.merge(capability)? {
390 ImportNamespaceOutcome::Upgraded
391 } else {
392 ImportNamespaceOutcome::NoChange
393 };
394 (existing, outcome)
395 } else {
396 (capability, ImportNamespaceOutcome::Inserted)
397 }
398 };
399 let id = capability.id().to_bytes();
400 let (kind, bytes) = capability.raw();
401 tables.namespaces.insert(&id, (kind, &bytes))?;
402 outcome
403 };
404 Ok(outcome)
405 })
406 }
407
408 pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
416 if self.open_replicas.contains(namespace) {
417 return Err(anyhow!("replica is not closed"));
418 }
419 self.modify(|tables| {
420 let bounds = RecordsBounds::namespace(*namespace);
421 tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
422 let bounds = ByKeyBounds::namespace(*namespace);
423 let _ = tables
424 .records_by_key
425 .retain_in(bounds.as_ref(), |_k, _v| false);
426 tables.namespaces.remove(namespace.as_bytes())?;
427 tables.namespace_peers.remove_all(namespace.as_bytes())?;
428 tables.download_policy.remove(namespace.as_bytes())?;
429 Ok(())
430 })
431 }
432
433 pub fn get_many(
435 &mut self,
436 namespace: NamespaceId,
437 query: impl Into<Query>,
438 ) -> Result<QueryIterator> {
439 let tables = self.snapshot_owned()?;
440 QueryIterator::new(tables, namespace, query.into())
441 }
442
443 pub fn get_exact(
445 &mut self,
446 namespace: NamespaceId,
447 author: AuthorId,
448 key: impl AsRef<[u8]>,
449 include_empty: bool,
450 ) -> Result<Option<SignedEntry>> {
451 get_exact(
452 &self.tables()?.records,
453 namespace,
454 author,
455 key,
456 include_empty,
457 )
458 }
459
460 pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
462 let tables = self.snapshot_owned()?;
463 ContentHashesIterator::all(&tables.records)
464 }
465
466 pub fn get_latest_for_each_author(&mut self, namespace: NamespaceId) -> Result<LatestIterator> {
468 LatestIterator::new(&self.tables()?.latest_per_author, namespace)
469 }
470
471 pub fn register_useful_peer(
473 &mut self,
474 namespace: NamespaceId,
475 peer: crate::PeerIdBytes,
476 ) -> Result<()> {
477 let peer = &peer;
478 let namespace = namespace.as_bytes();
479 let nanos = std::time::UNIX_EPOCH
481 .elapsed()
482 .map(|duration| duration.as_nanos() as u64)?;
483 self.modify(|tables| {
484 anyhow::ensure!(
486 tables.namespaces.get(namespace)?.is_some(),
487 "document not created"
488 );
489
490 let mut namespace_peers = tables.namespace_peers.get(namespace)?;
491
492 let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
494 let (oldest_nanos, &oldest_peer) = guard.value();
495 (oldest_nanos, oldest_peer)
496 });
497 match maybe_oldest {
498 None => {
499 drop(namespace_peers);
502 tables.namespace_peers.insert(namespace, (nanos, peer))?;
503 }
504 Some((oldest_nanos, oldest_peer)) => {
505 let oldest_peer = &oldest_peer;
506
507 if oldest_peer == peer {
508 drop(namespace_peers);
511 tables
512 .namespace_peers
513 .remove(namespace, (oldest_nanos, oldest_peer))?;
514 tables.namespace_peers.insert(namespace, (nanos, peer))?;
515 } else {
516 let mut len = 1;
518 let mut prev_peer_nanos = None;
520
521 for result in namespace_peers {
522 len += 1;
523 let guard = result?;
524 let (peer_nanos, peer_bytes) = guard.value();
525 if prev_peer_nanos.is_none() && peer_bytes == peer {
526 prev_peer_nanos = Some(peer_nanos)
527 }
528 }
529
530 match prev_peer_nanos {
531 Some(prev_nanos) => {
532 tables
535 .namespace_peers
536 .remove(namespace, (prev_nanos, peer))?;
537 tables.namespace_peers.insert(namespace, (nanos, peer))?;
538 }
539 None => {
540 tables.namespace_peers.insert(namespace, (nanos, peer))?;
543 len += 1;
544 if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
545 tables
546 .namespace_peers
547 .remove(namespace, (oldest_nanos, oldest_peer))?;
548 }
549 }
550 }
551 }
552 }
553 }
554 Ok(())
555 })
556 }
557
558 pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
560 let tables = self.tables()?;
561 let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
562 for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
563 let (_nanos, &peer) = result?.value();
564 peers.push(peer);
565 }
566 if peers.is_empty() {
567 Ok(None)
568 } else {
569 Ok(Some(peers.into_iter()))
570 }
571 }
572
573 pub fn set_download_policy(
575 &mut self,
576 namespace: &NamespaceId,
577 policy: DownloadPolicy,
578 ) -> Result<()> {
579 self.modify(|tables| {
580 let namespace = namespace.as_bytes();
581
582 anyhow::ensure!(
584 tables.namespaces.get(&namespace)?.is_some(),
585 "document not created"
586 );
587
588 let value = postcard::to_stdvec(&policy)?;
589 tables.download_policy.insert(namespace, value.as_slice())?;
590 Ok(())
591 })
592 }
593
594 pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
596 let tables = self.tables()?;
597 let value = tables.download_policy.get(namespace.as_bytes())?;
598 Ok(match value {
599 None => DownloadPolicy::default(),
600 Some(value) => postcard::from_bytes(value.value())?,
601 })
602 }
603}
604
605impl PublicKeyStore for Store {
606 fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
607 self.pubkeys.public_key(id)
608 }
609}
610
611fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
612 Capability::from_raw(raw_kind, raw_bytes)
613}
614
615fn get_exact(
616 record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
617 namespace: NamespaceId,
618 author: AuthorId,
619 key: impl AsRef<[u8]>,
620 include_empty: bool,
621) -> Result<Option<SignedEntry>> {
622 let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
623 let record = record_table.get(id)?;
624 Ok(record
625 .map(|r| into_entry(id, r.value()))
626 .filter(|entry| include_empty || !entry.is_empty()))
627}
628
629#[derive(Debug)]
631pub struct StoreInstance<'a> {
632 namespace: NamespaceId,
633 pub(crate) store: &'a mut Store,
634}
635
636impl<'a> StoreInstance<'a> {
637 pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
638 StoreInstance { namespace, store }
639 }
640}
641
642impl PublicKeyStore for StoreInstance<'_> {
643 fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
644 self.store.public_key(id)
645 }
646}
647
648impl super::DownloadPolicyStore for StoreInstance<'_> {
649 fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
650 self.store.get_download_policy(namespace)
651 }
652}
653
654impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
655 type Error = anyhow::Error;
656 type RangeIterator<'x>
657 = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
658 where
659 'a: 'x;
660 type ParentIterator<'x>
661 = ParentIterator
662 where
663 'a: 'x;
664
665 fn get_first(&mut self) -> Result<RecordIdentifier> {
667 let tables = self.store.as_mut().tables()?;
668 let bounds = RecordsBounds::namespace(self.namespace);
670 let mut records = tables.records.range(bounds.as_ref())?;
671
672 let Some(record) = records.next() else {
673 return Ok(RecordIdentifier::default());
674 };
675 let (compound_key, _value) = record?;
676 let (namespace_id, author_id, key) = compound_key.value();
677 let id = RecordIdentifier::new(namespace_id, author_id, key);
678 Ok(id)
679 }
680
681 fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
682 self.store
683 .as_mut()
684 .get_exact(id.namespace(), id.author(), id.key(), true)
685 }
686
687 fn len(&mut self) -> Result<usize> {
688 let tables = self.store.as_mut().tables()?;
689 let bounds = RecordsBounds::namespace(self.namespace);
690 let records = tables.records.range(bounds.as_ref())?;
691 Ok(records.count())
692 }
693
694 fn is_empty(&mut self) -> Result<bool> {
695 let tables = self.store.as_mut().tables()?;
696 Ok(tables.records.is_empty()?)
697 }
698
699 fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
700 let elements = self.get_range(range.clone())?;
702
703 let mut fp = Fingerprint::empty();
704 for el in elements {
705 let el = el?;
706 fp ^= el.as_fingerprint();
707 }
708
709 Ok(fp)
710 }
711
712 fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
713 let id = e.id();
714 self.store.as_mut().modify(|tables| {
715 let key = (
717 &id.namespace().to_bytes(),
718 &id.author().to_bytes(),
719 id.key(),
720 );
721 let hash = e.content_hash(); let value = (
723 e.timestamp(),
724 &e.signature().namespace().to_bytes(),
725 &e.signature().author().to_bytes(),
726 e.content_len(),
727 hash.as_bytes(),
728 );
729 tables.records.insert(key, value)?;
730
731 let key = (
733 &id.namespace().to_bytes(),
734 id.key(),
735 &id.author().to_bytes(),
736 );
737 tables.records_by_key.insert(key, ())?;
738
739 let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
741 let value = (e.timestamp(), e.id().key());
742 tables.latest_per_author.insert(key, value)?;
743 Ok(())
744 })
745 }
746
747 fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
748 let tables = self.store.as_mut().tables()?;
749 let iter = match range.x().cmp(range.y()) {
750 Ordering::Equal => {
752 let bounds = RecordsBounds::namespace(self.namespace);
754 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
755 chain_none(iter)
756 }
757 Ordering::Less => {
759 let start = Bound::Included(range.x().to_byte_tuple());
761 let end = Bound::Excluded(range.y().to_byte_tuple());
762 let bounds = RecordsBounds::new(start, end);
763 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
764 chain_none(iter)
765 }
766 Ordering::Greater => {
768 let end = Bound::Excluded(range.y().to_byte_tuple());
770 let bounds = RecordsBounds::from_start(&self.namespace, end);
771 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
772
773 let start = Bound::Included(range.x().to_byte_tuple());
775 let bounds = RecordsBounds::to_end(&self.namespace, start);
776 let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
777
778 iter.chain(Some(iter2).into_iter().flatten())
779 }
780 };
781 Ok(iter)
782 }
783
784 fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
785 self.store.as_mut().modify(|tables| {
786 let entry = {
787 let (namespace, author, key) = id.as_byte_tuple();
788 let id = (namespace, key, author);
789 tables.records_by_key.remove(id)?;
790 let id = (namespace, author, key);
791 let value = tables.records.remove(id)?;
792 value.map(|value| into_entry(id, value.value()))
793 };
794 Ok(entry)
795 })
796 }
797
798 fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
799 let tables = self.store.as_mut().tables()?;
800 let bounds = RecordsBounds::namespace(self.namespace);
801 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
802 Ok(chain_none(iter))
803 }
804
805 fn prefixes_of(
806 &mut self,
807 id: &RecordIdentifier,
808 ) -> Result<Self::ParentIterator<'_>, Self::Error> {
809 let tables = self.store.as_mut().tables()?;
810 ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
811 }
812
813 fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
814 let tables = self.store.as_mut().tables()?;
815 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
816 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
817 Ok(chain_none(iter))
818 }
819
820 fn remove_prefix_filtered(
821 &mut self,
822 id: &RecordIdentifier,
823 predicate: impl Fn(&Record) -> bool,
824 ) -> Result<usize> {
825 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
826 self.store.as_mut().modify(|tables| {
827 let cb = |_k: RecordsId, v: RecordsValue| {
828 let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
829 let record = Record::new(hash.into(), len, timestamp);
830
831 predicate(&record)
832 };
833 let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
834 let count = iter.count();
835 Ok(count)
836 })
837 }
838}
839
840fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
841 iter: I,
842) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
843 iter.chain(None.into_iter().flatten())
844}
845
846#[derive(Debug)]
849pub struct ParentIterator {
850 inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
851}
852
853impl ParentIterator {
854 fn new(
855 tables: &Tables,
856 namespace: NamespaceId,
857 author: AuthorId,
858 key: Vec<u8>,
859 ) -> anyhow::Result<Self> {
860 let parents = parents(&tables.records, namespace, author, key.clone());
861 Ok(Self {
862 inner: parents.into_iter(),
863 })
864 }
865}
866
867fn parents(
868 table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
869 namespace: NamespaceId,
870 author: AuthorId,
871 mut key: Vec<u8>,
872) -> Vec<anyhow::Result<SignedEntry>> {
873 let mut res = Vec::new();
874
875 while !key.is_empty() {
876 let entry = get_exact(table, namespace, author, &key, false);
877 key.pop();
878 match entry {
879 Err(err) => res.push(Err(err)),
880 Ok(Some(entry)) => res.push(Ok(entry)),
881 Ok(None) => continue,
882 }
883 }
884 res.reverse();
885 res
886}
887
888impl Iterator for ParentIterator {
889 type Item = Result<SignedEntry>;
890
891 fn next(&mut self) -> Option<Self::Item> {
892 self.inner.next()
893 }
894}
895
896#[derive(derive_more::Debug)]
904pub struct ContentHashesIterator {
905 #[debug(skip)]
906 range: RecordsRange<'static>,
907}
908
909impl ContentHashesIterator {
910 pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
912 let range = RecordsRange::all_static(table)?;
913 Ok(Self { range })
914 }
915}
916
917impl Iterator for ContentHashesIterator {
918 type Item = Result<Hash>;
919
920 fn next(&mut self) -> Option<Self::Item> {
921 let v = self.range.next()?;
922 Some(v.map(|e| e.content_hash()))
923 }
924}
925
926#[derive(derive_more::Debug)]
928#[debug("LatestIterator")]
929pub struct LatestIterator<'a>(
930 redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
931);
932
933impl<'a> LatestIterator<'a> {
934 fn new(
935 latest_per_author: &'a impl ReadableTable<
936 LatestPerAuthorKey<'static>,
937 LatestPerAuthorValue<'static>,
938 >,
939 namespace: NamespaceId,
940 ) -> anyhow::Result<Self> {
941 let start = (namespace.as_bytes(), &[u8::MIN; 32]);
942 let end = (namespace.as_bytes(), &[u8::MAX; 32]);
943 let range = latest_per_author.range(start..=end)?;
944 Ok(Self(range))
945 }
946}
947
948impl Iterator for LatestIterator<'_> {
949 type Item = Result<(AuthorId, u64, Vec<u8>)>;
950
951 fn next(&mut self) -> Option<Self::Item> {
952 self.0.next_map(|key, value| {
953 let (_namespace, author) = key;
954 let (timestamp, key) = value;
955 (author.into(), timestamp, key.to_vec())
956 })
957 }
958}
959
960fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
961 let (namespace, author, key) = key;
962 let (timestamp, namespace_sig, author_sig, len, hash) = value;
963 let id = RecordIdentifier::new(namespace, author, key);
964 let record = Record::new(hash.into(), len, timestamp);
965 let entry = Entry::new(id, record);
966 let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
967 SignedEntry::new(entry_signature, entry)
968}
969
970#[cfg(test)]
971mod tests {
972 use super::{tables::LATEST_PER_AUTHOR_TABLE, *};
973 use crate::ranger::Store as _;
974
975 #[test]
976 fn test_ranges() -> Result<()> {
977 let dbfile = tempfile::NamedTempFile::new()?;
978 let mut store = Store::persistent(dbfile.path())?;
979
980 let author = store.new_author(&mut rand::thread_rng())?;
981 let namespace = NamespaceSecret::new(&mut rand::thread_rng());
982 let mut replica = store.new_replica(namespace.clone())?;
983
984 let key1 = vec![255, 255];
986 let key2 = vec![255, 255, 255];
987 replica.hash_and_insert(&key1, &author, b"v1")?;
988 replica.hash_and_insert(&key2, &author, b"v2")?;
989 let res = store
990 .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
991 .collect::<Result<Vec<_>>>()?;
992 assert_eq!(res.len(), 2);
993 assert_eq!(
994 res.into_iter()
995 .map(|entry| entry.key().to_vec())
996 .collect::<Vec<_>>(),
997 vec![key1, key2]
998 );
999 Ok(())
1000 }
1001
1002 #[test]
1003 fn test_basics() -> Result<()> {
1004 let dbfile = tempfile::NamedTempFile::new()?;
1005 let mut store = Store::persistent(dbfile.path())?;
1006
1007 let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
1008 assert!(authors.is_empty());
1009
1010 let author = store.new_author(&mut rand::thread_rng())?;
1011 let namespace = NamespaceSecret::new(&mut rand::thread_rng());
1012 let _replica = store.new_replica(namespace.clone())?;
1013 store.close_replica(namespace.id());
1014 let replica = store.load_replica_info(&namespace.id())?;
1015 assert_eq!(replica.capability.id(), namespace.id());
1016
1017 let author_back = store.get_author(&author.id())?.unwrap();
1018 assert_eq!(author.to_bytes(), author_back.to_bytes(),);
1019
1020 let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
1021 for i in 0..5 {
1022 let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1023 let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
1024 let entry = SignedEntry::from_entry(entry, &namespace, &author);
1025 wrapper.entry_put(entry)?;
1026 }
1027
1028 let all: Vec<_> = wrapper.all()?.collect();
1030 assert_eq!(all.len(), 5);
1031
1032 let mut ids = Vec::new();
1034 for i in 0..5 {
1035 let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1036 let entry = Entry::new(
1037 id.clone(),
1038 Record::current_from_data(format!("world-{i}-2")),
1039 );
1040 let entry = SignedEntry::from_entry(entry, &namespace, &author);
1041 wrapper.entry_put(entry)?;
1042 ids.push(id);
1043 }
1044
1045 let entries = wrapper
1047 .store
1048 .get_many(namespace.id(), Query::all())?
1049 .collect::<Result<Vec<_>>>()?;
1050 assert_eq!(entries.len(), 5);
1051
1052 let entries = wrapper
1054 .store
1055 .get_many(namespace.id(), Query::key_prefix("hello-"))?
1056 .collect::<Result<Vec<_>>>()?;
1057 assert_eq!(entries.len(), 5);
1058
1059 for id in ids {
1061 let res = wrapper.get(&id)?;
1062 assert!(res.is_some());
1063 let out = wrapper.entry_remove(&id)?.unwrap();
1064 assert_eq!(out.entry().id(), &id);
1065 let res = wrapper.get(&id)?;
1066 assert!(res.is_none());
1067 }
1068
1069 let entries = wrapper
1071 .store
1072 .get_many(namespace.id(), Query::all())?
1073 .collect::<Result<Vec<_>>>()?;
1074 assert_eq!(entries.len(), 0);
1075
1076 Ok(())
1077 }
1078
1079 fn copy_and_modify(
1080 source: &Path,
1081 modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
1082 ) -> Result<tempfile::NamedTempFile> {
1083 let dbfile = tempfile::NamedTempFile::new()?;
1084 std::fs::copy(source, dbfile.path())?;
1085 let db = Database::create(dbfile.path())?;
1086 let write_tx = db.begin_write()?;
1087 modify(&write_tx)?;
1088 write_tx.commit()?;
1089 drop(db);
1090 Ok(dbfile)
1091 }
1092
1093 #[test]
1094 fn test_migration_001_populate_latest_table() -> Result<()> {
1095 let dbfile = tempfile::NamedTempFile::new()?;
1096 let namespace = NamespaceSecret::new(&mut rand::thread_rng());
1097
1098 let expected = {
1100 let mut store = Store::persistent(dbfile.path())?;
1101 let author1 = store.new_author(&mut rand::thread_rng())?;
1102 let author2 = store.new_author(&mut rand::thread_rng())?;
1103 let mut replica = store.new_replica(namespace.clone())?;
1104 replica.hash_and_insert(b"k1", &author1, b"v1")?;
1105 replica.hash_and_insert(b"k2", &author2, b"v1")?;
1106 replica.hash_and_insert(b"k3", &author1, b"v1")?;
1107
1108 let expected = store
1109 .get_latest_for_each_author(namespace.id())?
1110 .collect::<Result<Vec<_>>>()?;
1111 store.close_replica(namespace.id());
1113 store.flush()?;
1115 drop(store);
1116 expected
1117 };
1118 assert_eq!(expected.len(), 2);
1119
1120 let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
1122 tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
1123 Ok(())
1124 })?;
1125
1126 let mut store = Store::persistent(dbfile_before_migration.path())?;
1128 let actual = store
1129 .get_latest_for_each_author(namespace.id())?
1130 .collect::<Result<Vec<_>>>()?;
1131
1132 assert_eq!(expected, actual);
1133 Ok(())
1134 }
1135
1136 #[test]
1137 fn test_migration_004_populate_by_key_index() -> Result<()> {
1138 let dbfile = tempfile::NamedTempFile::new()?;
1139
1140 let mut store = Store::persistent(dbfile.path())?;
1141
1142 {
1144 let tables = store.tables()?;
1145 assert_eq!(tables.records_by_key.len()?, 0);
1146 }
1147
1148 Ok(())
1150 }
1151}