1use std::{
4 cmp::Ordering,
5 collections::HashSet,
6 iter::{Chain, Flatten},
7 num::NonZeroU64,
8 ops::Bound,
9};
10
11use anyhow::{anyhow, Result};
12use iroh::{KeyParsingError, PublicKey};
13use iroh_blobs::Hash;
14use n0_future::time::SystemTime;
15use rand::CryptoRng;
16use redb::{Database, ReadableDatabase, ReadableMultimapTable, ReadableTable};
17#[cfg(all(feature = "fs-store", feature = "redb-v2-migration"))]
18use tracing::info;
19use tracing::warn;
20
21use super::{
22 pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
23 Query,
24};
25use crate::{
26 actor::MAX_COMMIT_DELAY,
27 keys::Author,
28 ranger::{Fingerprint, Range, RangeEntry},
29 sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
30 AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
31 ReplicaInfo,
32};
33
34mod bounds;
35#[cfg(all(feature = "fs-store", feature = "redb-v2-migration"))]
36mod migrate_redb_v2_tuples;
37mod migrations;
38mod query;
39mod ranges;
40pub(crate) mod tables;
41
42pub use self::ranges::RecordsRange;
43use self::{
44 bounds::{ByKeyBounds, RecordsBounds},
45 query::QueryIterator,
46 ranges::RangeExt,
47 tables::{
48 LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsTable,
49 RecordsValue, Tables, TransactionAndTables,
50 },
51};
52
53#[derive(Debug)]
55pub struct Store {
56 db: Database,
57 transaction: CurrentTransaction,
58 open_replicas: HashSet<NamespaceId>,
59 pubkeys: MemPublicKeyStore,
60}
61
62impl Drop for Store {
63 fn drop(&mut self) {
64 if let Err(err) = self.flush() {
65 warn!("failed to trigger final flush: {:?}", err);
66 }
67 }
68}
69
70impl AsRef<Store> for Store {
71 fn as_ref(&self) -> &Store {
72 self
73 }
74}
75
76impl AsMut<Store> for Store {
77 fn as_mut(&mut self) -> &mut Store {
78 self
79 }
80}
81
82#[derive(derive_more::Debug, Default)]
83#[allow(clippy::large_enum_variant)]
84enum CurrentTransaction {
85 #[default]
86 None,
87 Read(ReadOnlyTables),
88 Write(TransactionAndTables),
89}
90
91#[cfg(feature = "fs-store")]
92fn open_database(path: &std::path::Path) -> Result<Database> {
93 match Database::create(path) {
94 Ok(db) => Ok(db),
95 Err(redb::DatabaseError::UpgradeRequired(v)) => Err(anyhow!(
96 "Opening the database failed: Upgrading from redb {v} no longer supported. Use an older redb version first."
97 )),
98 Err(err) => Err(err.into()),
99 }
100}
101
102#[cfg(feature = "fs-store")]
103fn is_redb_v2_tuple_mismatch(err: &anyhow::Error) -> bool {
104 err.chain().any(|e| {
105 matches!(
106 e.downcast_ref::<redb::TableError>(),
107 Some(redb::TableError::TableTypeMismatch { .. })
108 )
109 })
110}
111
112impl Store {
113 pub fn memory() -> Self {
115 Self::memory_impl().expect("failed to create memory store")
116 }
117
118 fn memory_impl() -> Result<Self> {
119 let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
120 Self::new_impl(db)
121 }
122
123 #[cfg(feature = "fs-store")]
127 pub fn persistent(path: impl AsRef<std::path::Path>) -> Result<Self> {
128 let path = path.as_ref();
129 let db = open_database(path)?;
130 match Self::new_impl(db) {
131 Ok(store) => Ok(store),
132 Err(err) if is_redb_v2_tuple_mismatch(&err) => {
133 #[cfg(feature = "redb-v2-migration")]
134 {
135 info!("redb 2.x tuple format detected, running migration");
136 migrate_redb_v2_tuples::run(path)?;
137 Self::new_impl(open_database(path)?)
138 }
139 #[cfg(not(feature = "redb-v2-migration"))]
140 {
141 let _ = err;
142 Err(anyhow!(
143 "Opening the database failed: this store was written by iroh-docs 0.94..=0.98 (redb 2.x) and needs migration. Enable the `redb-v2-migration` feature on iroh-docs and re-open to migrate."
144 ))
145 }
146 }
147 Err(err) => Err(err),
148 }
149 }
150
151 fn new_impl(db: redb::Database) -> Result<Self> {
152 let write_tx = db.begin_write()?;
154 let _ = Tables::new(&write_tx)?;
155 write_tx.commit()?;
156
157 migrations::run_migrations(&db)?;
159
160 Ok(Store {
161 db,
162 transaction: Default::default(),
163 open_replicas: Default::default(),
164 pubkeys: Default::default(),
165 })
166 }
167
168 pub fn flush(&mut self) -> Result<()> {
172 if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
173 w.commit()?;
174 }
175 Ok(())
176 }
177
178 pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
183 let guard = &mut self.transaction;
184 let tables = match std::mem::take(guard) {
185 CurrentTransaction::None => {
186 let tx = self.db.begin_read()?;
187 ReadOnlyTables::new(tx)?
188 }
189 CurrentTransaction::Write(w) => {
190 w.commit()?;
191 let tx = self.db.begin_read()?;
192 ReadOnlyTables::new(tx)?
193 }
194 CurrentTransaction::Read(tables) => tables,
195 };
196 *guard = CurrentTransaction::Read(tables);
197 match &*guard {
198 CurrentTransaction::Read(ref tables) => Ok(tables),
199 _ => unreachable!(),
200 }
201 }
202
203 pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
211 self.flush()?;
213 assert!(matches!(self.transaction, CurrentTransaction::None));
214 let tx = self.db.begin_read()?;
215 let tables = ReadOnlyTables::new(tx)?;
216 Ok(tables)
217 }
218
219 fn tables(&mut self) -> Result<&Tables<'_>> {
230 let guard = &mut self.transaction;
231 let tables = match std::mem::take(guard) {
232 CurrentTransaction::None => {
233 let tx = self.db.begin_write()?;
234 TransactionAndTables::new(tx)?
235 }
236 CurrentTransaction::Write(w) => {
237 if w.since.elapsed() > MAX_COMMIT_DELAY {
238 tracing::debug!("committing transaction because it's too old");
239 w.commit()?;
240 let tx = self.db.begin_write()?;
241 TransactionAndTables::new(tx)?
242 } else {
243 w
244 }
245 }
246 CurrentTransaction::Read(_) => {
247 let tx = self.db.begin_write()?;
248 TransactionAndTables::new(tx)?
249 }
250 };
251 *guard = CurrentTransaction::Write(tables);
252 match guard {
253 CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
254 _ => unreachable!(),
255 }
256 }
257
258 fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
267 let guard = &mut self.transaction;
268 let tables = match std::mem::take(guard) {
269 CurrentTransaction::None => {
270 let tx = self.db.begin_write()?;
271 TransactionAndTables::new(tx)?
272 }
273 CurrentTransaction::Write(w) => {
274 if w.since.elapsed() > MAX_COMMIT_DELAY {
275 tracing::debug!("committing transaction because it's too old");
276 w.commit()?;
277 let tx = self.db.begin_write()?;
278 TransactionAndTables::new(tx)?
279 } else {
280 w
281 }
282 }
283 CurrentTransaction::Read(_) => {
284 let tx = self.db.begin_write()?;
285 TransactionAndTables::new(tx)?
286 }
287 };
288 *guard = CurrentTransaction::Write(tables);
289 let res = match &mut *guard {
290 CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
291 _ => unreachable!(),
292 };
293 Ok(res)
294 }
295}
296
297type PeersIter = std::vec::IntoIter<PeerIdBytes>;
298
299impl Store {
300 pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
302 let id = namespace.id();
303 self.import_namespace(namespace.into())?;
304 self.open_replica(&id).map_err(Into::into)
305 }
306
307 pub fn new_author<R: CryptoRng>(&mut self, rng: &mut R) -> Result<Author> {
309 let author = Author::new(rng);
310 self.import_author(author.clone())?;
311 Ok(author)
312 }
313
314 pub fn has_news_for_us(
318 &mut self,
319 namespace: NamespaceId,
320 heads: &AuthorHeads,
321 ) -> Result<Option<NonZeroU64>> {
322 let our_heads = {
323 let latest = self.get_latest_for_each_author(namespace)?;
324 let mut heads = AuthorHeads::default();
325 for e in latest {
326 let (author, timestamp, _key) = e?;
327 heads.insert(author, timestamp);
328 }
329 heads
330 };
331 let has_news_for_us = heads.has_news_for(&our_heads);
332 Ok(has_news_for_us)
333 }
334
335 pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
339 let info = self.load_replica_info(namespace_id)?;
340 let instance = StoreInstance::new(*namespace_id, self);
341 Ok(Replica::new(instance, Box::new(info)))
342 }
343
344 pub fn load_replica_info(
346 &mut self,
347 namespace_id: &NamespaceId,
348 ) -> Result<ReplicaInfo, OpenError> {
349 let tables = self.tables()?;
350 let info = match tables.namespaces.get(namespace_id.as_bytes()) {
351 Ok(Some(db_value)) => {
352 let (raw_kind, raw_bytes) = db_value.value();
353 let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
354 ReplicaInfo::new(namespace)
355 }
356 Ok(None) => return Err(OpenError::NotFound),
357 Err(err) => return Err(OpenError::Other(err.into())),
358 };
359 self.open_replicas.insert(info.capability.id());
360 Ok(info)
361 }
362
363 pub fn close_replica(&mut self, id: NamespaceId) {
365 self.open_replicas.remove(&id);
366 }
367
368 pub fn list_namespaces(
370 &mut self,
371 ) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
372 let snapshot = self.snapshot()?;
373 let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
374 let iter = iter.map(|res| {
375 let capability = parse_capability(res?.1.value())?;
376 Ok((capability.id(), capability.kind()))
377 });
378 Ok(iter)
379 }
380
381 pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
383 let tables = self.tables()?;
384 let Some(author) = tables.authors.get(author_id.as_bytes())? else {
385 return Ok(None);
386 };
387 let author = Author::from_bytes(author.value());
388 Ok(Some(author))
389 }
390
391 pub fn import_author(&mut self, author: Author) -> Result<()> {
393 self.modify(|tables| {
394 tables
395 .authors
396 .insert(author.id().as_bytes(), &author.to_bytes())?;
397 Ok(())
398 })
399 }
400
401 pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
403 self.modify(|tables| {
404 tables.authors.remove(author.as_bytes())?;
405 Ok(())
406 })
407 }
408
409 pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
411 let tables = self.snapshot()?;
412 let iter = tables
413 .authors
414 .range::<&'static [u8; 32]>(..)?
415 .map(|res| match res {
416 Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
417 Err(err) => Err(err.into()),
418 });
419 Ok(iter)
420 }
421
422 pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
424 self.modify(|tables| {
425 let outcome = {
426 let (capability, outcome) = {
427 let existing = tables.namespaces.get(capability.id().as_bytes())?;
428 if let Some(existing) = existing {
429 let mut existing = parse_capability(existing.value())?;
430 let outcome = if existing.merge(capability)? {
431 ImportNamespaceOutcome::Upgraded
432 } else {
433 ImportNamespaceOutcome::NoChange
434 };
435 (existing, outcome)
436 } else {
437 (capability, ImportNamespaceOutcome::Inserted)
438 }
439 };
440 let id = capability.id().to_bytes();
441 let (kind, bytes) = capability.raw();
442 tables.namespaces.insert(&id, (kind, &bytes))?;
443 outcome
444 };
445 Ok(outcome)
446 })
447 }
448
449 pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
457 if self.open_replicas.contains(namespace) {
458 return Err(anyhow!("replica is not closed"));
459 }
460 self.modify(|tables| {
461 let bounds = RecordsBounds::namespace(*namespace);
462 tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
463 let bounds = ByKeyBounds::namespace(*namespace);
464 let _ = tables
465 .records_by_key
466 .retain_in(bounds.as_ref(), |_k, _v| false);
467 tables.namespaces.remove(namespace.as_bytes())?;
468 tables.namespace_peers.remove_all(namespace.as_bytes())?;
469 tables.download_policy.remove(namespace.as_bytes())?;
470 Ok(())
471 })
472 }
473
474 pub fn get_many(
476 &mut self,
477 namespace: NamespaceId,
478 query: impl Into<Query>,
479 ) -> Result<QueryIterator> {
480 let tables = self.snapshot_owned()?;
481 QueryIterator::new(tables, namespace, query.into())
482 }
483
484 pub fn get_exact(
486 &mut self,
487 namespace: NamespaceId,
488 author: AuthorId,
489 key: impl AsRef<[u8]>,
490 include_empty: bool,
491 ) -> Result<Option<SignedEntry>> {
492 get_exact(
493 &self.tables()?.records,
494 namespace,
495 author,
496 key,
497 include_empty,
498 )
499 }
500
501 pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
503 let tables = self.snapshot_owned()?;
504 ContentHashesIterator::all(&tables.records)
505 }
506
507 pub fn get_latest_for_each_author(
509 &mut self,
510 namespace: NamespaceId,
511 ) -> Result<LatestIterator<'_>> {
512 LatestIterator::new(&self.tables()?.latest_per_author, namespace)
513 }
514
515 pub fn register_useful_peer(
517 &mut self,
518 namespace: NamespaceId,
519 peer: crate::PeerIdBytes,
520 ) -> Result<()> {
521 let peer = &peer;
522 let namespace = namespace.as_bytes();
523 let nanos = SystemTime::UNIX_EPOCH
525 .elapsed()
526 .map(|duration| duration.as_nanos() as u64)?;
527 self.modify(|tables| {
528 anyhow::ensure!(
530 tables.namespaces.get(namespace)?.is_some(),
531 "document not created"
532 );
533
534 let mut namespace_peers = tables.namespace_peers.get(namespace)?;
535
536 let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
538 let (oldest_nanos, &oldest_peer) = guard.value();
539 (oldest_nanos, oldest_peer)
540 });
541 match maybe_oldest {
542 None => {
543 drop(namespace_peers);
546 tables.namespace_peers.insert(namespace, (nanos, peer))?;
547 }
548 Some((oldest_nanos, oldest_peer)) => {
549 let oldest_peer = &oldest_peer;
550
551 if oldest_peer == peer {
552 drop(namespace_peers);
555 tables
556 .namespace_peers
557 .remove(namespace, (oldest_nanos, oldest_peer))?;
558 tables.namespace_peers.insert(namespace, (nanos, peer))?;
559 } else {
560 let mut len = 1;
562 let mut prev_peer_nanos = None;
564
565 for result in namespace_peers {
566 len += 1;
567 let guard = result?;
568 let (peer_nanos, peer_bytes) = guard.value();
569 if prev_peer_nanos.is_none() && peer_bytes == peer {
570 prev_peer_nanos = Some(peer_nanos)
571 }
572 }
573
574 match prev_peer_nanos {
575 Some(prev_nanos) => {
576 tables
579 .namespace_peers
580 .remove(namespace, (prev_nanos, peer))?;
581 tables.namespace_peers.insert(namespace, (nanos, peer))?;
582 }
583 None => {
584 tables.namespace_peers.insert(namespace, (nanos, peer))?;
587 len += 1;
588 if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
589 tables
590 .namespace_peers
591 .remove(namespace, (oldest_nanos, oldest_peer))?;
592 }
593 }
594 }
595 }
596 }
597 }
598 Ok(())
599 })
600 }
601
602 pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
604 let tables = self.tables()?;
605 let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
606 for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
607 let (_nanos, &peer) = result?.value();
608 peers.push(peer);
609 }
610 if peers.is_empty() {
611 Ok(None)
612 } else {
613 Ok(Some(peers.into_iter()))
614 }
615 }
616
617 pub fn set_download_policy(
619 &mut self,
620 namespace: &NamespaceId,
621 policy: DownloadPolicy,
622 ) -> Result<()> {
623 self.modify(|tables| {
624 let namespace = namespace.as_bytes();
625
626 anyhow::ensure!(
628 tables.namespaces.get(&namespace)?.is_some(),
629 "document not created"
630 );
631
632 let value = postcard::to_stdvec(&policy)?;
633 tables.download_policy.insert(namespace, value.as_slice())?;
634 Ok(())
635 })
636 }
637
638 pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
640 let tables = self.tables()?;
641 let value = tables.download_policy.get(namespace.as_bytes())?;
642 Ok(match value {
643 None => DownloadPolicy::default(),
644 Some(value) => postcard::from_bytes(value.value())?,
645 })
646 }
647}
648
649impl PublicKeyStore for Store {
650 fn public_key(&self, id: &[u8; 32]) -> Result<PublicKey, KeyParsingError> {
651 self.pubkeys.public_key(id)
652 }
653}
654
655fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
656 Capability::from_raw(raw_kind, raw_bytes)
657}
658
659fn get_exact(
660 record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
661 namespace: NamespaceId,
662 author: AuthorId,
663 key: impl AsRef<[u8]>,
664 include_empty: bool,
665) -> Result<Option<SignedEntry>> {
666 let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
667 let record = record_table.get(id)?;
668 Ok(record
669 .map(|r| into_entry(id, r.value()))
670 .filter(|entry| include_empty || !entry.is_empty()))
671}
672
673#[derive(Debug)]
675pub struct StoreInstance<'a> {
676 namespace: NamespaceId,
677 pub(crate) store: &'a mut Store,
678}
679
680impl<'a> StoreInstance<'a> {
681 pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
682 StoreInstance { namespace, store }
683 }
684}
685
686impl PublicKeyStore for StoreInstance<'_> {
687 fn public_key(&self, id: &[u8; 32]) -> std::result::Result<PublicKey, KeyParsingError> {
688 self.store.public_key(id)
689 }
690}
691
692impl super::DownloadPolicyStore for StoreInstance<'_> {
693 fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
694 self.store.get_download_policy(namespace)
695 }
696}
697
698impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
699 type Error = anyhow::Error;
700 type RangeIterator<'x>
701 = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
702 where
703 'a: 'x;
704 type ParentIterator<'x>
705 = ParentIterator
706 where
707 'a: 'x;
708
709 fn get_first(&mut self) -> Result<RecordIdentifier> {
711 let tables = self.store.as_mut().tables()?;
712 let bounds = RecordsBounds::namespace(self.namespace);
714 let mut records = tables.records.range(bounds.as_ref())?;
715
716 let Some(record) = records.next() else {
717 return Ok(RecordIdentifier::default());
718 };
719 let (compound_key, _value) = record?;
720 let (namespace_id, author_id, key) = compound_key.value();
721 let id = RecordIdentifier::new(namespace_id, author_id, key);
722 Ok(id)
723 }
724
725 #[cfg(test)]
726 fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
727 self.store
728 .as_mut()
729 .get_exact(id.namespace(), id.author(), id.key(), true)
730 }
731
732 #[cfg(test)]
733 fn len(&mut self) -> Result<usize> {
734 let tables = self.store.as_mut().tables()?;
735 let bounds = RecordsBounds::namespace(self.namespace);
736 let records = tables.records.range(bounds.as_ref())?;
737 Ok(records.count())
738 }
739
740 #[cfg(test)]
741 fn is_empty(&mut self) -> Result<bool> {
742 use redb::ReadableTableMetadata;
743 let tables = self.store.as_mut().tables()?;
744 Ok(tables.records.is_empty()?)
745 }
746
747 fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
748 let elements = self.get_range(range.clone())?;
750
751 let mut fp = Fingerprint::empty();
752 for el in elements {
753 let el = el?;
754 fp ^= el.as_fingerprint();
755 }
756
757 Ok(fp)
758 }
759
760 fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
761 let id = e.id();
762 self.store.as_mut().modify(|tables| {
763 let key = (
765 &id.namespace().to_bytes(),
766 &id.author().to_bytes(),
767 id.key(),
768 );
769 let hash = e.content_hash(); let value = (
771 e.timestamp(),
772 &e.signature().namespace().to_bytes(),
773 &e.signature().author().to_bytes(),
774 e.content_len(),
775 hash.as_bytes(),
776 );
777 tables.records.insert(key, value)?;
778
779 let key = (
781 &id.namespace().to_bytes(),
782 id.key(),
783 &id.author().to_bytes(),
784 );
785 tables.records_by_key.insert(key, ())?;
786
787 let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
789 let value = (e.timestamp(), e.id().key());
790 tables.latest_per_author.insert(key, value)?;
791 Ok(())
792 })
793 }
794
795 fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
796 let tables = self.store.as_mut().tables()?;
797 let iter = match range.x().cmp(range.y()) {
798 Ordering::Equal => {
800 let bounds = RecordsBounds::namespace(self.namespace);
802 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
803 chain_none(iter)
804 }
805 Ordering::Less => {
807 let start = Bound::Included(range.x().to_byte_tuple());
809 let end = Bound::Excluded(range.y().to_byte_tuple());
810 let bounds = RecordsBounds::new(start, end);
811 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
812 chain_none(iter)
813 }
814 Ordering::Greater => {
816 let end = Bound::Excluded(range.y().to_byte_tuple());
818 let bounds = RecordsBounds::from_start(&self.namespace, end);
819 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
820
821 let start = Bound::Included(range.x().to_byte_tuple());
823 let bounds = RecordsBounds::to_end(&self.namespace, start);
824 let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
825
826 iter.chain(Some(iter2).into_iter().flatten())
827 }
828 };
829 Ok(iter)
830 }
831
832 #[cfg(test)]
833 fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
834 self.store.as_mut().modify(|tables| {
835 let entry = {
836 let (namespace, author, key) = id.as_byte_tuple();
837 let id = (namespace, key, author);
838 tables.records_by_key.remove(id)?;
839 let id = (namespace, author, key);
840 let value = tables.records.remove(id)?;
841 value.map(|value| into_entry(id, value.value()))
842 };
843 Ok(entry)
844 })
845 }
846
847 #[cfg(test)]
848 fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
849 let tables = self.store.as_mut().tables()?;
850 let bounds = RecordsBounds::namespace(self.namespace);
851 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
852 Ok(chain_none(iter))
853 }
854
855 fn prefixes_of(
856 &mut self,
857 id: &RecordIdentifier,
858 ) -> Result<Self::ParentIterator<'_>, Self::Error> {
859 let tables = self.store.as_mut().tables()?;
860 ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
861 }
862
863 #[cfg(test)]
864 fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
865 let tables = self.store.as_mut().tables()?;
866 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
867 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
868 Ok(chain_none(iter))
869 }
870
871 fn remove_prefix_filtered(
872 &mut self,
873 id: &RecordIdentifier,
874 predicate: impl Fn(&Record) -> bool,
875 ) -> Result<usize> {
876 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
877 self.store.as_mut().modify(|tables| {
878 let cb = |_k: RecordsId, v: RecordsValue| {
879 let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
880 let record = Record::new(hash.into(), len, timestamp);
881
882 predicate(&record)
883 };
884 let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
885 let count = iter.count();
886 Ok(count)
887 })
888 }
889}
890
891fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
892 iter: I,
893) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
894 iter.chain(None.into_iter().flatten())
895}
896
897#[derive(Debug)]
900pub struct ParentIterator {
901 inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
902}
903
904impl ParentIterator {
905 fn new(
906 tables: &Tables,
907 namespace: NamespaceId,
908 author: AuthorId,
909 key: Vec<u8>,
910 ) -> anyhow::Result<Self> {
911 let parents = parents(&tables.records, namespace, author, key.clone());
912 Ok(Self {
913 inner: parents.into_iter(),
914 })
915 }
916}
917
918fn parents(
919 table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
920 namespace: NamespaceId,
921 author: AuthorId,
922 mut key: Vec<u8>,
923) -> Vec<anyhow::Result<SignedEntry>> {
924 let mut res = Vec::new();
925
926 while !key.is_empty() {
927 let entry = get_exact(table, namespace, author, &key, false);
928 key.pop();
929 match entry {
930 Err(err) => res.push(Err(err)),
931 Ok(Some(entry)) => res.push(Ok(entry)),
932 Ok(None) => continue,
933 }
934 }
935 res.reverse();
936 res
937}
938
939impl Iterator for ParentIterator {
940 type Item = Result<SignedEntry>;
941
942 fn next(&mut self) -> Option<Self::Item> {
943 self.inner.next()
944 }
945}
946
947#[derive(derive_more::Debug)]
955pub struct ContentHashesIterator {
956 #[debug(skip)]
957 range: RecordsRange<'static>,
958}
959
960impl ContentHashesIterator {
961 pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
963 let range = RecordsRange::all_static(table)?;
964 Ok(Self { range })
965 }
966}
967
968impl Iterator for ContentHashesIterator {
969 type Item = Result<Hash>;
970
971 fn next(&mut self) -> Option<Self::Item> {
972 let v = self.range.next()?;
973 Some(v.map(|e| e.content_hash()))
974 }
975}
976
977#[derive(derive_more::Debug)]
979#[debug("LatestIterator")]
980pub struct LatestIterator<'a>(
981 redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
982);
983
984impl<'a> LatestIterator<'a> {
985 fn new(
986 latest_per_author: &'a impl ReadableTable<
987 LatestPerAuthorKey<'static>,
988 LatestPerAuthorValue<'static>,
989 >,
990 namespace: NamespaceId,
991 ) -> anyhow::Result<Self> {
992 let start = (namespace.as_bytes(), &[u8::MIN; 32]);
993 let end = (namespace.as_bytes(), &[u8::MAX; 32]);
994 let range = latest_per_author.range(start..=end)?;
995 Ok(Self(range))
996 }
997}
998
999impl Iterator for LatestIterator<'_> {
1000 type Item = Result<(AuthorId, u64, Vec<u8>)>;
1001
1002 fn next(&mut self) -> Option<Self::Item> {
1003 self.0.next_map(|key, value| {
1004 let (_namespace, author) = key;
1005 let (timestamp, key) = value;
1006 (author.into(), timestamp, key.to_vec())
1007 })
1008 }
1009}
1010
1011fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
1012 let (namespace, author, key) = key;
1013 let (timestamp, namespace_sig, author_sig, len, hash) = value;
1014 let id = RecordIdentifier::new(namespace, author, key);
1015 let record = Record::new(hash.into(), len, timestamp);
1016 let entry = Entry::new(id, record);
1017 let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
1018 SignedEntry::new(entry_signature, entry)
1019}
1020
1021#[cfg(all(test, feature = "fs-store"))]
1022mod tests {
1023 use super::*;
1024 use crate::ranger::Store as _;
1025
1026 #[tokio::test]
1027 async fn test_ranges() -> Result<()> {
1028 let dbfile = tempfile::NamedTempFile::new()?;
1029 let mut store = Store::persistent(dbfile.path())?;
1030
1031 let author = store.new_author(&mut rand::rng())?;
1032 let namespace = NamespaceSecret::new(&mut rand::rng());
1033 let mut replica = store.new_replica(namespace.clone())?;
1034
1035 let key1 = vec![255, 255];
1037 let key2 = vec![255, 255, 255];
1038 replica.hash_and_insert(&key1, &author, b"v1").await?;
1039 replica.hash_and_insert(&key2, &author, b"v2").await?;
1040 let res = store
1041 .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))?
1042 .collect::<Result<Vec<_>>>()?;
1043 assert_eq!(res.len(), 2);
1044 assert_eq!(
1045 res.into_iter()
1046 .map(|entry| entry.key().to_vec())
1047 .collect::<Vec<_>>(),
1048 vec![key1, key2]
1049 );
1050 Ok(())
1051 }
1052
1053 #[test]
1054 fn test_basics() -> Result<()> {
1055 let dbfile = tempfile::NamedTempFile::new()?;
1056 let mut store = Store::persistent(dbfile.path())?;
1057
1058 let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
1059 assert!(authors.is_empty());
1060
1061 let author = store.new_author(&mut rand::rng())?;
1062 let namespace = NamespaceSecret::new(&mut rand::rng());
1063 let _replica = store.new_replica(namespace.clone())?;
1064 store.close_replica(namespace.id());
1065 let replica = store.load_replica_info(&namespace.id())?;
1066 assert_eq!(replica.capability.id(), namespace.id());
1067
1068 let author_back = store.get_author(&author.id())?.unwrap();
1069 assert_eq!(author.to_bytes(), author_back.to_bytes(),);
1070
1071 let mut wrapper = StoreInstance::new(namespace.id(), &mut store);
1072 for i in 0..5 {
1073 let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1074 let entry = Entry::new(id, Record::current_from_data(format!("world-{i}")));
1075 let entry = SignedEntry::from_entry(entry, &namespace, &author);
1076 wrapper.entry_put(entry)?;
1077 }
1078
1079 let all: Vec<_> = wrapper.all()?.collect();
1081 assert_eq!(all.len(), 5);
1082
1083 let mut ids = Vec::new();
1085 for i in 0..5 {
1086 let id = RecordIdentifier::new(namespace.id(), author.id(), format!("hello-{i}"));
1087 let entry = Entry::new(
1088 id.clone(),
1089 Record::current_from_data(format!("world-{i}-2")),
1090 );
1091 let entry = SignedEntry::from_entry(entry, &namespace, &author);
1092 wrapper.entry_put(entry)?;
1093 ids.push(id);
1094 }
1095
1096 let entries = wrapper
1098 .store
1099 .get_many(namespace.id(), Query::all())?
1100 .collect::<Result<Vec<_>>>()?;
1101 assert_eq!(entries.len(), 5);
1102
1103 let entries = wrapper
1105 .store
1106 .get_many(namespace.id(), Query::key_prefix("hello-"))?
1107 .collect::<Result<Vec<_>>>()?;
1108 assert_eq!(entries.len(), 5);
1109
1110 for id in ids {
1112 let res = wrapper.get(&id)?;
1113 assert!(res.is_some());
1114 let out = wrapper.entry_remove(&id)?.unwrap();
1115 assert_eq!(out.entry().id(), &id);
1116 let res = wrapper.get(&id)?;
1117 assert!(res.is_none());
1118 }
1119
1120 let entries = wrapper
1122 .store
1123 .get_many(namespace.id(), Query::all())?
1124 .collect::<Result<Vec<_>>>()?;
1125 assert_eq!(entries.len(), 0);
1126
1127 Ok(())
1128 }
1129
1130 fn copy_and_modify(
1131 source: &std::path::Path,
1132 modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
1133 ) -> Result<tempfile::NamedTempFile> {
1134 let dbfile = tempfile::NamedTempFile::new()?;
1135 std::fs::copy(source, dbfile.path())?;
1136 let db = Database::create(dbfile.path())?;
1137 let write_tx = db.begin_write()?;
1138 modify(&write_tx)?;
1139 write_tx.commit()?;
1140 drop(db);
1141 Ok(dbfile)
1142 }
1143
1144 #[tokio::test]
1145 #[cfg(feature = "fs-store")]
1146 async fn test_migration_001_populate_latest_table() -> Result<()> {
1147 use super::tables::LATEST_PER_AUTHOR_TABLE;
1148 let dbfile = tempfile::NamedTempFile::new()?;
1149 let namespace = NamespaceSecret::new(&mut rand::rng());
1150
1151 let expected = {
1153 let mut store = Store::persistent(dbfile.path())?;
1154 let author1 = store.new_author(&mut rand::rng())?;
1155 let author2 = store.new_author(&mut rand::rng())?;
1156 let mut replica = store.new_replica(namespace.clone())?;
1157 replica.hash_and_insert(b"k1", &author1, b"v1").await?;
1158 replica.hash_and_insert(b"k2", &author2, b"v1").await?;
1159 replica.hash_and_insert(b"k3", &author1, b"v1").await?;
1160
1161 let expected = store
1162 .get_latest_for_each_author(namespace.id())?
1163 .collect::<Result<Vec<_>>>()?;
1164 store.close_replica(namespace.id());
1166 store.flush()?;
1168 drop(store);
1169 expected
1170 };
1171 assert_eq!(expected.len(), 2);
1172
1173 let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
1175 tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
1176 Ok(())
1177 })?;
1178
1179 let mut store = Store::persistent(dbfile_before_migration.path())?;
1181 let actual = store
1182 .get_latest_for_each_author(namespace.id())?
1183 .collect::<Result<Vec<_>>>()?;
1184
1185 assert_eq!(expected, actual);
1186 Ok(())
1187 }
1188
1189 #[test]
1190 #[cfg(feature = "fs-store")]
1191 fn test_migration_004_populate_by_key_index() -> Result<()> {
1192 use redb::ReadableTableMetadata;
1193 let dbfile = tempfile::NamedTempFile::new()?;
1194
1195 let mut store = Store::persistent(dbfile.path())?;
1196
1197 {
1199 let tables = store.tables()?;
1200 assert_eq!(tables.records_by_key.len()?, 0);
1201 }
1202
1203 Ok(())
1205 }
1206
1207 #[test]
1208 #[cfg(all(feature = "fs-store", feature = "redb-v2-migration"))]
1209 fn test_migration_redb_v2_tuples() -> Result<()> {
1210 use migrate_redb_v2_tuples::old;
1211
1212 let dbfile = tempfile::NamedTempFile::new()?;
1213 let path = dbfile.path().to_path_buf();
1214
1215 let ns = [1u8; 32];
1216 let author = [2u8; 32];
1217 let key: &[u8] = b"hello";
1218 let ns_sig = [3u8; 64];
1219 let auth_sig = [4u8; 64];
1220 let hash = [5u8; 32];
1221
1222 {
1223 let db = redb_v3::Database::create(&path)?;
1224 let tx = db.begin_write()?;
1225 {
1226 let mut records = tx.open_table(old::RECORDS_TABLE)?;
1227 let mut latest = tx.open_table(old::LATEST_PER_AUTHOR_TABLE)?;
1228 let mut by_key = tx.open_table(old::RECORDS_BY_KEY_TABLE)?;
1229 records.insert(
1230 (&ns, &author, key),
1231 (42u64, &ns_sig, &auth_sig, 7u64, &hash),
1232 )?;
1233 latest.insert((&ns, &author), (42u64, key))?;
1234 by_key.insert((&ns, key, &author), ())?;
1235 }
1236 tx.commit()?;
1237 }
1238
1239 {
1241 let db = redb::Database::create(&path)?;
1242 let tx = db.begin_write()?;
1243 let err = Tables::new(&tx).unwrap_err();
1244 assert!(
1245 matches!(err, redb::TableError::TableTypeMismatch { .. }),
1246 "expected TableTypeMismatch, got {err:?}",
1247 );
1248 }
1249
1250 let store = Store::persistent(&path)?;
1251 drop(store);
1252
1253 let backup: std::path::PathBuf = {
1254 let mut p = path.clone().into_os_string();
1255 p.push(".backup-redb-v2-tuples");
1256 p.into()
1257 };
1258 assert!(
1259 backup.exists(),
1260 "missing backup file at {}",
1261 backup.display()
1262 );
1263
1264 {
1266 let db = redb::Database::create(&path)?;
1267 let tx = db.begin_read()?;
1268 let records = tx.open_table(tables::RECORDS_TABLE)?;
1269 let entries: Vec<_> = records.iter()?.collect::<std::result::Result<_, _>>()?;
1270 assert_eq!(entries.len(), 1);
1271 let (k, v) = &entries[0];
1272 assert_eq!(k.value(), (&ns, &author, key));
1273 assert_eq!(v.value(), (42u64, &ns_sig, &auth_sig, 7u64, &hash));
1274
1275 let latest = tx.open_table(tables::LATEST_PER_AUTHOR_TABLE)?;
1276 let entries: Vec<_> = latest.iter()?.collect::<std::result::Result<_, _>>()?;
1277 assert_eq!(entries.len(), 1);
1278 assert_eq!(entries[0].0.value(), (&ns, &author));
1279 assert_eq!(entries[0].1.value(), (42u64, key));
1280
1281 let by_key = tx.open_table(tables::RECORDS_BY_KEY_TABLE)?;
1282 let entries: Vec<_> = by_key.iter()?.collect::<std::result::Result<_, _>>()?;
1283 assert_eq!(entries.len(), 1);
1284 assert_eq!(entries[0].0.value(), (&ns, key, &author));
1285 }
1286
1287 Ok(())
1288 }
1289
1290 #[test]
1291 #[cfg(feature = "fs-store")]
1292 fn test_no_migration_on_fresh_store() -> Result<()> {
1293 let dbfile = tempfile::NamedTempFile::new()?;
1294 let path = dbfile.path().to_path_buf();
1295 let store = Store::persistent(&path)?;
1296 drop(store);
1297
1298 let backup: std::path::PathBuf = {
1299 let mut p = path.clone().into_os_string();
1300 p.push(".backup-redb-v2-tuples");
1301 p.into()
1302 };
1303 assert!(
1304 !backup.exists(),
1305 "unexpected backup file at {}",
1306 backup.display()
1307 );
1308
1309 let _store = Store::persistent(&path)?;
1310 assert!(!backup.exists());
1311 Ok(())
1312 }
1313}