1use std::{
4 cmp::Ordering,
5 collections::HashSet,
6 iter::{Chain, Flatten},
7 num::NonZeroU64,
8 ops::Bound,
9 path::Path,
10 time::Duration,
11};
12
13use anyhow::{anyhow, Result};
14use ed25519_dalek::{SignatureError, VerifyingKey};
15use iroh_base::hash::Hash;
16use rand_core::CryptoRngCore;
17use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable, ReadableTableMetadata};
18
19use crate::{
20 keys::Author,
21 ranger::{Fingerprint, Range, RangeEntry},
22 sync::{Entry, EntrySignature, Record, RecordIdentifier, Replica, SignedEntry},
23 AuthorHeads, AuthorId, Capability, CapabilityKind, NamespaceId, NamespaceSecret, PeerIdBytes,
24 ReplicaInfo,
25};
26
27use super::{
28 pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore,
29 Query,
30};
31
32mod bounds;
33mod migrate_v1_v2;
34mod migrations;
35mod query;
36mod ranges;
37pub(crate) mod tables;
38
39use self::{
40 bounds::{ByKeyBounds, RecordsBounds},
41 ranges::RangeExt,
42 tables::{RecordsTable, TransactionAndTables},
43};
44use self::{
45 query::QueryIterator,
46 tables::{
47 LatestPerAuthorKey, LatestPerAuthorValue, ReadOnlyTables, RecordsId, RecordsValue, Tables,
48 },
49};
50
51pub use self::ranges::RecordsRange;
52
53#[derive(Debug)]
55pub struct Store {
56 db: Database,
57 transaction: CurrentTransaction,
58 open_replicas: HashSet<NamespaceId>,
59 pubkeys: MemPublicKeyStore,
60}
61
62impl AsRef<Store> for Store {
63 fn as_ref(&self) -> &Store {
64 self
65 }
66}
67
68impl AsMut<Store> for Store {
69 fn as_mut(&mut self) -> &mut Store {
70 self
71 }
72}
73
74#[derive(derive_more::Debug, Default)]
75enum CurrentTransaction {
76 #[default]
77 None,
78 Read(ReadOnlyTables),
79 Write(TransactionAndTables),
80}
81
82impl Store {
83 pub fn memory() -> Self {
85 Self::memory_impl().expect("failed to create memory store")
86 }
87
88 fn memory_impl() -> Result<Self> {
89 let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
90 Self::new_impl(db)
91 }
92
93 pub fn persistent(path: impl AsRef<Path>) -> Result<Self> {
97 let db = match Database::create(&path) {
98 Ok(db) => db,
99 Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
100 Err(err) => return Err(err.into()),
101 };
102 Self::new_impl(db)
103 }
104
105 fn new_impl(db: redb::Database) -> Result<Self> {
106 let write_tx = db.begin_write()?;
108 let _ = Tables::new(&write_tx)?;
109 write_tx.commit()?;
110
111 migrations::run_migrations(&db)?;
113
114 Ok(Store {
115 db,
116 transaction: Default::default(),
117 open_replicas: Default::default(),
118 pubkeys: Default::default(),
119 })
120 }
121
122 pub fn flush(&mut self) -> Result<()> {
126 if let CurrentTransaction::Write(w) = std::mem::take(&mut self.transaction) {
127 w.commit()?;
128 }
129 Ok(())
130 }
131
132 pub fn snapshot(&mut self) -> Result<&ReadOnlyTables> {
137 let guard = &mut self.transaction;
138 let tables = match std::mem::take(guard) {
139 CurrentTransaction::None => {
140 let tx = self.db.begin_read()?;
141 ReadOnlyTables::new(tx)?
142 }
143 CurrentTransaction::Write(w) => {
144 w.commit()?;
145 let tx = self.db.begin_read()?;
146 ReadOnlyTables::new(tx)?
147 }
148 CurrentTransaction::Read(tables) => tables,
149 };
150 *guard = CurrentTransaction::Read(tables);
151 match &*guard {
152 CurrentTransaction::Read(ref tables) => Ok(tables),
153 _ => unreachable!(),
154 }
155 }
156
157 fn tables(&mut self) -> Result<&Tables> {
168 let guard = &mut self.transaction;
169 let tables = match std::mem::take(guard) {
170 CurrentTransaction::None => {
171 let tx = self.db.begin_write()?;
172 TransactionAndTables::new(tx)?
173 }
174 CurrentTransaction::Write(w) => {
175 if w.since.elapsed() > Duration::from_millis(500) {
176 tracing::debug!("committing transaction because it's too old");
177 w.commit()?;
178 let tx = self.db.begin_write()?;
179 TransactionAndTables::new(tx)?
180 } else {
181 w
182 }
183 }
184 CurrentTransaction::Read(_) => {
185 let tx = self.db.begin_write()?;
186 TransactionAndTables::new(tx)?
187 }
188 };
189 *guard = CurrentTransaction::Write(tables);
190 match guard {
191 CurrentTransaction::Write(ref mut tables) => Ok(tables.tables()),
192 _ => unreachable!(),
193 }
194 }
195
196 fn modify<T>(&mut self, f: impl FnOnce(&mut Tables) -> Result<T>) -> Result<T> {
205 let guard = &mut self.transaction;
206 let tables = match std::mem::take(guard) {
207 CurrentTransaction::None => {
208 let tx = self.db.begin_write()?;
209 TransactionAndTables::new(tx)?
210 }
211 CurrentTransaction::Write(w) => w,
212 CurrentTransaction::Read(_) => {
213 let tx = self.db.begin_write()?;
214 TransactionAndTables::new(tx)?
215 }
216 };
217 *guard = CurrentTransaction::Write(tables);
218 let res = match &mut *guard {
219 CurrentTransaction::Write(ref mut tables) => tables.with_tables_mut(f)?,
220 _ => unreachable!(),
221 };
222 Ok(res)
223 }
224}
225
226type AuthorsIter = std::vec::IntoIter<Result<Author>>;
227type NamespaceIter = std::vec::IntoIter<Result<(NamespaceId, CapabilityKind)>>;
228type PeersIter = std::vec::IntoIter<PeerIdBytes>;
229
230impl Store {
231 pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica> {
233 let id = namespace.id();
234 self.import_namespace(namespace.into())?;
235 self.open_replica(&id).map_err(Into::into)
236 }
237
238 pub fn new_author<R: CryptoRngCore + ?Sized>(&mut self, rng: &mut R) -> Result<Author> {
240 let author = Author::new(rng);
241 self.import_author(author.clone())?;
242 Ok(author)
243 }
244
245 pub fn has_news_for_us(
249 &mut self,
250 namespace: NamespaceId,
251 heads: &AuthorHeads,
252 ) -> Result<Option<NonZeroU64>> {
253 let our_heads = {
254 let latest = self.get_latest_for_each_author(namespace)?;
255 let mut heads = AuthorHeads::default();
256 for e in latest {
257 let (author, timestamp, _key) = e?;
258 heads.insert(author, timestamp);
259 }
260 heads
261 };
262 let has_news_for_us = heads.has_news_for(&our_heads);
263 Ok(has_news_for_us)
264 }
265
266 pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica, OpenError> {
270 let info = self.load_replica_info(namespace_id)?;
271 let instance = StoreInstance::new(*namespace_id, self);
272 Ok(Replica::new(instance, Box::new(info)))
273 }
274
275 pub fn load_replica_info(
277 &mut self,
278 namespace_id: &NamespaceId,
279 ) -> Result<ReplicaInfo, OpenError> {
280 let tables = self.tables()?;
281 let info = match tables.namespaces.get(namespace_id.as_bytes()) {
282 Ok(Some(db_value)) => {
283 let (raw_kind, raw_bytes) = db_value.value();
284 let namespace = Capability::from_raw(raw_kind, raw_bytes)?;
285 ReplicaInfo::new(namespace)
286 }
287 Ok(None) => return Err(OpenError::NotFound),
288 Err(err) => return Err(OpenError::Other(err.into())),
289 };
290 self.open_replicas.insert(info.capability.id());
291 Ok(info)
292 }
293
294 pub fn close_replica(&mut self, id: NamespaceId) {
296 self.open_replicas.remove(&id);
297 }
298
299 pub fn list_namespaces(&mut self) -> Result<NamespaceIter> {
301 let tables = self.tables()?;
303 let namespaces: Vec<_> = tables
304 .namespaces
305 .iter()?
306 .map(|res| {
307 let capability = parse_capability(res?.1.value())?;
308 Ok((capability.id(), capability.kind()))
309 })
310 .collect();
311 Ok(namespaces.into_iter())
312 }
313
314 pub fn get_author(&mut self, author_id: &AuthorId) -> Result<Option<Author>> {
316 let tables = self.tables()?;
317 let Some(author) = tables.authors.get(author_id.as_bytes())? else {
318 return Ok(None);
319 };
320 let author = Author::from_bytes(author.value());
321 Ok(Some(author))
322 }
323
324 pub fn import_author(&mut self, author: Author) -> Result<()> {
326 self.modify(|tables| {
327 tables
328 .authors
329 .insert(author.id().as_bytes(), &author.to_bytes())?;
330 Ok(())
331 })
332 }
333
334 pub fn delete_author(&mut self, author: AuthorId) -> Result<()> {
336 self.modify(|tables| {
337 tables.authors.remove(author.as_bytes())?;
338 Ok(())
339 })
340 }
341
342 pub fn list_authors(&mut self) -> Result<AuthorsIter> {
344 let tables = self.tables()?;
346 let authors: Vec<_> = tables
347 .authors
348 .iter()?
349 .map(|res| match res {
350 Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
351 Err(err) => Err(err.into()),
352 })
353 .collect();
354
355 Ok(authors.into_iter())
356 }
357
358 pub fn import_namespace(&mut self, capability: Capability) -> Result<ImportNamespaceOutcome> {
360 self.modify(|tables| {
361 let outcome = {
362 let (capability, outcome) = {
363 let existing = tables.namespaces.get(capability.id().as_bytes())?;
364 if let Some(existing) = existing {
365 let mut existing = parse_capability(existing.value())?;
366 let outcome = if existing.merge(capability)? {
367 ImportNamespaceOutcome::Upgraded
368 } else {
369 ImportNamespaceOutcome::NoChange
370 };
371 (existing, outcome)
372 } else {
373 (capability, ImportNamespaceOutcome::Inserted)
374 }
375 };
376 let id = capability.id().to_bytes();
377 let (kind, bytes) = capability.raw();
378 tables.namespaces.insert(&id, (kind, &bytes))?;
379 outcome
380 };
381 Ok(outcome)
382 })
383 }
384
385 pub fn remove_replica(&mut self, namespace: &NamespaceId) -> Result<()> {
393 if self.open_replicas.contains(namespace) {
394 return Err(anyhow!("replica is not closed"));
395 }
396 self.modify(|tables| {
397 let bounds = RecordsBounds::namespace(*namespace);
398 tables.records.retain_in(bounds.as_ref(), |_k, _v| false)?;
399 let bounds = ByKeyBounds::namespace(*namespace);
400 let _ = tables
401 .records_by_key
402 .retain_in(bounds.as_ref(), |_k, _v| false);
403 tables.namespaces.remove(namespace.as_bytes())?;
404 tables.namespace_peers.remove_all(namespace.as_bytes())?;
405 tables.download_policy.remove(namespace.as_bytes())?;
406 Ok(())
407 })
408 }
409
410 pub fn get_many(
412 &mut self,
413 namespace: NamespaceId,
414 query: impl Into<Query>,
415 ) -> Result<QueryIterator> {
416 QueryIterator::new(self.tables()?, namespace, query.into())
417 }
418
419 pub fn get_exact(
421 &mut self,
422 namespace: NamespaceId,
423 author: AuthorId,
424 key: impl AsRef<[u8]>,
425 include_empty: bool,
426 ) -> Result<Option<SignedEntry>> {
427 get_exact(
428 &self.tables()?.records,
429 namespace,
430 author,
431 key,
432 include_empty,
433 )
434 }
435
436 pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
438 self.flush()?;
440 assert!(matches!(self.transaction, CurrentTransaction::None));
441 let tx = self.db.begin_read()?;
442 let tables = ReadOnlyTables::new(tx)?;
443 let records = tables.records;
444 ContentHashesIterator::all(records)
445 }
446
447 pub fn get_latest_for_each_author(&mut self, namespace: NamespaceId) -> Result<LatestIterator> {
449 LatestIterator::new(&self.tables()?.latest_per_author, namespace)
450 }
451
452 pub fn register_useful_peer(
454 &mut self,
455 namespace: NamespaceId,
456 peer: crate::PeerIdBytes,
457 ) -> Result<()> {
458 let peer = &peer;
459 let namespace = namespace.as_bytes();
460 let nanos = std::time::UNIX_EPOCH
462 .elapsed()
463 .map(|duration| duration.as_nanos() as u64)?;
464 self.modify(|tables| {
465 anyhow::ensure!(
467 tables.namespaces.get(namespace)?.is_some(),
468 "document not created"
469 );
470
471 let mut namespace_peers = tables.namespace_peers.get(namespace)?;
472
473 let maybe_oldest = namespace_peers.next().transpose()?.map(|guard| {
475 let (oldest_nanos, &oldest_peer) = guard.value();
476 (oldest_nanos, oldest_peer)
477 });
478 match maybe_oldest {
479 None => {
480 drop(namespace_peers);
483 tables.namespace_peers.insert(namespace, (nanos, peer))?;
484 }
485 Some((oldest_nanos, oldest_peer)) => {
486 let oldest_peer = &oldest_peer;
487
488 if oldest_peer == peer {
489 drop(namespace_peers);
492 tables
493 .namespace_peers
494 .remove(namespace, (oldest_nanos, oldest_peer))?;
495 tables.namespace_peers.insert(namespace, (nanos, peer))?;
496 } else {
497 let mut len = 1;
499 let mut prev_peer_nanos = None;
501
502 for result in namespace_peers {
503 len += 1;
504 let guard = result?;
505 let (peer_nanos, peer_bytes) = guard.value();
506 if prev_peer_nanos.is_none() && peer_bytes == peer {
507 prev_peer_nanos = Some(peer_nanos)
508 }
509 }
510
511 match prev_peer_nanos {
512 Some(prev_nanos) => {
513 tables
516 .namespace_peers
517 .remove(namespace, (prev_nanos, peer))?;
518 tables.namespace_peers.insert(namespace, (nanos, peer))?;
519 }
520 None => {
521 tables.namespace_peers.insert(namespace, (nanos, peer))?;
524 len += 1;
525 if len > super::PEERS_PER_DOC_CACHE_SIZE.get() {
526 tables
527 .namespace_peers
528 .remove(namespace, (oldest_nanos, oldest_peer))?;
529 }
530 }
531 }
532 }
533 }
534 }
535 Ok(())
536 })
537 }
538
539 pub fn get_sync_peers(&mut self, namespace: &NamespaceId) -> Result<Option<PeersIter>> {
541 let tables = self.tables()?;
542 let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get());
543 for result in tables.namespace_peers.get(namespace.as_bytes())?.rev() {
544 let (_nanos, &peer) = result?.value();
545 peers.push(peer);
546 }
547 if peers.is_empty() {
548 Ok(None)
549 } else {
550 Ok(Some(peers.into_iter()))
551 }
552 }
553
554 pub fn set_download_policy(
556 &mut self,
557 namespace: &NamespaceId,
558 policy: DownloadPolicy,
559 ) -> Result<()> {
560 self.modify(|tables| {
561 let namespace = namespace.as_bytes();
562
563 anyhow::ensure!(
565 tables.namespaces.get(&namespace)?.is_some(),
566 "document not created"
567 );
568
569 let value = postcard::to_stdvec(&policy)?;
570 tables.download_policy.insert(namespace, value.as_slice())?;
571 Ok(())
572 })
573 }
574
575 pub fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
577 let tables = self.tables()?;
578 let value = tables.download_policy.get(namespace.as_bytes())?;
579 Ok(match value {
580 None => DownloadPolicy::default(),
581 Some(value) => postcard::from_bytes(value.value())?,
582 })
583 }
584}
585
586impl PublicKeyStore for Store {
587 fn public_key(&self, id: &[u8; 32]) -> Result<VerifyingKey, SignatureError> {
588 self.pubkeys.public_key(id)
589 }
590}
591
592fn parse_capability((raw_kind, raw_bytes): (u8, &[u8; 32])) -> Result<Capability> {
593 Capability::from_raw(raw_kind, raw_bytes)
594}
595
596fn get_exact(
597 record_table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
598 namespace: NamespaceId,
599 author: AuthorId,
600 key: impl AsRef<[u8]>,
601 include_empty: bool,
602) -> Result<Option<SignedEntry>> {
603 let id = (namespace.as_bytes(), author.as_bytes(), key.as_ref());
604 let record = record_table.get(id)?;
605 Ok(record
606 .map(|r| into_entry(id, r.value()))
607 .filter(|entry| include_empty || !entry.is_empty()))
608}
609
610#[derive(Debug)]
612pub struct StoreInstance<'a> {
613 namespace: NamespaceId,
614 pub(crate) store: &'a mut Store,
615}
616
617impl<'a> StoreInstance<'a> {
618 pub(crate) fn new(namespace: NamespaceId, store: &'a mut Store) -> Self {
619 StoreInstance { namespace, store }
620 }
621}
622
623impl<'a> PublicKeyStore for StoreInstance<'a> {
624 fn public_key(&self, id: &[u8; 32]) -> std::result::Result<VerifyingKey, SignatureError> {
625 self.store.public_key(id)
626 }
627}
628
629impl<'a> super::DownloadPolicyStore for StoreInstance<'a> {
630 fn get_download_policy(&mut self, namespace: &NamespaceId) -> Result<DownloadPolicy> {
631 self.store.get_download_policy(namespace)
632 }
633}
634
635impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
636 type Error = anyhow::Error;
637 type RangeIterator<'x> = Chain<RecordsRange<'x>, Flatten<std::option::IntoIter<RecordsRange<'x>>>>
638 where 'a: 'x;
639 type ParentIterator<'x> = ParentIterator
640 where 'a: 'x;
641
642 fn get_first(&mut self) -> Result<RecordIdentifier> {
644 let tables = self.store.as_mut().tables()?;
645 let bounds = RecordsBounds::namespace(self.namespace);
647 let mut records = tables.records.range(bounds.as_ref())?;
648
649 let Some(record) = records.next() else {
650 return Ok(RecordIdentifier::default());
651 };
652 let (compound_key, _value) = record?;
653 let (namespace_id, author_id, key) = compound_key.value();
654 let id = RecordIdentifier::new(namespace_id, author_id, key);
655 Ok(id)
656 }
657
658 fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
659 self.store
660 .as_mut()
661 .get_exact(id.namespace(), id.author(), id.key(), true)
662 }
663
664 fn len(&mut self) -> Result<usize> {
665 let tables = self.store.as_mut().tables()?;
666 let bounds = RecordsBounds::namespace(self.namespace);
667 let records = tables.records.range(bounds.as_ref())?;
668 Ok(records.count())
669 }
670
671 fn is_empty(&mut self) -> Result<bool> {
672 let tables = self.store.as_mut().tables()?;
673 Ok(tables.records.is_empty()?)
674 }
675
676 fn get_fingerprint(&mut self, range: &Range<RecordIdentifier>) -> Result<Fingerprint> {
677 let elements = self.get_range(range.clone())?;
679
680 let mut fp = Fingerprint::empty();
681 for el in elements {
682 let el = el?;
683 fp ^= el.as_fingerprint();
684 }
685
686 Ok(fp)
687 }
688
689 fn entry_put(&mut self, e: SignedEntry) -> Result<()> {
690 let id = e.id();
691 self.store.as_mut().modify(|tables| {
692 let key = (
694 &id.namespace().to_bytes(),
695 &id.author().to_bytes(),
696 id.key(),
697 );
698 let hash = e.content_hash(); let value = (
700 e.timestamp(),
701 &e.signature().namespace().to_bytes(),
702 &e.signature().author().to_bytes(),
703 e.content_len(),
704 hash.as_bytes(),
705 );
706 tables.records.insert(key, value)?;
707
708 let key = (
710 &id.namespace().to_bytes(),
711 id.key(),
712 &id.author().to_bytes(),
713 );
714 tables.records_by_key.insert(key, ())?;
715
716 let key = (&e.id().namespace().to_bytes(), &e.id().author().to_bytes());
718 let value = (e.timestamp(), e.id().key());
719 tables.latest_per_author.insert(key, value)?;
720 Ok(())
721 })
722 }
723
724 fn get_range(&mut self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
725 let tables = self.store.as_mut().tables()?;
726 let iter = match range.x().cmp(range.y()) {
727 Ordering::Equal => {
729 let bounds = RecordsBounds::namespace(self.namespace);
731 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
732 chain_none(iter)
733 }
734 Ordering::Less => {
736 let start = Bound::Included(range.x().to_byte_tuple());
738 let end = Bound::Excluded(range.y().to_byte_tuple());
739 let bounds = RecordsBounds::new(start, end);
740 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
741 chain_none(iter)
742 }
743 Ordering::Greater => {
745 let end = Bound::Excluded(range.y().to_byte_tuple());
747 let bounds = RecordsBounds::from_start(&self.namespace, end);
748 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
749
750 let start = Bound::Included(range.x().to_byte_tuple());
752 let bounds = RecordsBounds::to_end(&self.namespace, start);
753 let iter2 = RecordsRange::with_bounds(&tables.records, bounds)?;
754
755 iter.chain(Some(iter2).into_iter().flatten())
756 }
757 };
758 Ok(iter)
759 }
760
761 fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
762 self.store.as_mut().modify(|tables| {
763 let entry = {
764 let (namespace, author, key) = id.as_byte_tuple();
765 let id = (namespace, key, author);
766 tables.records_by_key.remove(id)?;
767 let id = (namespace, author, key);
768 let value = tables.records.remove(id)?;
769 value.map(|value| into_entry(id, value.value()))
770 };
771 Ok(entry)
772 })
773 }
774
775 fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
776 let tables = self.store.as_mut().tables()?;
777 let bounds = RecordsBounds::namespace(self.namespace);
778 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
779 Ok(chain_none(iter))
780 }
781
782 fn prefixes_of(
783 &mut self,
784 id: &RecordIdentifier,
785 ) -> Result<Self::ParentIterator<'_>, Self::Error> {
786 let tables = self.store.as_mut().tables()?;
787 ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
788 }
789
790 fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
791 let tables = self.store.as_mut().tables()?;
792 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
793 let iter = RecordsRange::with_bounds(&tables.records, bounds)?;
794 Ok(chain_none(iter))
795 }
796
797 fn remove_prefix_filtered(
798 &mut self,
799 id: &RecordIdentifier,
800 predicate: impl Fn(&Record) -> bool,
801 ) -> Result<usize> {
802 let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
803 self.store.as_mut().modify(|tables| {
804 let cb = |_k: RecordsId, v: RecordsValue| {
805 let (timestamp, _namespace_sig, _author_sig, len, hash) = v;
806 let record = Record::new(hash.into(), len, timestamp);
807
808 predicate(&record)
809 };
810 let iter = tables.records.extract_from_if(bounds.as_ref(), cb)?;
811 let count = iter.count();
812 Ok(count)
813 })
814 }
815}
816
/// Wraps `iter` in the `Chain` shape used for range iterators, with an
/// empty second half, so single-range and wrap-around results share a type.
fn chain_none<'a, I: Iterator<Item = T> + 'a, T>(
    iter: I,
) -> Chain<I, Flatten<std::option::IntoIter<I>>> {
    let no_second_half: Option<I> = None;
    iter.chain(no_second_half.into_iter().flatten())
}
822
823#[derive(Debug)]
826pub struct ParentIterator {
827 inner: std::vec::IntoIter<anyhow::Result<SignedEntry>>,
828}
829
830impl ParentIterator {
831 fn new(
832 tables: &Tables,
833 namespace: NamespaceId,
834 author: AuthorId,
835 key: Vec<u8>,
836 ) -> anyhow::Result<Self> {
837 let parents = parents(&tables.records, namespace, author, key.clone());
838 Ok(Self {
839 inner: parents.into_iter(),
840 })
841 }
842}
843
844fn parents(
845 table: &impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
846 namespace: NamespaceId,
847 author: AuthorId,
848 mut key: Vec<u8>,
849) -> Vec<anyhow::Result<SignedEntry>> {
850 let mut res = Vec::new();
851
852 while !key.is_empty() {
853 let entry = get_exact(table, namespace, author, &key, false);
854 key.pop();
855 match entry {
856 Err(err) => res.push(Err(err)),
857 Ok(Some(entry)) => res.push(Ok(entry)),
858 Ok(None) => continue,
859 }
860 }
861 res.reverse();
862 res
863}
864
865impl Iterator for ParentIterator {
866 type Item = Result<SignedEntry>;
867
868 fn next(&mut self) -> Option<Self::Item> {
869 self.inner.next()
870 }
871}
872
873self_cell::self_cell!(
874 struct ContentHashesIteratorInner {
875 owner: RecordsTable,
876 #[covariant]
877 dependent: RecordsRange,
878 }
879);
880
881#[derive(derive_more::Debug)]
889pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner);
890
891impl ContentHashesIterator {
892 pub fn all(owner: RecordsTable) -> anyhow::Result<Self> {
894 let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?;
895 Ok(Self(inner))
896 }
897}
898
899impl Iterator for ContentHashesIterator {
900 type Item = Result<Hash>;
901
902 fn next(&mut self) -> Option<Self::Item> {
903 let v = self.0.with_dependent_mut(|_, d| d.next())?;
904 Some(v.map(|e| e.content_hash()))
905 }
906}
907
908#[derive(derive_more::Debug)]
910#[debug("LatestIterator")]
911pub struct LatestIterator<'a>(
912 redb::Range<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
913);
914
915impl<'a> LatestIterator<'a> {
916 fn new(
917 latest_per_author: &'a impl ReadableTable<
918 LatestPerAuthorKey<'static>,
919 LatestPerAuthorValue<'static>,
920 >,
921 namespace: NamespaceId,
922 ) -> anyhow::Result<Self> {
923 let start = (namespace.as_bytes(), &[u8::MIN; 32]);
924 let end = (namespace.as_bytes(), &[u8::MAX; 32]);
925 let range = latest_per_author.range(start..=end)?;
926 Ok(Self(range))
927 }
928}
929
930impl<'a> Iterator for LatestIterator<'a> {
931 type Item = Result<(AuthorId, u64, Vec<u8>)>;
932
933 fn next(&mut self) -> Option<Self::Item> {
934 self.0.next_map(|key, value| {
935 let (_namespace, author) = key;
936 let (timestamp, key) = value;
937 (author.into(), timestamp, key.to_vec())
938 })
939 }
940}
941
942fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry {
943 let (namespace, author, key) = key;
944 let (timestamp, namespace_sig, author_sig, len, hash) = value;
945 let id = RecordIdentifier::new(namespace, author, key);
946 let record = Record::new(hash.into(), len, timestamp);
947 let entry = Entry::new(id, record);
948 let entry_signature = EntrySignature::from_parts(namespace_sig, author_sig);
949 SignedEntry::new(entry_signature, entry)
950}
951
952#[cfg(test)]
953mod tests {
954 use super::tables::LATEST_PER_AUTHOR_TABLE;
955
956 use crate::ranger::Store as _;
957
958 use super::*;
959
/// A key-prefix query for `[255]` must return both keys that consist only of
/// `0xff` bytes, in key order.
#[test]
fn test_ranges() -> Result<()> {
    let dbfile = tempfile::NamedTempFile::new()?;
    let mut store = Store::persistent(dbfile.path())?;

    let author = store.new_author(&mut rand::thread_rng())?;
    let namespace = NamespaceSecret::new(&mut rand::thread_rng());
    let mut replica = store.new_replica(namespace.clone())?;

    let key1: Vec<u8> = vec![255, 255];
    let key2: Vec<u8> = vec![255, 255, 255];
    replica.hash_and_insert(&key1, &author, b"v1")?;
    replica.hash_and_insert(&key2, &author, b"v2")?;

    let query = Query::author(author.id()).key_prefix([255]);
    let entries = store
        .get_many(namespace.id(), query)?
        .collect::<Result<Vec<_>>>()?;
    assert_eq!(entries.len(), 2);

    let found_keys: Vec<Vec<u8>> = entries.iter().map(|entry| entry.key().to_vec()).collect();
    assert_eq!(found_keys, vec![key1, key2]);
    Ok(())
}
986
/// Exercises authors, replicas, and the put / query / remove round trip of a
/// `StoreInstance`.
#[test]
fn test_basics() -> Result<()> {
    let dbfile = tempfile::NamedTempFile::new()?;
    let mut store = Store::persistent(dbfile.path())?;

    // A fresh store has no authors.
    let authors: Vec<_> = store.list_authors()?.collect::<Result<_>>()?;
    assert!(authors.is_empty());

    let author = store.new_author(&mut rand::thread_rng())?;
    let namespace = NamespaceSecret::new(&mut rand::thread_rng());
    let namespace_id = namespace.id();
    let _replica = store.new_replica(namespace.clone())?;
    store.close_replica(namespace_id);
    let info = store.load_replica_info(&namespace_id)?;
    assert_eq!(info.capability.id(), namespace_id);

    let author_back = store.get_author(&author.id())?.unwrap();
    assert_eq!(author.to_bytes(), author_back.to_bytes());

    // Builds the signed entry for key `hello-{i}` carrying `value`.
    let signed = |i: i32, value: String| {
        let id = RecordIdentifier::new(namespace_id, author.id(), format!("hello-{i}"));
        let entry = Entry::new(id.clone(), Record::current_from_data(value));
        (id, SignedEntry::from_entry(entry, &namespace, &author))
    };

    let mut wrapper = StoreInstance::new(namespace_id, &mut store);
    for i in 0..5 {
        let (_id, entry) = signed(i, format!("world-{i}"));
        wrapper.entry_put(entry)?;
    }

    let all: Vec<_> = wrapper.all()?.collect();
    assert_eq!(all.len(), 5);

    // Overwrite every key with a second value; the entry count must not grow.
    let mut ids = Vec::new();
    for i in 0..5 {
        let (id, entry) = signed(i, format!("world-{i}-2"));
        wrapper.entry_put(entry)?;
        ids.push(id);
    }

    let entries = wrapper
        .store
        .get_many(namespace_id, Query::all())?
        .collect::<Result<Vec<_>>>()?;
    assert_eq!(entries.len(), 5);

    let entries = wrapper
        .store
        .get_many(namespace_id, Query::key_prefix("hello-"))?
        .collect::<Result<Vec<_>>>()?;
    assert_eq!(entries.len(), 5);

    // Remove each entry and check it is gone afterwards.
    for id in ids.iter() {
        let res = wrapper.get(id)?;
        assert!(res.is_some());
        let out = wrapper.entry_remove(id)?.unwrap();
        assert_eq!(out.entry().id(), id);
        let res = wrapper.get(id)?;
        assert!(res.is_none());
    }

    let entries = wrapper
        .store
        .get_many(namespace_id, Query::all())?
        .collect::<Result<Vec<_>>>()?;
    assert_eq!(entries.len(), 0);

    Ok(())
}
1063
1064 fn copy_and_modify(
1065 source: &Path,
1066 modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
1067 ) -> Result<tempfile::NamedTempFile> {
1068 let dbfile = tempfile::NamedTempFile::new()?;
1069 std::fs::copy(source, dbfile.path())?;
1070 let db = Database::create(dbfile.path())?;
1071 let write_tx = db.begin_write()?;
1072 modify(&write_tx)?;
1073 write_tx.commit()?;
1074 drop(db);
1075 Ok(dbfile)
1076 }
1077
/// Deleting the latest-per-author table and reopening the store must yield
/// the same latest-per-author rows as before the deletion.
#[test]
fn test_migration_001_populate_latest_table() -> Result<()> {
    let dbfile = tempfile::NamedTempFile::new()?;
    let namespace = NamespaceSecret::new(&mut rand::thread_rng());

    // Populate a store and remember what the latest table contains.
    let expected = {
        let mut store = Store::persistent(dbfile.path())?;
        let author1 = store.new_author(&mut rand::thread_rng())?;
        let author2 = store.new_author(&mut rand::thread_rng())?;
        let mut replica = store.new_replica(namespace.clone())?;
        for (key, author) in [(b"k1", &author1), (b"k2", &author2), (b"k3", &author1)] {
            replica.hash_and_insert(key, author, b"v1")?;
        }

        let latest = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;
        store.close_replica(namespace.id());
        store.flush()?;
        drop(store);
        latest
    };
    assert_eq!(expected.len(), 2);

    // Work on a copy of the database that lacks the latest table.
    let without_latest = copy_and_modify(dbfile.path(), |tx| {
        tx.delete_table(LATEST_PER_AUTHOR_TABLE)?;
        Ok(())
    })?;

    // Opening the copy goes through `Store::persistent` again.
    let mut reopened = Store::persistent(without_latest.path())?;
    let actual = reopened
        .get_latest_for_each_author(namespace.id())?
        .collect::<Result<Vec<_>>>()?;

    assert_eq!(expected, actual);

    Ok(())
}
1121
/// A freshly created store has an empty by-key index.
#[test]
fn test_migration_004_populate_by_key_index() -> Result<()> {
    let dbfile = tempfile::NamedTempFile::new()?;
    let mut store = Store::persistent(dbfile.path())?;

    let index_len = store.tables()?.records_by_key.len()?;
    assert_eq!(index_len, 0);

    Ok(())
}
1138}