iroh_sync/sync.rs

1//! API for iroh-sync replicas
2
3// Names and concepts are roughly based on Willow's design at the moment:
4//
5// https://hackmd.io/DTtck8QOQm6tZaQBBtTf7w
6//
7// This is going to change!
8
9use std::{
10    cmp::Ordering,
11    fmt::Debug,
12    sync::Arc,
13    time::{Duration, SystemTime},
14};
15
16use bytes::{Bytes, BytesMut};
17#[cfg(feature = "metrics")]
18use iroh_metrics::{inc, inc_by};
19use std::ops::{Deref, DerefMut};
20
21use ed25519_dalek::{Signature, SignatureError};
22use iroh_base::{base32, hash::Hash};
23use serde::{Deserialize, Serialize};
24
25pub use crate::heads::AuthorHeads;
26#[cfg(feature = "metrics")]
27use crate::metrics::Metrics;
28use crate::{
29    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
30    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
31    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
32};
33
34/// Protocol message for the set reconciliation protocol.
35///
36/// Can be serialized to bytes with [serde] to transfer between peers.
37pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
38
39/// Byte representation of a `PeerId` from `iroh-net`.
40// TODO: PeerId is in iroh-net which iroh-sync doesn't depend on. Add iroh-base crate with `PeerId`.
41pub type PeerIdBytes = [u8; 32];
42
43/// Max time in the future from our wall clock time that we accept entries for.
44/// Value is 10 minutes, expressed in microseconds to match entry timestamps.
45pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
46
47/// Callback that may be set on a replica to determine the availability status for a content hash.
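///
/// # Example
///
/// A minimal sketch (not compiled here) of a callback that reports every hash as missing:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let cb: ContentStatusCallback = Arc::new(|_hash| ContentStatus::Missing);
/// assert_eq!(cb(Hash::new(b"data")), ContentStatus::Missing);
/// ```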
48pub type ContentStatusCallback = Arc<dyn Fn(Hash) -> ContentStatus + Send + Sync + 'static>;
49
50/// Event emitted by sync when entries are added.
51#[derive(Debug, Clone)]
52pub enum Event {
53    /// A local entry has been added.
54    LocalInsert {
55        /// Document in which the entry was inserted.
56        namespace: NamespaceId,
57        /// Inserted entry.
58        entry: SignedEntry,
59    },
60    /// A remote entry has been added.
61    RemoteInsert {
62        /// Document in which the entry was inserted.
63        namespace: NamespaceId,
64        /// Inserted entry.
65        entry: SignedEntry,
66        /// Peer that provided the inserted entry.
67        from: PeerIdBytes,
68        /// Whether download policies require the content to be downloaded.
69        should_download: bool,
70        /// [`ContentStatus`] for this entry in the remote's replica.
71        remote_content_status: ContentStatus,
72    },
73}
74
75/// Whether an entry was inserted locally or by a remote peer.
76#[derive(Debug, Clone)]
77pub enum InsertOrigin {
78    /// The entry was inserted locally.
79    Local,
80    /// The entry was received from the remote node identified by [`PeerIdBytes`].
81    Sync {
82        /// The peer from which we received this entry.
83        from: PeerIdBytes,
84        /// Whether the peer claims to have the content blob for this entry.
85        remote_content_status: ContentStatus,
86    },
87}
88
89/// Availability status of an entry's content on a node.
90#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
91pub enum ContentStatus {
92    /// The content is completely available.
93    Complete,
94    /// The content is partially available.
95    Incomplete,
96    /// The content is missing.
97    Missing,
98}
99
100/// Outcome of a sync operation.
101#[derive(Debug, Clone, Default)]
102pub struct SyncOutcome {
103    /// Timestamp of the latest entry for each author in the set we received.
104    pub heads_received: AuthorHeads,
105    /// Number of entries we received.
106    pub num_recv: usize,
107    /// Number of entries we sent.
108    pub num_sent: usize,
109}
110
111#[derive(Debug, Default)]
112struct Subscribers(Vec<flume::Sender<Event>>);
113impl Subscribers {
114    pub fn subscribe(&mut self, sender: flume::Sender<Event>) {
115        self.0.push(sender)
116    }
117    pub fn unsubscribe(&mut self, sender: &flume::Sender<Event>) {
118        self.0.retain(|s| !s.same_channel(sender));
119    }
120    pub fn send(&mut self, event: Event) {
121        self.0.retain(|sender| sender.send(event.clone()).is_ok())
122    }
123    pub fn len(&self) -> usize {
124        self.0.len()
125    }
126    pub fn send_with(&mut self, f: impl FnOnce() -> Event) {
127        if !self.0.is_empty() {
128            self.send(f())
129        }
130    }
131}
132
133/// Kind of capability of the namespace.
134#[derive(
135    Debug,
136    Clone,
137    Copy,
138    Serialize,
139    Deserialize,
140    num_enum::IntoPrimitive,
141    num_enum::TryFromPrimitive,
142    strum::Display,
143)]
144#[repr(u8)]
145#[strum(serialize_all = "snake_case")]
146pub enum CapabilityKind {
147    /// A writable replica.
148    Write = 1,
149    /// A readable replica.
150    Read = 2,
151}
152
153/// The capability of the namespace.
154#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
155pub enum Capability {
156    /// Write access to the namespace.
157    Write(NamespaceSecret),
158    /// Read only access to the namespace.
159    Read(NamespaceId),
160}
161
162impl Capability {
163    /// Get the [`NamespaceId`] for this [`Capability`].
164    pub fn id(&self) -> NamespaceId {
165        match self {
166            Capability::Write(secret) => secret.id(),
167            Capability::Read(id) => *id,
168        }
169    }
170
171    /// Get the [`NamespaceSecret`] of this [`Capability`].
172    /// Will fail if the [`Capability`] is read-only.
173    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
174        match self {
175            Capability::Write(secret) => Ok(secret),
176            Capability::Read(_) => Err(ReadOnly),
177        }
178    }
179
180    /// Get the kind of capability.
181    pub fn kind(&self) -> CapabilityKind {
182        match self {
183            Capability::Write(_) => CapabilityKind::Write,
184            Capability::Read(_) => CapabilityKind::Read,
185        }
186    }
187
188    /// Get the raw representation of this namespace capability.
189    pub fn raw(&self) -> (u8, [u8; 32]) {
190        let capability_repr: u8 = self.kind().into();
191        let bytes = match self {
192            Capability::Write(secret) => secret.to_bytes(),
193            Capability::Read(id) => id.to_bytes(),
194        };
195        (capability_repr, bytes)
196    }
197
198    /// Create a [`Capability`] from its raw representation.
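    ///
    /// # Example
    ///
    /// A minimal round-trip sketch (not compiled here), assuming a `NamespaceSecret` named `secret`:
    ///
    /// ```ignore
    /// let cap = Capability::Write(secret);
    /// let (kind, bytes) = cap.raw();
    /// let restored = Capability::from_raw(kind, &bytes)?;
    /// assert_eq!(restored.id(), cap.id());
    /// ```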
199    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
200        let kind: CapabilityKind = kind.try_into()?;
201        let capability = match kind {
202            CapabilityKind::Write => {
203                let secret = NamespaceSecret::from_bytes(bytes);
204                Capability::Write(secret)
205            }
206            CapabilityKind::Read => {
207                let id = NamespaceId::from(bytes);
208                Capability::Read(id)
209            }
210        };
211        Ok(capability)
212    }
213
214    /// Merge this capability with another capability.
215    ///
216    /// Will return an error if `other` is not a capability for the same namespace.
217    ///
218    /// Returns `true` if the capability was changed, `false` otherwise.
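    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here) of upgrading a read capability with a write capability,
    /// assuming a `NamespaceSecret` named `secret`:
    ///
    /// ```ignore
    /// let mut cap = Capability::Read(secret.id());
    /// let changed = cap.merge(Capability::Write(secret))?;
    /// assert!(changed);
    /// assert!(matches!(cap, Capability::Write(_)));
    /// ```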
219    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
220        if other.id() != self.id() {
221            return Err(CapabilityError::NamespaceMismatch);
222        }
223
224        // the only capability upgrade is from read-only (self) to writable (other)
225        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
226            let _ = std::mem::replace(self, other);
227            Ok(true)
228        } else {
229            Ok(false)
230        }
231    }
232}
233
234/// Errors for capability operations
235#[derive(Debug, thiserror::Error)]
236pub enum CapabilityError {
237    /// Namespaces are not the same
238    #[error("Namespaces are not the same")]
239    NamespaceMismatch,
240}
241
242/// In memory information about an open replica.
243#[derive(derive_more::Debug)]
244pub struct ReplicaInfo {
245    pub(crate) capability: Capability,
246    subscribers: Subscribers,
247    #[debug("ContentStatusCallback")]
248    content_status_cb: Option<ContentStatusCallback>,
249    closed: bool,
250}
251
252impl ReplicaInfo {
253    /// Create a new replica.
254    pub fn new(capability: Capability) -> Self {
255        Self {
256            capability,
257            subscribers: Default::default(),
259            content_status_cb: None,
260            closed: false,
261        }
262    }
263
264    /// Subscribe to insert events.
265    ///
266    /// When subscribing to a replica, you must ensure that the corresponding [`flume::Receiver`] is
267    /// drained in a loop. If events are not received, local and remote inserts will hang while
268    /// waiting for the receiver to catch up.
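    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming `info` is a mutable [`ReplicaInfo`], that
    /// drains events on a separate thread:
    ///
    /// ```ignore
    /// let (tx, rx) = flume::bounded(64);
    /// info.subscribe(tx);
    /// std::thread::spawn(move || {
    ///     while let Ok(event) = rx.recv() {
    ///         println!("got event: {event:?}");
    ///     }
    /// });
    /// ```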
269    pub fn subscribe(&mut self, sender: flume::Sender<Event>) {
270        self.subscribers.subscribe(sender)
271    }
272
273    /// Explicitly unsubscribe a sender.
274    ///
275    /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
276    /// multiple replicas, you can use this method to explicitly unsubscribe the sender from
277    /// this replica without having to drop the receiver.
278    pub fn unsubscribe(&mut self, sender: &flume::Sender<Event>) {
279        self.subscribers.unsubscribe(sender)
280    }
281
282    /// Get the number of current event subscribers.
283    pub fn subscribers_count(&self) -> usize {
284        self.subscribers.len()
285    }
286
287    /// Set the content status callback.
288    ///
289    /// Only one callback can be active at a time. If a previous callback was registered, this
290    /// will return `false`.
291    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
292        if self.content_status_cb.is_some() {
293            false
294        } else {
295            self.content_status_cb = Some(cb);
296            true
297        }
298    }
299
300    fn ensure_open(&self) -> Result<(), InsertError> {
301        if self.closed() {
302            Err(InsertError::Closed)
303        } else {
304            Ok(())
305        }
306    }
307
308    /// Returns true if the replica is closed.
309    ///
310    /// If a replica is closed, no further operations can be performed. A replica cannot be closed
311    /// directly; it must be closed via [`store::Store::close_replica`] or
312    /// [`store::Store::remove_replica`].
313    pub fn closed(&self) -> bool {
314        self.closed
315    }
316
317    /// Merge a capability.
318    ///
319    /// The capability must refer to the same namespace, otherwise an error will be returned.
320    ///
321    /// This will upgrade the replica's capability when passing a `Capability::Write`.
322    /// It is a no-op if `capability` is a `Capability::Read`.
323    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
324        self.capability.merge(capability)
325    }
326}
327
328/// Local representation of a mutable, synchronizable key-value store.
329#[derive(derive_more::Debug)]
330pub struct Replica<'a, I = Box<ReplicaInfo>> {
331    pub(crate) store: StoreInstance<'a>,
332    pub(crate) info: I,
333}
334
335impl<'a, I> Replica<'a, I>
336where
337    I: Deref<Target = ReplicaInfo> + DerefMut,
338{
339    /// Create a new replica.
340    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
341        Replica { info, store }
342    }
343
344    /// Insert a new record at the given key.
345    ///
346    /// The entry will be signed by the provided `author`.
347    /// The `len` must be the byte length of the data identified by `hash`.
348    ///
349    /// Returns the number of entries removed as a consequence of this insertion,
350    /// or an error either if the entry failed to validate or if a store operation failed.
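    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming `replica` and `author` already exist and the
    /// content bytes are stored elsewhere:
    ///
    /// ```ignore
    /// let data = b"hello world";
    /// let hash = Hash::new(data);
    /// let removed = replica.insert("greeting", &author, hash, data.len() as u64)?;
    /// ```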
351    pub fn insert(
352        &mut self,
353        key: impl AsRef<[u8]>,
354        author: &Author,
355        hash: Hash,
356        len: u64,
357    ) -> Result<usize, InsertError> {
358        if len == 0 || hash == Hash::EMPTY {
359            return Err(InsertError::EntryIsEmpty);
360        }
361        self.info.ensure_open()?;
362        let id = RecordIdentifier::new(self.id(), author.id(), key);
363        let record = Record::new_current(hash, len);
364        let entry = Entry::new(id, record);
365        let secret = self.secret_key()?;
366        let signed_entry = entry.sign(secret, author);
367        self.insert_entry(signed_entry, InsertOrigin::Local)
368    }
369
370    /// Delete entries that match the given `author` and key `prefix`.
371    ///
372    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
373    /// entries whose key starts with or is equal to the given `prefix`.
374    ///
375    /// Returns the number of entries deleted.
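    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here): clear every entry by `author` whose key starts with
    /// `"logs/"`:
    ///
    /// ```ignore
    /// let removed = replica.delete_prefix("logs/", &author)?;
    /// ```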
376    pub fn delete_prefix(
377        &mut self,
378        prefix: impl AsRef<[u8]>,
379        author: &Author,
380    ) -> Result<usize, InsertError> {
381        self.info.ensure_open()?;
382        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
383        let entry = Entry::new_empty(id);
384        let signed_entry = entry.sign(self.secret_key()?, author);
385        self.insert_entry(signed_entry, InsertOrigin::Local)
386    }
387
388    /// Insert an entry into this replica which was received from a remote peer.
389    ///
390    /// This will verify both the namespace and author signatures of the entry, emit an `on_insert`
391    /// event, and insert the entry into the replica store.
392    ///
393    /// Returns the number of entries removed as a consequence of this insertion,
394    /// or an error if the entry failed to validate or if a store operation failed.
395    pub fn insert_remote_entry(
396        &mut self,
397        entry: SignedEntry,
398        received_from: PeerIdBytes,
399        content_status: ContentStatus,
400    ) -> Result<usize, InsertError> {
401        self.info.ensure_open()?;
402        entry.validate_empty()?;
403        let origin = InsertOrigin::Sync {
404            from: received_from,
405            remote_content_status: content_status,
406        };
407        self.insert_entry(entry, origin)
408    }
409
410    /// Insert a signed entry into the database.
411    ///
412    /// Returns the number of entries removed as a consequence of this insertion.
413    fn insert_entry(
414        &mut self,
415        entry: SignedEntry,
416        origin: InsertOrigin,
417    ) -> Result<usize, InsertError> {
418        let namespace = self.id();
419
420        #[cfg(feature = "metrics")]
421        let len = entry.content_len();
422
423        let store = &self.store;
424        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
425
426        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
427
428        let removed_count = match outcome {
429            InsertOutcome::Inserted { removed } => removed,
430            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
431        };
432
433        let insert_event = match origin {
434            InsertOrigin::Local => {
435                #[cfg(feature = "metrics")]
436                {
437                    inc!(Metrics, new_entries_local);
438                    inc_by!(Metrics, new_entries_local_size, len);
439                }
440                Event::LocalInsert { namespace, entry }
441            }
442            InsertOrigin::Sync {
443                from,
444                remote_content_status,
445            } => {
446                #[cfg(feature = "metrics")]
447                {
448                    inc!(Metrics, new_entries_remote);
449                    inc_by!(Metrics, new_entries_remote_size, len);
450                }
451
452                let download_policy = self
453                    .store
454                    .get_download_policy(&self.capability().id())
455                    .unwrap_or_default();
456                let should_download = download_policy.matches(entry.entry());
457                Event::RemoteInsert {
458                    namespace,
459                    entry,
460                    from,
461                    should_download,
462                    remote_content_status,
463                }
464            }
465        };
466
467        self.info.subscribers.send(insert_event);
468
469        Ok(removed_count)
470    }
471
472    /// Hashes the given data and inserts it.
473    ///
474    /// This does not store the content, just the record of it.
475    /// Returns the calculated hash.
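    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here); only the hash and length of the data are recorded:
    ///
    /// ```ignore
    /// let hash = replica.hash_and_insert("greeting", &author, b"hello world")?;
    /// ```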
476    pub fn hash_and_insert(
477        &mut self,
478        key: impl AsRef<[u8]>,
479        author: &Author,
480        data: impl AsRef<[u8]>,
481    ) -> Result<Hash, InsertError> {
482        self.info.ensure_open()?;
483        let len = data.as_ref().len() as u64;
484        let hash = Hash::new(data);
485        self.insert(key, author, hash, len)?;
486        Ok(hash)
487    }
488
489    /// Get the identifier for an entry in this replica.
490    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
491        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
492    }
493
494    /// Create the initial message for the set reconciliation flow with a remote peer.
495    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
496        self.info.ensure_open().map_err(anyhow::Error::from)?;
497        self.store.initial_message().map_err(Into::into)
498    }
499
500    /// Process a set reconciliation message from a remote peer.
501    ///
502    /// Returns the next message to be sent to the peer, if any.
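    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here) of driving reconciliation between two local replicas,
    /// assuming `alice` and `bob` are [`Replica`]s for the same namespace:
    ///
    /// ```ignore
    /// let peer = [0u8; 32];
    /// let (mut alice_state, mut bob_state) = (SyncOutcome::default(), SyncOutcome::default());
    /// let mut to_bob = Some(alice.sync_initial_message()?);
    /// while let Some(msg) = to_bob {
    ///     match bob.sync_process_message(msg, peer, &mut bob_state)? {
    ///         Some(reply) => to_bob = alice.sync_process_message(reply, peer, &mut alice_state)?,
    ///         None => to_bob = None,
    ///     }
    /// }
    /// ```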
503    pub fn sync_process_message(
504        &mut self,
505        message: crate::ranger::Message<SignedEntry>,
506        from_peer: PeerIdBytes,
507        state: &mut SyncOutcome,
508    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
509        self.info.ensure_open()?;
510        let my_namespace = self.id();
511        let now = system_time_now();
512
513        // update state with incoming data.
514        state.num_recv += message.value_count();
515        for (entry, _content_status) in message.values() {
516            state
517                .heads_received
518                .insert(entry.author(), entry.timestamp());
519        }
520
523        let cb = self.info.content_status_cb.clone();
524        let download_policy = self
525            .store
526            .get_download_policy(&my_namespace)
527            .unwrap_or_default();
528        let reply = self.store.process_message(
529            &Default::default(),
530            message,
531            // validate callback: validate incoming entries before insertion
532            |store, entry, content_status| {
533                let origin = InsertOrigin::Sync {
534                    from: from_peer,
535                    remote_content_status: content_status,
536                };
537                validate_entry(now, store, my_namespace, entry, &origin).is_ok()
538            },
539            // on_insert callback: is called when an entry was actually inserted in the store
540            |_store, entry, content_status| {
541                // We use `send_with` to only clone the entry if we have active subscriptions.
542                self.info.subscribers.send_with(|| {
543                    let should_download = download_policy.matches(entry.entry());
544                    Event::RemoteInsert {
545                        from: from_peer,
546                        namespace: my_namespace,
547                        entry: entry.clone(),
548                        should_download,
549                        remote_content_status: content_status,
550                    }
551                })
552            },
553            // content_status callback: get content status for outgoing entries
554            |_store, entry| {
555                if let Some(cb) = cb.as_ref() {
556                    cb(entry.content_hash())
557                } else {
558                    ContentStatus::Missing
559                }
560            },
561        )?;
562
563        // update state with outgoing data.
564        if let Some(ref reply) = reply {
565            state.num_sent += reply.value_count();
566        }
567
568        Ok(reply)
569    }
570
571    /// Get the namespace identifier for this [`Replica`].
572    pub fn id(&self) -> NamespaceId {
573        self.info.capability.id()
574    }
575
576    /// Get the [`Capability`] of this [`Replica`].
577    pub fn capability(&self) -> &Capability {
578        &self.info.capability
579    }
580
581    /// Get the [`NamespaceSecret`] key for this replica. Will fail if
582    /// the replica is read-only.
583    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
584        self.info.capability.secret_key()
585    }
586}
587
588/// Error that occurs trying to access the [`NamespaceSecret`] of a read-only [`Capability`].
589#[derive(Debug, thiserror::Error)]
590#[error("Replica allows read access only.")]
591pub struct ReadOnly;
592
593/// Validate whether a [`SignedEntry`] is fit to be inserted.
594///
595/// This validates that
596/// * the entry's author and namespace signatures are correct
597/// * the entry's namespace matches the current replica
598/// * the entry's timestamp is not more than 10 minutes in the future of our system time
599/// * the entry is newer than an existing entry for the same key and author, if such exists (this last check is enforced by the store during insertion).
600fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
601    now: u64,
602    store: &S,
603    expected_namespace: NamespaceId,
604    entry: &SignedEntry,
605    origin: &InsertOrigin,
606) -> Result<(), ValidationFailure> {
607    // Verify the namespace
608    if entry.namespace() != expected_namespace {
609        return Err(ValidationFailure::InvalidNamespace);
610    }
611
612    // Verify signature for non-local entries.
613    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
614        return Err(ValidationFailure::BadSignature);
615    }
616
617    // Verify that the timestamp of the entry is not too far in the future.
618    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
619        return Err(ValidationFailure::TooFarInTheFuture);
620    }
621    Ok(())
622}
623
624/// Error emitted when inserting entries into a [`Replica`] failed
625#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
626pub enum InsertError {
627    /// Storage error
628    #[error("storage error")]
629    Store(anyhow::Error),
630    /// Validation failure
631    #[error("validation failure")]
632    Validation(#[from] ValidationFailure),
633    /// A newer entry exists for either this entry's key or a prefix of the key.
634    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
635    NewerEntryExists,
636    /// Attempted to insert an empty entry.
637    #[error("Attempted to insert an empty entry")]
638    EntryIsEmpty,
639    /// Replica is read only.
640    #[error("Attempted to insert to read only replica")]
641    #[from(ReadOnly)]
642    ReadOnly,
643    /// The replica is closed, no operations may be performed.
644    #[error("replica is closed")]
645    Closed,
646}
647
648/// Reason why entry validation failed
649#[derive(thiserror::Error, Debug)]
650pub enum ValidationFailure {
651    /// Entry namespace does not match the current replica.
652    #[error("Entry namespace does not match the current replica")]
653    InvalidNamespace,
654    /// Entry signature is invalid.
655    #[error("Entry signature is invalid")]
656    BadSignature,
657    /// Entry timestamp is too far in the future.
658    #[error("Entry timestamp is too far in the future.")]
659    TooFarInTheFuture,
660    /// Entry has length 0 but not the empty hash, or the empty hash but not length 0.
661    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
662    InvalidEmptyEntry,
663}
664
665/// A signed entry.
666#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
667pub struct SignedEntry {
668    signature: EntrySignature,
669    entry: Entry,
670}
671
672impl From<SignedEntry> for Entry {
673    fn from(value: SignedEntry) -> Self {
674        value.entry
675    }
676}
677
678impl PartialOrd for SignedEntry {
679    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
680        Some(self.cmp(other))
681    }
682}
683
684impl Ord for SignedEntry {
685    fn cmp(&self, other: &Self) -> Ordering {
686        self.entry.cmp(&other.entry)
687    }
688}
689
690impl PartialOrd for Entry {
691    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
692        Some(self.cmp(other))
693    }
694}
695
696impl Ord for Entry {
697    fn cmp(&self, other: &Self) -> Ordering {
698        self.id
699            .cmp(&other.id)
700            .then_with(|| self.record.cmp(&other.record))
701    }
702}
703
704impl SignedEntry {
705    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
706        SignedEntry { signature, entry }
707    }
708
709    /// Create a new signed entry by signing an entry with the `namespace` and `author`.
710    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
711        let signature = EntrySignature::from_entry(&entry, namespace, author);
712        SignedEntry { signature, entry }
713    }
714
715    /// Create a new signed entry from its parts.
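    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming a `NamespaceSecret` named `namespace` and an
    /// [`Author`] named `author`:
    ///
    /// ```ignore
    /// let data = b"data";
    /// let record = Record::new_current(Hash::new(data), data.len() as u64);
    /// let signed = SignedEntry::from_parts(&namespace, &author, "some/key", record);
    /// signed.verify(&())?;
    /// ```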
716    pub fn from_parts(
717        namespace: &NamespaceSecret,
718        author: &Author,
719        key: impl AsRef<[u8]>,
720        record: Record,
721    ) -> Self {
722        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
723        let entry = Entry::new(id, record);
724        Self::from_entry(entry, namespace, author)
725    }
726
727    /// Verify the signatures on this entry.
728    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
729        self.signature.verify(
730            &self.entry,
731            &self.entry.namespace().public_key(store)?,
732            &self.entry.author().public_key(store)?,
733        )
734    }
735
736    /// Get the signature.
737    pub fn signature(&self) -> &EntrySignature {
738        &self.signature
739    }
740
741    /// Validate that the entry has the empty hash if and only if its length is 0.
742    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
743        self.entry().validate_empty()
744    }
745
746    /// Get the [`Entry`].
747    pub fn entry(&self) -> &Entry {
748        &self.entry
749    }
750
751    /// Get the content [`struct@Hash`] of the entry.
752    pub fn content_hash(&self) -> Hash {
753        self.entry().content_hash()
754    }
755
756    /// Get the content length of the entry.
757    pub fn content_len(&self) -> u64 {
758        self.entry().content_len()
759    }
760
761    /// Get the author bytes of this entry.
762    pub fn author_bytes(&self) -> AuthorId {
763        self.entry().id().author()
764    }
765
766    /// Get the key of the entry.
767    pub fn key(&self) -> &[u8] {
768        self.entry().id().key()
769    }
770
771    /// Get the timestamp of the entry.
772    pub fn timestamp(&self) -> u64 {
773        self.entry().timestamp()
774    }
775}
776
777impl RangeEntry for SignedEntry {
778    type Key = RecordIdentifier;
779    type Value = Record;
780
781    fn key(&self) -> &Self::Key {
782        &self.entry.id
783    }
784
785    fn value(&self) -> &Self::Value {
786        &self.entry.record
787    }
788
789    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
790        let mut hasher = blake3::Hasher::new();
791        hasher.update(self.namespace().as_ref());
792        hasher.update(self.author_bytes().as_ref());
793        hasher.update(self.key());
794        hasher.update(&self.timestamp().to_be_bytes());
795        hasher.update(self.content_hash().as_bytes());
796        Fingerprint(hasher.finalize().into())
797    }
798}
799
800/// Signature over an entry.
801#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
802pub struct EntrySignature {
803    author_signature: Signature,
804    namespace_signature: Signature,
805}
806
807impl Debug for EntrySignature {
808    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
809        f.debug_struct("EntrySignature")
810            .field(
811                "namespace_signature",
812                &base32::fmt(self.namespace_signature.to_bytes()),
813            )
814            .field(
815                "author_signature",
816                &base32::fmt(self.author_signature.to_bytes()),
817            )
818            .finish()
819    }
820}
821
822impl EntrySignature {
823    /// Create a new signature by signing an entry with the `namespace` and `author`.
824    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
825        // TODO: this should probably include a namespace prefix,
826        // i.e. a namespace in the cryptographic sense.
827        let bytes = entry.to_vec();
828        let namespace_signature = namespace.sign(&bytes);
829        let author_signature = author.sign(&bytes);
830
831        EntrySignature {
832            author_signature,
833            namespace_signature,
834        }
835    }
836
837    /// Verify that this signature was created by signing the `entry` with the
838    /// secret keys of the specified `author` and `namespace`.
839    pub fn verify(
840        &self,
841        entry: &Entry,
842        namespace: &NamespacePublicKey,
843        author: &AuthorPublicKey,
844    ) -> Result<(), SignatureError> {
845        let bytes = entry.to_vec();
846        namespace.verify(&bytes, &self.namespace_signature)?;
847        author.verify(&bytes, &self.author_signature)?;
848
849        Ok(())
850    }
851
852    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
853        let namespace_signature = Signature::from_bytes(namespace_sig);
854        let author_signature = Signature::from_bytes(author_sig);
855
856        EntrySignature {
857            author_signature,
858            namespace_signature,
859        }
860    }
861
862    pub(crate) fn author(&self) -> &Signature {
863        &self.author_signature
864    }
865
866    pub(crate) fn namespace(&self) -> &Signature {
867        &self.namespace_signature
868    }
869}
870
871/// A single entry in a [`Replica`]
872///
873/// An entry is identified by a key, its [`Author`], and the [`Replica`]'s
874/// [`NamespaceSecret`]. Its value is the [32-byte BLAKE3 hash](iroh_base::hash::Hash)
875/// of the entry's content data, the size of this content data, and a timestamp.
876#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
877pub struct Entry {
878    id: RecordIdentifier,
879    record: Record,
880}
881
882impl Entry {
883    /// Create a new entry
884    pub fn new(id: RecordIdentifier, record: Record) -> Self {
885        Entry { id, record }
886    }
887
888    /// Create a new empty entry with the current timestamp.
889    pub fn new_empty(id: RecordIdentifier) -> Self {
890        Entry {
891            id,
892            record: Record::empty_current(),
893        }
894    }
895
896    /// Validate that the entry has the empty hash if and only if its length is 0.
897    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
898        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
899            (true, true) => Ok(()),
900            (false, false) => Ok(()),
901            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
902            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
903        }
904    }
905
906    /// Get the [`RecordIdentifier`] for this entry.
907    pub fn id(&self) -> &RecordIdentifier {
908        &self.id
909    }
910
911    /// Get the [`NamespaceId`] of this entry.
912    pub fn namespace(&self) -> NamespaceId {
913        self.id.namespace()
914    }
915
916    /// Get the [`AuthorId`] of this entry.
917    pub fn author(&self) -> AuthorId {
918        self.id.author()
919    }
920
921    /// Get the key of this entry.
922    pub fn key(&self) -> &[u8] {
923        self.id.key()
924    }
925
926    /// Get the [`Record`] contained in this entry.
927    pub fn record(&self) -> &Record {
928        &self.record
929    }
930
931    /// Serialize this entry into its canonical byte representation used for signing.
932    pub fn encode(&self, out: &mut Vec<u8>) {
933        self.id.encode(out);
934        self.record.encode(out);
935    }
936
937    /// Serialize this entry into a new vector with its canonical byte representation.
938    pub fn to_vec(&self) -> Vec<u8> {
939        let mut out = Vec::new();
940        self.encode(&mut out);
941        out
942    }
943
944    /// Sign this entry with a [`NamespaceSecret`] and [`Author`].
945    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
946        SignedEntry::from_entry(self, namespace, author)
947    }
948}
949
950const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
951const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
952const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
953
954/// The identifier of a record.
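///
/// # Example
///
/// A minimal sketch (not compiled here) of the byte layout: 32 bytes of namespace id, 32 bytes of
/// author id, followed by the raw key. `namespace_id` and `author_id` are assumed to exist:
///
/// ```ignore
/// let id = RecordIdentifier::new(namespace_id, author_id, "some/key");
/// let (namespace, author, key) = id.as_byte_tuple();
/// assert_eq!(key, &b"some/key"[..]);
/// ```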
955#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
956pub struct RecordIdentifier(Bytes);
957
958impl Default for RecordIdentifier {
959    fn default() -> Self {
960        Self::new(NamespaceId::default(), AuthorId::default(), b"")
961    }
962}
963
964impl Debug for RecordIdentifier {
965    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
966        f.debug_struct("RecordIdentifier")
967            .field("namespace", &self.namespace())
968            .field("author", &self.author())
969            .field("key", &std::string::String::from_utf8_lossy(self.key()))
970            .finish()
971    }
972}
973
974impl RangeKey for RecordIdentifier {
975    #[cfg(test)]
976    fn is_prefix_of(&self, other: &Self) -> bool {
977        other.as_ref().starts_with(self.as_ref())
978    }
979}
980
981fn system_time_now() -> u64 {
982    SystemTime::now()
983        .duration_since(SystemTime::UNIX_EPOCH)
984        .expect("time drift")
985        .as_micros() as u64
986}
987
988impl RecordIdentifier {
989    /// Create a new [`RecordIdentifier`].
990    pub fn new(
991        namespace: impl Into<NamespaceId>,
992        author: impl Into<AuthorId>,
993        key: impl AsRef<[u8]>,
994    ) -> Self {
995        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
996        bytes.extend_from_slice(namespace.into().as_bytes());
997        bytes.extend_from_slice(author.into().as_bytes());
998        bytes.extend_from_slice(key.as_ref());
999        Self(bytes.freeze())
1000    }
1001
1002    /// Append the canonical byte representation of this [`RecordIdentifier`] to `out`.
1003    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1004        out.extend_from_slice(&self.0);
1005    }
1006
1007    /// Get this [`RecordIdentifier`] as [Bytes].
1008    pub fn as_bytes(&self) -> Bytes {
1009        self.0.clone()
1010    }
1011
1012    /// Get this [`RecordIdentifier`] as a tuple of byte slices.
1013    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1014        (
1015            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1016            self.0[AUTHOR_BYTES].try_into().unwrap(),
1017            &self.0[KEY_BYTES],
1018        )
1019    }
1020
1021    /// Get this [`RecordIdentifier`] as a tuple of bytes.
1022    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1023        (
1024            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1025            self.0[AUTHOR_BYTES].try_into().unwrap(),
1026            self.0.slice(KEY_BYTES),
1027        )
1028    }
1029
1030    /// Get the key of this record.
1031    pub fn key(&self) -> &[u8] {
1032        &self.0[KEY_BYTES]
1033    }
1034
1035    /// Get the key of this record as [`Bytes`].
1036    pub fn key_bytes(&self) -> Bytes {
1037        self.0.slice(KEY_BYTES)
1038    }
1039
1040    /// Get the [`NamespaceId`] of this record.
1041    pub fn namespace(&self) -> NamespaceId {
1042        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1043        value.into()
1044    }
1045
1046    /// Get the [`AuthorId`] of this record.
1047    pub fn author(&self) -> AuthorId {
1048        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1049        value.into()
1050    }
1051}
1052
1053impl AsRef<[u8]> for RecordIdentifier {
1054    fn as_ref(&self) -> &[u8] {
1055        &self.0
1056    }
1057}
1058
1059impl Deref for SignedEntry {
1060    type Target = Entry;
1061    fn deref(&self) -> &Self::Target {
1062        &self.entry
1063    }
1064}
1065
1066impl Deref for Entry {
1067    type Target = Record;
1068    fn deref(&self) -> &Self::Target {
1069        &self.record
1070    }
1071}
1072
1073/// The data part of an entry in a [`Replica`].
1074#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1075pub struct Record {
1076    /// Length of the data referenced by `hash`.
1077    len: u64,
1078    /// Hash of the content data.
1079    hash: Hash,
1080    /// Record creation timestamp. Counted as micros since the Unix epoch.
1081    timestamp: u64,
1082}
1083
1084impl RangeValue for Record {}
1085
1086/// Ordering for entry values.
1087///
1088/// Compares first the timestamp, then the content hash.
1089impl Ord for Record {
1090    fn cmp(&self, other: &Self) -> Ordering {
1091        self.timestamp
1092            .cmp(&other.timestamp)
1093            .then_with(|| self.hash.cmp(&other.hash))
1094    }
1095}
1096
1097impl PartialOrd for Record {
1098    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1099        Some(self.cmp(other))
1100    }
1101}
1102
1103impl Record {
1104    /// Create a new record.
1105    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1106        debug_assert!(
1107            len != 0 || hash == Hash::EMPTY,
1108            "if `len` is 0 then `hash` must be the hash of the empty byte range"
1109        );
1110        Record {
1111            hash,
1112            len,
1113            timestamp,
1114        }
1115    }
1116
1117    /// Create a tombstone record (empty content)
1118    pub fn empty(timestamp: u64) -> Self {
1119        Self::new(Hash::EMPTY, 0, timestamp)
1120    }
1121
1122    /// Create a tombstone record with the timestamp set to now.
1123    pub fn empty_current() -> Self {
1124        Self::new_current(Hash::EMPTY, 0)
1125    }
1126
1127    /// Return `true` if the entry is empty.
1128    pub fn is_empty(&self) -> bool {
1129        self.hash == Hash::EMPTY
1130    }
1131
1132    /// Create a new [`Record`] with the timestamp set to now.
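    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here):
    ///
    /// ```ignore
    /// let data = b"hello";
    /// let record = Record::new_current(Hash::new(data), data.len() as u64);
    /// assert!(!record.is_empty());
    /// ```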
1133    pub fn new_current(hash: Hash, len: u64) -> Self {
1134        let timestamp = system_time_now();
1135        Self::new(hash, len, timestamp)
1136    }
1137
1138    /// Get the length of the data addressed by this record's content hash.
1139    pub fn content_len(&self) -> u64 {
1140        self.len
1141    }
1142
1143    /// Get the [`struct@Hash`] of the content data of this record.
1144    pub fn content_hash(&self) -> Hash {
1145        self.hash
1146    }
1147
1148    /// Get the timestamp of this record.
1149    pub fn timestamp(&self) -> u64 {
1150        self.timestamp
1151    }
1152
1153    #[cfg(test)]
1154    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1155        let len = data.as_ref().len() as u64;
1156        let hash = Hash::new(data);
1157        Self::new_current(hash, len)
1158    }
1159
1160    #[cfg(test)]
1161    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1162        let len = data.as_ref().len() as u64;
1163        let hash = Hash::new(data);
1164        Self::new(hash, len, timestamp)
1165    }
1166
1167    /// Append the canonical byte representation of this record to `out`.
1168    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1169        out.extend_from_slice(&self.len.to_be_bytes());
1170        out.extend_from_slice(self.hash.as_ref());
1171        out.extend_from_slice(&self.timestamp.to_be_bytes())
1172    }
1173}
1174
1175#[cfg(test)]
1176mod tests {
1177    use std::collections::HashSet;
1178
1179    use anyhow::Result;
1180    use rand_core::SeedableRng;
1181
1182    use crate::{
1183        actor::SyncHandle,
1184        ranger::{Range, Store as _},
1185        store::{OpenError, Query, SortBy, SortDirection, Store},
1186    };
1187
1188    use super::*;
1189
1190    #[test]
1191    fn test_basics_memory() -> Result<()> {
1192        let store = store::Store::memory();
1193        test_basics(store)?;
1194
1195        Ok(())
1196    }
1197
1198    #[test]
1199    fn test_basics_fs() -> Result<()> {
1200        let dbfile = tempfile::NamedTempFile::new()?;
1201        let store = store::fs::Store::persistent(dbfile.path())?;
1202        test_basics(store)?;
1203        Ok(())
1204    }
1205
1206    fn test_basics(mut store: Store) -> Result<()> {
1207        let mut rng = rand::thread_rng();
1208        let alice = Author::new(&mut rng);
1209        let bob = Author::new(&mut rng);
1210        let myspace = NamespaceSecret::new(&mut rng);
1211
1212        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1213        let record = Record::current_from_data(b"this is my cool data");
1214        let entry = Entry::new(record_id, record);
1215        let signed_entry = entry.sign(&myspace, &alice);
1216        signed_entry.verify(&()).expect("failed to verify");
1217
1218        let mut my_replica = store.new_replica(myspace.clone())?;
1219        for i in 0..10 {
1220            my_replica.hash_and_insert(
1221                format!("/{i}"),
1222                &alice,
1223                format!("{i}: hello from alice"),
1224            )?;
1225        }
1226
1227        for i in 0..10 {
1228            let res = store
1229                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1230                .unwrap();
1231            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1232            assert_eq!(res.entry().record().content_len(), len);
1233            res.verify(&())?;
1234        }
1235
1236        // Test multiple records for the same key
1237        let mut my_replica = store.new_replica(myspace.clone())?;
1238        my_replica.hash_and_insert("/cool/path", &alice, "round 1")?;
1239        let _entry = store
1240            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1241            .unwrap();
1242        // Second
1243        let mut my_replica = store.new_replica(myspace.clone())?;
1244        my_replica.hash_and_insert("/cool/path", &alice, "round 2")?;
1245        let _entry = store
1246            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1247            .unwrap();
1248
1249        // Get All by author
1250        let entries: Vec<_> = store
1251            .get_many(myspace.id(), Query::author(alice.id()))?
1252            .collect::<Result<_>>()?;
1253        assert_eq!(entries.len(), 11);
1254
1255        // Get All by author
1256        let entries: Vec<_> = store
1257            .get_many(myspace.id(), Query::author(bob.id()))?
1258            .collect::<Result<_>>()?;
1259        assert_eq!(entries.len(), 0);
1260
1261        // Get All by key
1262        let entries: Vec<_> = store
1263            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1264            .collect::<Result<_>>()?;
1265        assert_eq!(entries.len(), 1);
1266
1267        // Get All
1268        let entries: Vec<_> = store
1269            .get_many(myspace.id(), Query::all())?
1270            .collect::<Result<_>>()?;
1271        assert_eq!(entries.len(), 11);
1272
1273        // insert record from different author
1274        let mut my_replica = store.new_replica(myspace.clone())?;
1275        let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;
1276
1277        // Get All by author
1278        let entries: Vec<_> = store
1279            .get_many(myspace.id(), Query::author(alice.id()))?
1280            .collect::<Result<_>>()?;
1281        assert_eq!(entries.len(), 11);
1282
1283        let entries: Vec<_> = store
1284            .get_many(myspace.id(), Query::author(bob.id()))?
1285            .collect::<Result<_>>()?;
1286        assert_eq!(entries.len(), 1);
1287
1288        // Get All by key
1289        let entries: Vec<_> = store
1290            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1291            .collect::<Result<_>>()?;
1292        assert_eq!(entries.len(), 2);
1293
1294        // Get all by prefix
1295        let entries: Vec<_> = store
1296            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1297            .collect::<Result<_>>()?;
1298        assert_eq!(entries.len(), 2);
1299
1300        // Get All by author and prefix
1301        let entries: Vec<_> = store
1302            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1303            .collect::<Result<_>>()?;
1304        assert_eq!(entries.len(), 1);
1305
1306        let entries: Vec<_> = store
1307            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1308            .collect::<Result<_>>()?;
1309        assert_eq!(entries.len(), 1);
1310
1311        // Get All
1312        let entries: Vec<_> = store
1313            .get_many(myspace.id(), Query::all())?
1314            .collect::<Result<_>>()?;
1315        assert_eq!(entries.len(), 12);
1316
1317        // Get Range of all should return all latest
1318        let mut my_replica = store.new_replica(myspace.clone())?;
1319        let entries_second: Vec<_> = my_replica
1320            .store
1321            .get_range(Range::new(
1322                RecordIdentifier::default(),
1323                RecordIdentifier::default(),
1324            ))?
1325            .collect::<Result<_, _>>()?;
1326
1327        assert_eq!(entries_second.len(), 12);
1328        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1329
1330        test_lru_cache_like_behaviour(&mut store, myspace.id())
1331    }
1332
1333    /// Test that [`Store::register_useful_peer`] behaves like an LRU cache of size
1334    /// [`super::store::PEERS_PER_DOC_CACHE_SIZE`].
1335    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1336        /// Helper to verify the store returns the expected peers for the namespace.
1337        #[track_caller]
1338        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1339            assert_eq!(
1340                expected_peers,
1341                &store
1342                    .get_sync_peers(&namespace)
1343                    .unwrap()
1344                    .unwrap()
1345                    .collect::<Vec<_>>(),
1346                "sync peers differ"
1347            );
1348        }
1349
1350        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1351        // expected peers: newest peers are to the front, oldest to the back
1352        let mut expected_peers = Vec::with_capacity(count);
1353        for i in 0..count as u8 {
1354            let peer = [i; 32];
1355            expected_peers.insert(0, peer);
1356            store.register_useful_peer(namespace, peer)?;
1357        }
1358        verify_peers(store, namespace, &expected_peers);
1359
1360        // one more peer should evict the last peer
1361        expected_peers.pop();
1362        let newer_peer = [count as u8; 32];
1363        expected_peers.insert(0, newer_peer);
1364        store.register_useful_peer(namespace, newer_peer)?;
1365        verify_peers(store, namespace, &expected_peers);
1366
1367        // move one existing peer up
1368        let refreshed_peer = expected_peers.remove(2);
1369        expected_peers.insert(0, refreshed_peer);
1370        store.register_useful_peer(namespace, refreshed_peer)?;
1371        verify_peers(store, namespace, &expected_peers);
1372        Ok(())
1373    }
1374
1375    #[test]
1376    fn test_content_hashes_iterator_memory() -> Result<()> {
1377        let store = store::Store::memory();
1378        test_content_hashes_iterator(store)
1379    }
1380
1381    #[test]
1382    fn test_content_hashes_iterator_fs() -> Result<()> {
1383        let dbfile = tempfile::NamedTempFile::new()?;
1384        let store = store::fs::Store::persistent(dbfile.path())?;
1385        test_content_hashes_iterator(store)
1386    }
1387
1388    fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1389        let mut rng = rand::thread_rng();
1390        let mut expected = HashSet::new();
1391        let n_replicas = 3;
1392        let n_entries = 4;
1393        for i in 0..n_replicas {
1394            let namespace = NamespaceSecret::new(&mut rng);
1395            let author = store.new_author(&mut rng)?;
1396            let mut replica = store.new_replica(namespace)?;
1397            for j in 0..n_entries {
1398                let key = format!("{j}");
1399                let data = format!("{i}:{j}");
1400                let hash = replica.hash_and_insert(key, &author, data)?;
1401                expected.insert(hash);
1402            }
1403        }
1404        assert_eq!(expected.len(), n_replicas * n_entries);
1405        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1406        assert_eq!(actual, expected);
1407        Ok(())
1408    }
1409
1410    #[test]
1411    fn test_multikey() {
1412        let mut rng = rand::thread_rng();
1413
1414        let k = ["a", "c", "z"];
1415
1416        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1417        n.sort_by_key(|n| n.id());
1418
1419        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1420        a.sort_by_key(|a| a.id());
1421
1422        // Just key
1423        {
1424            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1425            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1426            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1427
1428            let range = Range::new(ri0.clone(), ri2.clone());
1429            assert!(range.contains(&ri0), "start");
1430            assert!(range.contains(&ri1), "inside");
1431            assert!(!range.contains(&ri2), "end");
1432
1433            assert!(ri0 < ri1);
1434            assert!(ri1 < ri2);
1435        }
1436
1437        // Just namespace
1438        {
1439            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1440            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1441            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1442
1443            let range = Range::new(ri0.clone(), ri2.clone());
1444            assert!(range.contains(&ri0), "start");
1445            assert!(range.contains(&ri1), "inside");
1446            assert!(!range.contains(&ri2), "end");
1447
1448            assert!(ri0 < ri1);
1449            assert!(ri1 < ri2);
1450        }
1451
1452        // Just author
1453        {
1454            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1455            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1456            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1457
1458            let range = Range::new(ri0.clone(), ri2.clone());
1459            assert!(range.contains(&ri0), "start");
1460            assert!(range.contains(&ri1), "inside");
1461            assert!(!range.contains(&ri2), "end");
1462
1463            assert!(ri0 < ri1);
1464            assert!(ri1 < ri2);
1465        }
1466
1467        // Just key and namespace
1468        {
1469            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1470            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1471            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1472
1473            let range = Range::new(ri0.clone(), ri2.clone());
1474            assert!(range.contains(&ri0), "start");
1475            assert!(range.contains(&ri1), "inside");
1476            assert!(!range.contains(&ri2), "end");
1477
1478            assert!(ri0 < ri1);
1479            assert!(ri1 < ri2);
1480        }
1481
1482        // Mixed
1483        {
1484            // Ord should prioritize namespace - author - key
1485
1486            let a0 = a[0].id();
1487            let a1 = a[1].id();
1488            let n0 = n[0].id();
1489            let n1 = n[1].id();
1490            let k0 = k[0];
1491            let k1 = k[1];
1492
1493            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1494            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1495            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1496            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1497        }
1498    }
1499
1500    #[test]
1501    fn test_timestamps_memory() -> Result<()> {
1502        let store = store::Store::memory();
1503        test_timestamps(store)?;
1504
1505        Ok(())
1506    }
1507
1508    #[test]
1509    fn test_timestamps_fs() -> Result<()> {
1510        let dbfile = tempfile::NamedTempFile::new()?;
1511        let store = store::fs::Store::persistent(dbfile.path())?;
1512        test_timestamps(store)?;
1513        Ok(())
1514    }
1515
1516    fn test_timestamps(mut store: Store) -> Result<()> {
1517        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1518        let namespace = NamespaceSecret::new(&mut rng);
1519        let _replica = store.new_replica(namespace.clone())?;
1520        let author = store.new_author(&mut rng)?;
1521        store.close_replica(namespace.id());
1522        let mut replica = store.open_replica(&namespace.id())?;
1523
1524        let key = b"hello";
1525        let value = b"world";
1526        let entry = {
1527            let timestamp = 2;
1528            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1529            let record = Record::from_data(value, timestamp);
1530            Entry::new(id, record).sign(&namespace, &author)
1531        };
1532
1533        replica
1534            .insert_entry(entry.clone(), InsertOrigin::Local)
1535            .unwrap();
1536        store.close_replica(namespace.id());
1537        let res = store
1538            .get_exact(namespace.id(), author.id(), key, false)?
1539            .unwrap();
1540        assert_eq!(res, entry);
1541
1542        let entry2 = {
1543            let timestamp = 1;
1544            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1545            let record = Record::from_data(value, timestamp);
1546            Entry::new(id, record).sign(&namespace, &author)
1547        };
1548
1549        let mut replica = store.open_replica(&namespace.id())?;
1550        let res = replica.insert_entry(entry2, InsertOrigin::Local);
1551        store.close_replica(namespace.id());
1552        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1553        let res = store
1554            .get_exact(namespace.id(), author.id(), key, false)?
1555            .unwrap();
1556        assert_eq!(res, entry);
1557
1558        Ok(())
1559    }
1560
1561    #[test]
1562    fn test_replica_sync_memory() -> Result<()> {
1563        let alice_store = store::Store::memory();
1564        let bob_store = store::Store::memory();
1565
1566        test_replica_sync(alice_store, bob_store)?;
1567        Ok(())
1568    }
1569
1570    #[test]
1571    fn test_replica_sync_fs() -> Result<()> {
1572        let alice_dbfile = tempfile::NamedTempFile::new()?;
1573        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1574        let bob_dbfile = tempfile::NamedTempFile::new()?;
1575        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1576        test_replica_sync(alice_store, bob_store)?;
1577
1578        Ok(())
1579    }
1580
1581    fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1582        let alice_set = ["ape", "eel", "fox", "gnu"];
1583        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1584
1585        let mut rng = rand::thread_rng();
1586        let author = Author::new(&mut rng);
1587        let myspace = NamespaceSecret::new(&mut rng);
1588        let mut alice = alice_store.new_replica(myspace.clone())?;
1589        for el in &alice_set {
1590            alice.hash_and_insert(el, &author, el.as_bytes())?;
1591        }
1592
1593        let mut bob = bob_store.new_replica(myspace.clone())?;
1594        for el in &bob_set {
1595            bob.hash_and_insert(el, &author, el.as_bytes())?;
1596        }
1597
1598        let (alice_out, bob_out) = sync(&mut alice, &mut bob)?;
1599
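        // check how many entries each side sent and received during reconciliation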
1600        assert_eq!(alice_out.num_sent, 2);
1601        assert_eq!(bob_out.num_recv, 2);
1602        assert_eq!(alice_out.num_recv, 6);
1603        assert_eq!(bob_out.num_sent, 6);
1604
1605        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1606        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1607        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1608        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1609
1610        Ok(())
1611    }
1612
1613    #[test]
1614    fn test_replica_timestamp_sync_memory() -> Result<()> {
1615        let alice_store = store::Store::memory();
1616        let bob_store = store::Store::memory();
1617
1618        test_replica_timestamp_sync(alice_store, bob_store)?;
1619        Ok(())
1620    }
1621
1622    #[test]
1623    fn test_replica_timestamp_sync_fs() -> Result<()> {
1624        let alice_dbfile = tempfile::NamedTempFile::new()?;
1625        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1626        let bob_dbfile = tempfile::NamedTempFile::new()?;
1627        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1628        test_replica_timestamp_sync(alice_store, bob_store)?;
1629
1630        Ok(())
1631    }
1632
1633    fn test_replica_timestamp_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1634        let mut rng = rand::thread_rng();
1635        let author = Author::new(&mut rng);
1636        let namespace = NamespaceSecret::new(&mut rng);
1637        let mut alice = alice_store.new_replica(namespace.clone())?;
1638        let mut bob = bob_store.new_replica(namespace.clone())?;
1639
1640        let key = b"key";
1641        let alice_value = b"alice";
1642        let bob_value = b"bob";
1643        let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
1644        // bob inserts later in wall-clock time, so after sync bob's entry wins
1645        let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
1646        sync(&mut alice, &mut bob)?;
1647        assert_eq!(
1648            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1649            Some(bob_hash)
1650        );
1651        assert_eq!(
1652            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1653            Some(bob_hash)
1654        );
1655
1656        let mut alice = alice_store.new_replica(namespace.clone())?;
1657        let mut bob = bob_store.new_replica(namespace.clone())?;
1658
1659        let alice_value_2 = b"alice2";
1660        // bob inserts first, then alice; alice's later entry wins after sync
1661        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
1662        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
1663        sync(&mut alice, &mut bob)?;
1664        assert_eq!(
1665            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1666            Some(alice_hash_2)
1667        );
1668        assert_eq!(
1669            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1670            Some(alice_hash_2)
1671        );
1672
1673        Ok(())
1674    }
1675
1676    #[test]
1677    fn test_future_timestamp() -> Result<()> {
1678        let mut rng = rand::thread_rng();
1679        let mut store = store::Store::memory();
1680        let author = Author::new(&mut rng);
1681        let namespace = NamespaceSecret::new(&mut rng);
1682
1683        let mut replica = store.new_replica(namespace.clone())?;
1684        let key = b"hi";
1685        let t = system_time_now();
1686        let record = Record::from_data(b"1", t);
1687        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1688        replica.insert_entry(entry0.clone(), InsertOrigin::Local)?;
1689
1690        assert_eq!(
1691            get_entry(&mut store, namespace.id(), author.id(), key)?,
1692            entry0
1693        );
1694
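        // a timestamp within the allowed future shift is accepted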
1695        let mut replica = store.new_replica(namespace.clone())?;
1696        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1697        let record = Record::from_data(b"2", t);
1698        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1699        replica.insert_entry(entry1.clone(), InsertOrigin::Local)?;
1700        assert_eq!(
1701            get_entry(&mut store, namespace.id(), author.id(), key)?,
1702            entry1
1703        );
1704
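        // a timestamp exactly at the maximum future shift is still accepted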
1705        let mut replica = store.new_replica(namespace.clone())?;
1706        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1707        let record = Record::from_data(b"2", t);
1708        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1709        replica.insert_entry(entry2.clone(), InsertOrigin::Local)?;
1710        assert_eq!(
1711            get_entry(&mut store, namespace.id(), author.id(), key)?,
1712            entry2
1713        );
1714
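        // a timestamp beyond the maximum future shift must be rejected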
1715        let mut replica = store.new_replica(namespace.clone())?;
1716        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1717        let record = Record::from_data(b"2", t);
1718        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1719        let res = replica.insert_entry(entry3, InsertOrigin::Local);
1720        assert!(matches!(
1721            res,
1722            Err(InsertError::Validation(
1723                ValidationFailure::TooFarInTheFuture
1724            ))
1725        ));
1726        assert_eq!(
1727            get_entry(&mut store, namespace.id(), author.id(), key)?,
1728            entry2
1729        );
1730
1731        Ok(())
1732    }
1733
1734    #[test]
1735    fn test_insert_empty() -> Result<()> {
1736        let mut store = store::Store::memory();
1737        let mut rng = rand::thread_rng();
1738        let alice = Author::new(&mut rng);
1739        let myspace = NamespaceSecret::new(&mut rng);
1740        let mut replica = store.new_replica(myspace.clone())?;
1741        let hash = Hash::new(b"");
1742        let res = replica.insert(b"foo", &alice, hash, 0);
1743        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1744        Ok(())
1745    }
1746
1747    #[test]
1748    fn test_prefix_delete_memory() -> Result<()> {
1749        let store = store::Store::memory();
1750        test_prefix_delete(store)?;
1751        Ok(())
1752    }
1753
1754    #[test]
1755    fn test_prefix_delete_fs() -> Result<()> {
1756        let dbfile = tempfile::NamedTempFile::new()?;
1757        let store = store::fs::Store::persistent(dbfile.path())?;
1758        test_prefix_delete(store)?;
1759        Ok(())
1760    }
1761
1762    fn test_prefix_delete(mut store: Store) -> Result<()> {
1763        let mut rng = rand::thread_rng();
1764        let alice = Author::new(&mut rng);
1765        let myspace = NamespaceSecret::new(&mut rng);
1766        let mut replica = store.new_replica(myspace.clone())?;
1767        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
1768        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
1769
1770        // sanity checks
1771        assert_eq!(
1772            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1773            Some(hash1)
1774        );
1775        assert_eq!(
1776            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1777            Some(hash2)
1778        );
1779
1780        // delete
1781        let mut replica = store.new_replica(myspace.clone())?;
1782        let deleted = replica.delete_prefix(b"foo", &alice)?;
1783        assert_eq!(deleted, 2);
1784        assert_eq!(
1785            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1786            None
1787        );
1788        assert_eq!(
1789            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1790            None
1791        );
1792        assert_eq!(
1793            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1794            None
1795        );
1796
1797        Ok(())
1798    }
1799
1800    #[test]
1801    fn test_replica_sync_delete_memory() -> Result<()> {
1802        let alice_store = store::Store::memory();
1803        let bob_store = store::Store::memory();
1804
1805        test_replica_sync_delete(alice_store, bob_store)
1806    }
1807
1808    #[test]
1809    fn test_replica_sync_delete_fs() -> Result<()> {
1810        let alice_dbfile = tempfile::NamedTempFile::new()?;
1811        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1812        let bob_dbfile = tempfile::NamedTempFile::new()?;
1813        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1814        test_replica_sync_delete(alice_store, bob_store)
1815    }
1816
1817    fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1818        let alice_set = ["foot"];
1819        let bob_set = ["fool", "foo", "fog"];
1820
1821        let mut rng = rand::thread_rng();
1822        let author = Author::new(&mut rng);
1823        let myspace = NamespaceSecret::new(&mut rng);
1824        let mut alice = alice_store.new_replica(myspace.clone())?;
1825        for el in &alice_set {
1826            alice.hash_and_insert(el, &author, el.as_bytes())?;
1827        }
1828
1829        let mut bob = bob_store.new_replica(myspace.clone())?;
1830        for el in &bob_set {
1831            bob.hash_and_insert(el, &author, el.as_bytes())?;
1832        }
1833
1834        sync(&mut alice, &mut bob)?;
1835
1836        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1837        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1838        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1839        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1840
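        // alice deletes everything prefixed with "foo" while bob inserts a new "fooz" entry;
        // after syncing again only "fog" and "fooz" remain visible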
1841        let mut alice = alice_store.new_replica(myspace.clone())?;
1842        let mut bob = bob_store.new_replica(myspace.clone())?;
1843        alice.delete_prefix("foo", &author)?;
1844        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
1845        sync(&mut alice, &mut bob)?;
1846        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1847        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1848
1849        Ok(())
1850    }
1851
1852    #[test]
1853    fn test_replica_remove_memory() -> Result<()> {
1854        let alice_store = store::Store::memory();
1855        test_replica_remove(alice_store)
1856    }
1857
1858    #[test]
1859    fn test_replica_remove_fs() -> Result<()> {
1860        let alice_dbfile = tempfile::NamedTempFile::new()?;
1861        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1862        test_replica_remove(alice_store)
1863    }
1864
1865    fn test_replica_remove(mut store: Store) -> Result<()> {
1866        let mut rng = rand::thread_rng();
1867        let namespace = NamespaceSecret::new(&mut rng);
1868        let author = Author::new(&mut rng);
1869        let mut replica = store.new_replica(namespace.clone())?;
1870
1871        // insert entry
1872        let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
1873        let res = store
1874            .get_many(namespace.id(), Query::all())?
1875            .collect::<Vec<_>>();
1876        assert_eq!(res.len(), 1);
1877
1878        // remove replica
1879        let res = store.remove_replica(&namespace.id());
1880        // removing the replica while it is still open must fail
1881        assert!(res.is_err());
1882        store.close_replica(namespace.id());
1883        store.remove_replica(&namespace.id())?;
1884        let res = store
1885            .get_many(namespace.id(), Query::all())?
1886            .collect::<Vec<_>>();
1887        assert_eq!(res.len(), 0);
1888
1889        // may not reopen removed replica
1890        let res = store.load_replica_info(&namespace.id());
1891        assert!(matches!(res, Err(OpenError::NotFound)));
1892
1893        // may recreate replica
1894        let mut replica = store.new_replica(namespace.clone())?;
1895        replica.insert(b"foo", &author, hash, 3)?;
1896        let res = store
1897            .get_many(namespace.id(), Query::all())?
1898            .collect::<Vec<_>>();
1899        assert_eq!(res.len(), 1);
1900        Ok(())
1901    }
1902
1903    #[test]
1904    fn test_replica_delete_edge_cases_memory() -> Result<()> {
1905        let store = store::Store::memory();
1906        test_replica_delete_edge_cases(store)
1907    }
1908
1909    #[test]
1910    fn test_replica_delete_edge_cases_fs() -> Result<()> {
1911        let dbfile = tempfile::NamedTempFile::new()?;
1912        let store = store::fs::Store::persistent(dbfile.path())?;
1913        test_replica_delete_edge_cases(store)
1914    }
1915
1916    fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1917        let mut rng = rand::thread_rng();
1918        let author = Author::new(&mut rng);
1919        let namespace = NamespaceSecret::new(&mut rng);
1920
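        // keys built from the boundary byte values 0x00, 0x01 and 0xff exercise prefix deletion edge cases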
1921        let edgecases = [0u8, 1u8, 255u8];
1922        let prefixes = [0u8, 255u8];
1923        let hash = Hash::new(b"foo");
1924        let len = 3;
1925        for prefix in prefixes {
1926            let mut expected = vec![];
1927            let mut replica = store.new_replica(namespace.clone())?;
1928            for suffix in edgecases {
1929                let key = [prefix, suffix].to_vec();
1930                expected.push(key.clone());
1931                replica.insert(&key, &author, hash, len)?;
1932            }
1933            assert_keys(&mut store, namespace.id(), expected);
1934            let mut replica = store.new_replica(namespace.clone())?;
1935            replica.delete_prefix([prefix], &author)?;
1936            assert_keys(&mut store, namespace.id(), vec![]);
1937        }
1938
1939        let mut replica = store.new_replica(namespace.clone())?;
1940        let key = vec![1u8, 0u8];
1941        replica.insert(key, &author, hash, len)?;
1942        let key = vec![1u8, 1u8];
1943        replica.insert(key, &author, hash, len)?;
1944        let key = vec![1u8, 2u8];
1945        replica.insert(key, &author, hash, len)?;
1946        let prefix = vec![1u8, 1u8];
1947        replica.delete_prefix(prefix, &author)?;
1948        assert_keys(
1949            &mut store,
1950            namespace.id(),
1951            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1952        );
1953
1954        let mut replica = store.new_replica(namespace.clone())?;
1955        let key = vec![0u8, 255u8];
1956        replica.insert(key, &author, hash, len)?;
1957        let key = vec![0u8, 0u8];
1958        replica.insert(key, &author, hash, len)?;
1959        let prefix = vec![0u8];
1960        replica.delete_prefix(prefix, &author)?;
1961        assert_keys(
1962            &mut store,
1963            namespace.id(),
1964            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1965        );
1966        Ok(())
1967    }
1968
1969    #[test]
1970    fn test_latest_iter_memory() -> Result<()> {
1971        let store = store::Store::memory();
1972        test_latest_iter(store)
1973    }
1974
1975    #[test]
1976    fn test_latest_iter_fs() -> Result<()> {
1977        let dbfile = tempfile::NamedTempFile::new()?;
1978        let store = store::fs::Store::persistent(dbfile.path())?;
1979        test_latest_iter(store)
1980    }
1981
1982    fn test_latest_iter(mut store: Store) -> Result<()> {
1983        let mut rng = rand::thread_rng();
1984        let author0 = Author::new(&mut rng);
1985        let author1 = Author::new(&mut rng);
1986        let namespace = NamespaceSecret::new(&mut rng);
1987        let mut replica = store.new_replica(namespace.clone())?;
1988
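        // after a single insert the latest-per-author iterator yields exactly that entry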
1989        replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
1990        let latest = store
1991            .get_latest_for_each_author(namespace.id())?
1992            .collect::<Result<Vec<_>>>()?;
1993        assert_eq!(latest.len(), 1);
1994        assert_eq!(latest[0].2, b"a0.1".to_vec());
1995
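        // insert for a second author and overwrite author0's entry; the iterator then returns the latest key per author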
1996        let mut replica = store.new_replica(namespace.clone())?;
1997        replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
1998        replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
1999        let latest = store
2000            .get_latest_for_each_author(namespace.id())?
2001            .collect::<Result<Vec<_>>>()?;
2002        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2003        latest_keys.sort();
2004        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2005
2006        Ok(())
2007    }
2008
2009    #[test]
2010    fn test_replica_byte_keys_memory() -> Result<()> {
2011        let store = store::Store::memory();
2012
2013        test_replica_byte_keys(store)?;
2014        Ok(())
2015    }
2016
2017    #[test]
2018    fn test_replica_byte_keys_fs() -> Result<()> {
2019        let dbfile = tempfile::NamedTempFile::new()?;
2020        let store = store::fs::Store::persistent(dbfile.path())?;
2021        test_replica_byte_keys(store)?;
2022
2023        Ok(())
2024    }
2025
2026    fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2027        let mut rng = rand::thread_rng();
2028        let author = Author::new(&mut rng);
2029        let namespace = NamespaceSecret::new(&mut rng);
2030
2031        let hash = Hash::new(b"foo");
2032        let len = 3;
2033
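        // keys are arbitrary byte vectors, not UTF-8 strings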
2034        let key = vec![1u8, 0u8];
2035        let mut replica = store.new_replica(namespace.clone())?;
2036        replica.insert(key, &author, hash, len)?;
2037        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2038        let key = vec![1u8, 2u8];
2039        let mut replica = store.new_replica(namespace.clone())?;
2040        replica.insert(key, &author, hash, len)?;
2041        assert_keys(
2042            &mut store,
2043            namespace.id(),
2044            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2045        );
2046
2047        let key = vec![0u8, 255u8];
2048        let mut replica = store.new_replica(namespace.clone())?;
2049        replica.insert(key, &author, hash, len)?;
2050        assert_keys(
2051            &mut store,
2052            namespace.id(),
2053            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2054        );
2055        Ok(())
2056    }
2057
2058    #[test]
2059    fn test_replica_capability_memory() -> Result<()> {
2060        let store = store::Store::memory();
2061        test_replica_capability(store)
2062    }
2063
2064    #[test]
2065    fn test_replica_capability_fs() -> Result<()> {
2066        let dbfile = tempfile::NamedTempFile::new()?;
2067        let store = store::fs::Store::persistent(dbfile.path())?;
2068        test_replica_capability(store)
2069    }
2070
2071    #[allow(clippy::redundant_pattern_matching)]
2072    fn test_replica_capability(mut store: Store) -> Result<()> {
2073        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2074        let author = store.new_author(&mut rng)?;
2075        let namespace = NamespaceSecret::new(&mut rng);
2076
2077        // import read capability - insert must fail
2078        let capability = Capability::Read(namespace.id());
2079        store.import_namespace(capability)?;
2080        let mut replica = store.open_replica(&namespace.id())?;
2081        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2082        assert!(matches!(res, Err(InsertError::ReadOnly)));
2083
2084        // import write capability - insert must succeed
2085        let capability = Capability::Write(namespace.clone());
2086        store.import_namespace(capability)?;
2087        let mut replica = store.open_replica(&namespace.id())?;
2088        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2089        assert!(matches!(res, Ok(_)));
2090        store.close_replica(namespace.id());
2091        let mut replica = store.open_replica(&namespace.id())?;
2092        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2093        assert!(res.is_ok());
2094
2095        // import read capability again - insert must still succeed
2096        let capability = Capability::Read(namespace.id());
2097        store.import_namespace(capability)?;
2098        store.close_replica(namespace.id());
2099        let mut replica = store.open_replica(&namespace.id())?;
2100        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2101        assert!(res.is_ok());
2102        Ok(())
2103    }
2104
2105    #[tokio::test]
2106    async fn test_actor_capability_memory() -> Result<()> {
2107        let store = store::Store::memory();
2108        test_actor_capability(store).await
2109    }
2110
2111    #[tokio::test]
2112    async fn test_actor_capability_fs() -> Result<()> {
2113        let dbfile = tempfile::NamedTempFile::new()?;
2114        let store = store::fs::Store::persistent(dbfile.path())?;
2115        test_actor_capability(store).await
2116    }
2117
2118    async fn test_actor_capability(store: Store) -> Result<()> {
2119        // test with actor
2120        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2121        let author = Author::new(&mut rng);
2122        let handle = SyncHandle::spawn(store, None, "test".into());
2123        let author = handle.import_author(author).await?;
2124        let namespace = NamespaceSecret::new(&mut rng);
2125        let id = namespace.id();
2126
2127        // import read capability - insert must fail
2128        let capability = Capability::Read(namespace.id());
2129        handle.import_namespace(capability).await?;
2130        handle.open(namespace.id(), Default::default()).await?;
2131        let res = handle
2132            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2133            .await;
2134        assert!(res.is_err());
2135
2136        // import write capability - insert must succeed
2137        let capability = Capability::Write(namespace.clone());
2138        handle.import_namespace(capability).await?;
2139        let res = handle
2140            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2141            .await;
2142        assert!(res.is_ok());
2143
2144        // close the replica - insert must fail while closed, and succeed again after reopening
2145        handle.close(namespace.id()).await?;
2146        let res = handle
2147            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2148            .await;
2149        assert!(res.is_err());
2150        handle.open(namespace.id(), Default::default()).await?;
2151        let res = handle
2152            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2153            .await;
2154        assert!(res.is_ok());
2155        Ok(())
2156    }
2157
2158    /// This tests that no events are emitted for entries received during sync which are obsolete
2159    /// (too old) by the time they are actually inserted in the store.
2160    #[test]
2161    fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2162        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2163        let mut store1 = store::Store::memory();
2164        let mut store2 = store::Store::memory();
2165        let peer1 = [1u8; 32];
2166        let peer2 = [2u8; 32];
2167        let mut state1 = SyncOutcome::default();
2168        let mut state2 = SyncOutcome::default();
2169
2170        let author = Author::new(&mut rng);
2171        let namespace = NamespaceSecret::new(&mut rng);
2172        let mut replica1 = store1.new_replica(namespace.clone())?;
2173        let mut replica2 = store2.new_replica(namespace.clone())?;
2174
2175        let (events1_sender, events1) = flume::bounded(32);
2176        let (events2_sender, events2) = flume::bounded(32);
2177
2178        replica1.info.subscribe(events1_sender);
2179        replica2.info.subscribe(events2_sender);
2180
2181        replica1.hash_and_insert(b"foo", &author, b"init")?;
2182
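        // run the first rounds of reconciliation between the two replicas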
2183        let from1 = replica1.sync_initial_message()?;
2184        let from2 = replica2
2185            .sync_process_message(from1, peer1, &mut state2)
2186            .unwrap()
2187            .unwrap();
2188        let from1 = replica1
2189            .sync_process_message(from2, peer2, &mut state1)
2190            .unwrap()
2191            .unwrap();
2192        // Now we will receive the entry from replica1. We insert a newer entry for the same key
2193        // while the sync is already running, which means the incoming entry from replica1 will be
2194        // rejected as obsolete. Make sure that no RemoteInsert event is emitted for it.
2195        replica2.hash_and_insert(b"foo", &author, b"update")?;
2196        let from2 = replica2
2197            .sync_process_message(from1, peer1, &mut state2)
2198            .unwrap();
2199        assert!(from2.is_none());
2200        let events1 = events1.drain().collect::<Vec<_>>();
2201        let events2 = events2.drain().collect::<Vec<_>>();
2202        assert_eq!(events1.len(), 1);
2203        assert_eq!(events2.len(), 1);
2204        assert!(matches!(events1[0], Event::LocalInsert { .. }));
2205        assert!(matches!(events2[0], Event::LocalInsert { .. }));
2206        assert_eq!(state1.num_sent, 1);
2207        assert_eq!(state1.num_recv, 0);
2208        assert_eq!(state2.num_sent, 0);
2209        assert_eq!(state2.num_recv, 1);
2210
2211        Ok(())
2212    }
2213
2214    #[test]
2215    fn test_replica_queries_mem() -> Result<()> {
2216        let store = store::Store::memory();
2217
2218        test_replica_queries(store)?;
2219        Ok(())
2220    }
2221
2222    #[test]
2223    fn test_replica_queries_fs() -> Result<()> {
2224        let dbfile = tempfile::NamedTempFile::new()?;
2225        let store = store::fs::Store::persistent(dbfile.path())?;
2226        test_replica_queries(store)?;
2227
2228        Ok(())
2229    }
2230
2231    fn test_replica_queries(mut store: Store) -> Result<()> {
2232        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2233        let namespace = NamespaceSecret::new(&mut rng);
2234        let namespace_id = namespace.id();
2235
2236        let a1 = store.new_author(&mut rng)?;
2237        let a2 = store.new_author(&mut rng)?;
2238        let a3 = store.new_author(&mut rng)?;
2239        println!(
2240            "a1 {} a2 {} a3 {}",
2241            a1.id().fmt_short(),
2242            a2.id().fmt_short(),
2243            a3.id().fmt_short()
2244        );
2245
2246        let mut replica = store.new_replica(namespace.clone())?;
2247        replica.hash_and_insert("hi/world", &a2, "a2")?;
2248        replica.hash_and_insert("hi/world", &a1, "a1")?;
2249        replica.hash_and_insert("hi/moon", &a2, "a1")?;
2250        replica.hash_and_insert("hi", &a3, "a3")?;
2251
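        // helper that runs a query against the store and compares the resulting (key, author) pairs with the expected list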
2252        struct QueryTester<'a> {
2253            store: &'a mut Store,
2254            namespace: NamespaceId,
2255        }
2256        impl<'a> QueryTester<'a> {
2257            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2258                let query = query.into();
2259                let actual = self
2260                    .store
2261                    .get_many(self.namespace, query.clone())
2262                    .unwrap()
2263                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2264                    .collect::<Result<Vec<_>>>()
2265                    .unwrap();
2266                let expected = expected
2267                    .into_iter()
2268                    .map(|(key, author)| (key.to_string(), author.id()))
2269                    .collect::<Vec<_>>();
2270                assert_eq!(actual, expected, "query: {query:#?}")
2271            }
2272        }
2273
2274        let mut qt = QueryTester {
2275            store: &mut store,
2276            namespace: namespace_id,
2277        };
2278
2279        qt.assert(
2280            Query::all(),
2281            vec![
2282                ("hi/world", &a1),
2283                ("hi/moon", &a2),
2284                ("hi/world", &a2),
2285                ("hi", &a3),
2286            ],
2287        );
2288
2289        qt.assert(
2290            Query::single_latest_per_key(),
2291            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2292        );
2293
2294        qt.assert(
2295            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2296            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2297        );
2298
2299        qt.assert(
2300            Query::single_latest_per_key().key_prefix("hi/"),
2301            vec![("hi/moon", &a2), ("hi/world", &a1)],
2302        );
2303
2304        qt.assert(
2305            Query::single_latest_per_key()
2306                .key_prefix("hi/")
2307                .sort_direction(SortDirection::Desc),
2308            vec![("hi/world", &a1), ("hi/moon", &a2)],
2309        );
2310
2311        qt.assert(
2312            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2313            vec![
2314                ("hi", &a3),
2315                ("hi/moon", &a2),
2316                ("hi/world", &a1),
2317                ("hi/world", &a2),
2318            ],
2319        );
2320
2321        qt.assert(
2322            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2323            vec![
2324                ("hi/world", &a2),
2325                ("hi/world", &a1),
2326                ("hi/moon", &a2),
2327                ("hi", &a3),
2328            ],
2329        );
2330
2331        qt.assert(
2332            Query::all().key_prefix("hi/"),
2333            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2334        );
2335
2336        qt.assert(
2337            Query::all().key_prefix("hi/").offset(1).limit(1),
2338            vec![("hi/moon", &a2)],
2339        );
2340
2341        qt.assert(
2342            Query::all()
2343                .key_prefix("hi/")
2344                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2345            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2346        );
2347
2348        qt.assert(
2349            Query::all()
2350                .key_prefix("hi/")
2351                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2352                .offset(1)
2353                .limit(1),
2354            vec![("hi/world", &a1)],
2355        );
2356
2357        qt.assert(
2358            Query::all()
2359                .key_prefix("hi/")
2360                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2361            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2362        );
2363
2364        qt.assert(
2365            Query::all()
2366                .key_prefix("hi/")
2367                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2368            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2369        );
2370
2371        qt.assert(
2372            Query::all()
2373                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2374                .limit(2)
2375                .offset(1),
2376            vec![("hi/moon", &a2), ("hi/world", &a1)],
2377        );
2378
2379        let mut replica = store.new_replica(namespace)?;
2380        replica.delete_prefix("hi/world", &a2)?;
2381        let mut qt = QueryTester {
2382            store: &mut store,
2383            namespace: namespace_id,
2384        };
2385
2386        qt.assert(
2387            Query::all(),
2388            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2389        );
2390
2391        qt.assert(
2392            Query::all().include_empty(),
2393            vec![
2394                ("hi/world", &a1),
2395                ("hi/moon", &a2),
2396                ("hi/world", &a2),
2397                ("hi", &a3),
2398            ],
2399        );
2400
2401        Ok(())
2402    }
2403
2404    #[test]
2405    fn test_dl_policies_mem() -> Result<()> {
2406        let mut store = store::Store::memory();
2407        test_dl_policies(&mut store)
2408    }
2409
2410    #[test]
2411    fn test_dl_policies_fs() -> Result<()> {
2412        let dbfile = tempfile::NamedTempFile::new()?;
2413        let mut store = store::fs::Store::persistent(dbfile.path())?;
2414        test_dl_policies(&mut store)
2415    }
2416
2417    fn test_dl_policies(store: &mut Store) -> Result<()> {
2418        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2419        let namespace = NamespaceSecret::new(&mut rng);
2420        let id = namespace.id();
2421
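        // setting a download policy for a document that does not exist yet must fail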
2422        let filter = store::FilterKind::Exact("foo".into());
2423        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2424        store
2425            .set_download_policy(&id, policy.clone())
2426            .expect_err("document does not exist");
2427
2428        // now create the document
2429        store.new_replica(namespace)?;
2430
2431        store.set_download_policy(&id, policy.clone())?;
2432        let retrieved_policy = store.get_download_policy(&id)?;
2433        assert_eq!(retrieved_policy, policy);
2434        Ok(())
2435    }
2436
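    /// Assert that the store contains exactly the expected keys for the namespace (order-insensitive).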
2437    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2438        expected.sort();
2439        assert_eq!(expected, get_keys_sorted(store, namespace));
2440    }
2441
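    /// Collect all keys stored for the namespace, sorted ascending.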
2442    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2443        let mut res = store
2444            .get_many(namespace, Query::all())
2445            .unwrap()
2446            .map(|e| e.map(|e| e.key().to_vec()))
2447            .collect::<Result<Vec<_>>>()
2448            .unwrap();
2449        res.sort();
2450        res
2451    }
2452
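    /// Fetch a single entry, failing if it does not exist.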
2453    fn get_entry(
2454        store: &mut Store,
2455        namespace: NamespaceId,
2456        author: AuthorId,
2457        key: &[u8],
2458    ) -> anyhow::Result<SignedEntry> {
2459        let entry = store
2460            .get_exact(namespace, author, key, true)?
2461            .ok_or_else(|| anyhow::anyhow!("not found"))?;
2462        Ok(entry)
2463    }
2464
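    /// Return the content hash of the entry at `key`, if any.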
2465    fn get_content_hash(
2466        store: &mut Store,
2467        namespace: NamespaceId,
2468        author: AuthorId,
2469        key: &[u8],
2470    ) -> anyhow::Result<Option<Hash>> {
2471        let hash = store
2472            .get_exact(namespace, author, key, false)?
2473            .map(|e| e.content_hash());
2474        Ok(hash)
2475    }
2476
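    /// Drive the set-reconciliation protocol between two replicas until no more messages are
    /// exchanged, returning the sync outcome recorded on each side.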
2477    fn sync(alice: &mut Replica, bob: &mut Replica) -> Result<(SyncOutcome, SyncOutcome)> {
2478        let alice_peer_id = [1u8; 32];
2479        let bob_peer_id = [2u8; 32];
2480        let mut alice_state = SyncOutcome::default();
2481        let mut bob_state = SyncOutcome::default();
2482        // Sync alice - bob
2483        let mut next_to_bob = Some(alice.sync_initial_message()?);
2484        let mut rounds = 0;
2485        while let Some(msg) = next_to_bob.take() {
2486            assert!(rounds < 100, "too many rounds");
2487            rounds += 1;
2488            println!("round {}", rounds);
2489            if let Some(msg) = bob.sync_process_message(msg, alice_peer_id, &mut bob_state)? {
2490                next_to_bob = alice.sync_process_message(msg, bob_peer_id, &mut alice_state)?
2491            }
2492        }
2493        assert_eq!(alice_state.num_sent, bob_state.num_recv);
2494        assert_eq!(alice_state.num_recv, bob_state.num_sent);
2495        Ok((alice_state, bob_state))
2496    }
2497
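    /// Assert that every key in `set` exists in the store for the given author.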
2498    fn check_entries(
2499        store: &mut Store,
2500        namespace: &NamespaceId,
2501        author: &Author,
2502        set: &[&str],
2503    ) -> Result<()> {
2504        for el in set {
2505            store.get_exact(*namespace, author.id(), el, false)?.ok_or_else(|| anyhow::anyhow!("entry for key {:?} not found", el))?;
2506        }
2507        Ok(())
2508    }
2509}