Skip to main content

iroh_docs/
sync.rs

1//! API for iroh-docs replicas
2
3// Names and concepts are roughly based on Willows design at the moment:
4//
5// https://hackmd.io/DTtck8QOQm6tZaQBBtTf7w
6//
7// This is going to change!
8
9use std::{
10    cmp::Ordering,
11    fmt::Debug,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14};
15
16use bytes::{Bytes, BytesMut};
17use ed25519_dalek::SignatureError;
18// `iroh::Signature` is a newtype wrapper around `ed25519_dalek::Signature` with a
19// hand-written, wire-stable `serialize_tuple` serde impl (the same impl every other
20// iroh crate uses for handshake / discovery payloads). Embedding it inside
21// `EntrySignature` — rather than the raw dalek type — keeps the on-wire
22// `SignedEntry` format independent of upstream `ed25519` serde changes.
23use iroh::Signature;
24use iroh_blobs::Hash;
25use n0_future::{
26    time::{Duration, SystemTime},
27    IterExt,
28};
29use serde::{Deserialize, Serialize};
30
31pub use crate::heads::AuthorHeads;
32use crate::{
33    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
34    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
35    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
36};
37
38/// Protocol message for the set reconciliation protocol.
39///
40/// Can be serialized to bytes with [serde] to transfer between peers.
41pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
42
43/// Byte representation of an iroh `EndpointId`.
44// TODO: Consider `iroh::EndpointId` instead of raw bytes (`iroh` re-exports it from `iroh-base`).
45pub type PeerIdBytes = [u8; 32];
46
47/// Max time in the future from our wall clock time that we accept entries for.
48/// Value is 10 minutes.
49pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
50
51/// Callback that may be set on a replica to determine the availability status for a content hash.
52pub type ContentStatusCallback =
53    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
54
55/// Event emitted by sync when entries are added.
56#[derive(derive_more::Debug, Clone)]
57pub enum Event {
58    /// A local entry has been added.
59    LocalInsert {
60        /// Document in which the entry was inserted.
61        namespace: NamespaceId,
62        /// Inserted entry.
63        entry: SignedEntry,
64    },
65    /// A remote entry has been added.
66    RemoteInsert {
67        /// Document in which the entry was inserted.
68        namespace: NamespaceId,
69        /// Inserted entry.
70        entry: SignedEntry,
71        /// Peer that provided the inserted entry.
72        /// Debug matches [`iroh::PublicKey::fmt_short`] (first 5 bytes, lower hex).
73        #[debug("{}", hex::encode(&from[..5]))]
74        from: PeerIdBytes,
75        /// Whether download policies require the content to be downloaded.
76        should_download: bool,
77        /// [`ContentStatus`] for this entry in the remote's replica.
78        remote_content_status: ContentStatus,
79    },
80}
81
82/// Whether an entry was inserted locally or by a remote peer.
83#[derive(derive_more::Debug, Clone)]
84pub enum InsertOrigin {
85    /// The entry was inserted locally.
86    Local,
87    /// The entry was received from the remote node identified by [`PeerIdBytes`].
88    Sync {
89        /// The peer from which we received this entry.
90        /// Debug matches [`iroh::PublicKey::fmt_short`] (first 5 bytes, lower hex).
91        #[debug("{}", hex::encode(&from[..5]))]
92        from: PeerIdBytes,
93        /// Whether the peer claims to have the content blob for this entry.
94        remote_content_status: ContentStatus,
95    },
96}
97
98/// Whether the content status is available on a node.
99#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
100pub enum ContentStatus {
101    /// The content is completely available.
102    Complete,
103    /// The content is partially available.
104    Incomplete,
105    /// The content is missing.
106    Missing,
107}
108
109/// Outcome of a sync operation.
110#[derive(Debug, Clone, Default)]
111pub struct SyncOutcome {
112    /// Timestamp of the latest entry for each author in the set we received.
113    pub heads_received: AuthorHeads,
114    /// Number of entries we received.
115    pub num_recv: usize,
116    /// Number of entries we sent.
117    pub num_sent: usize,
118}
119
120fn get_as_ptr<T>(value: &T) -> Option<usize> {
121    use std::mem;
122    if mem::size_of::<T>() == std::mem::size_of::<usize>()
123        && mem::align_of::<T>() == mem::align_of::<usize>()
124    {
125        // Safe only if size and alignment requirements are met
126        unsafe { Some(mem::transmute_copy(value)) }
127    } else {
128        None
129    }
130}
131
132fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
133    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
134}
135
136#[derive(Debug, Default)]
137struct Subscribers(Vec<async_channel::Sender<Event>>);
138impl Subscribers {
139    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
140        self.0.push(sender)
141    }
142    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
143        self.0.retain(|s| !same_channel(s, sender));
144    }
145    pub async fn send(&mut self, event: Event) {
146        self.0 = std::mem::take(&mut self.0)
147            .into_iter()
148            .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
149            .join_all()
150            .await
151            .into_iter()
152            .flatten()
153            .collect();
154    }
155    pub fn len(&self) -> usize {
156        self.0.len()
157    }
158    pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
159        if !self.0.is_empty() {
160            self.send(f()).await
161        }
162    }
163}
164
165/// Kind of capability of the namespace.
166#[derive(
167    Debug,
168    Clone,
169    Copy,
170    Serialize,
171    Deserialize,
172    num_enum::IntoPrimitive,
173    num_enum::TryFromPrimitive,
174    strum::Display,
175)]
176#[repr(u8)]
177#[strum(serialize_all = "snake_case")]
178pub enum CapabilityKind {
179    /// A writable replica.
180    Write = 1,
181    /// A readable replica.
182    Read = 2,
183}
184
185/// The capability of the namespace.
186#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
187pub enum Capability {
188    /// Write access to the namespace.
189    Write(NamespaceSecret),
190    /// Read only access to the namespace.
191    Read(NamespaceId),
192}
193
194impl Capability {
195    /// Get the [`NamespaceId`] for this [`Capability`].
196    pub fn id(&self) -> NamespaceId {
197        match self {
198            Capability::Write(secret) => secret.id(),
199            Capability::Read(id) => *id,
200        }
201    }
202
203    /// Get the [`NamespaceSecret`] of this [`Capability`].
204    /// Will fail if the [`Capability`] is read only.
205    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
206        match self {
207            Capability::Write(secret) => Ok(secret),
208            Capability::Read(_) => Err(ReadOnly),
209        }
210    }
211
212    /// Get the kind of capability.
213    pub fn kind(&self) -> CapabilityKind {
214        match self {
215            Capability::Write(_) => CapabilityKind::Write,
216            Capability::Read(_) => CapabilityKind::Read,
217        }
218    }
219
220    /// Get the raw representation of this namespace capability.
221    pub fn raw(&self) -> (u8, [u8; 32]) {
222        let capability_repr: u8 = self.kind().into();
223        let bytes = match self {
224            Capability::Write(secret) => secret.to_bytes(),
225            Capability::Read(id) => id.to_bytes(),
226        };
227        (capability_repr, bytes)
228    }
229
230    /// Create a [`Capability`] from its raw representation.
231    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
232        let kind: CapabilityKind = kind.try_into()?;
233        let capability = match kind {
234            CapabilityKind::Write => {
235                let secret = NamespaceSecret::from_bytes(bytes);
236                Capability::Write(secret)
237            }
238            CapabilityKind::Read => {
239                let id = NamespaceId::from(bytes);
240                Capability::Read(id)
241            }
242        };
243        Ok(capability)
244    }
245
246    /// Merge this capability with another capability.
247    ///
248    /// Will return an error if `other` is not a capability for the same namespace.
249    ///
250    /// Returns `true` if the capability was changed, `false` otherwise.
251    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
252        if other.id() != self.id() {
253            return Err(CapabilityError::NamespaceMismatch);
254        }
255
256        // the only capability upgrade is from read-only (self) to writable (other)
257        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
258            let _ = std::mem::replace(self, other);
259            Ok(true)
260        } else {
261            Ok(false)
262        }
263    }
264}
265
266/// Errors for capability operations
267#[derive(Debug, thiserror::Error)]
268pub enum CapabilityError {
269    /// Namespaces are not the same
270    #[error("Namespaces are not the same")]
271    NamespaceMismatch,
272}
273
274/// In memory information about an open replica.
275#[derive(derive_more::Debug)]
276pub struct ReplicaInfo {
277    pub(crate) capability: Capability,
278    subscribers: Subscribers,
279    #[debug("ContentStatusCallback")]
280    content_status_cb: Option<ContentStatusCallback>,
281    closed: bool,
282}
283
284impl ReplicaInfo {
285    /// Create a new replica.
286    pub fn new(capability: Capability) -> Self {
287        Self {
288            capability,
289            subscribers: Default::default(),
290            // on_insert_sender: RwLock::new(None),
291            content_status_cb: None,
292            closed: false,
293        }
294    }
295
296    /// Subscribe to insert events.
297    ///
298    /// When subscribing to a replica, you must ensure that the corresponding [`async_channel::Receiver`] is
299    /// received from in a loop. If not receiving, local and remote inserts will hang waiting for
300    /// the receiver to be received from.
301    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
302        self.subscribers.subscribe(sender)
303    }
304
305    /// Explicitly unsubscribe a sender.
306    ///
307    /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
308    /// multiple replicas, you can use this method to explicitly unsubscribe the sender from
309    /// this replica without having to drop the receiver.
310    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
311        self.subscribers.unsubscribe(sender)
312    }
313
314    /// Get the number of current event subscribers.
315    pub fn subscribers_count(&self) -> usize {
316        self.subscribers.len()
317    }
318
319    /// Set the content status callback.
320    ///
321    /// Only one callback can be active at a time. If a previous callback was registered, this
322    /// will return `false`.
323    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
324        if self.content_status_cb.is_some() {
325            false
326        } else {
327            self.content_status_cb = Some(cb);
328            true
329        }
330    }
331
332    fn ensure_open(&self) -> Result<(), InsertError> {
333        if self.closed() {
334            Err(InsertError::Closed)
335        } else {
336            Ok(())
337        }
338    }
339
340    /// Returns true if the replica is closed.
341    ///
342    /// If a replica is closed, no further operations can be performed. A replica cannot be closed
343    /// manually, it must be closed via [`store::Store::close_replica`] or
344    /// [`store::Store::remove_replica`]
345    pub fn closed(&self) -> bool {
346        self.closed
347    }
348
349    /// Merge a capability.
350    ///
351    /// The capability must refer to the the same namespace, otherwise an error will be returned.
352    ///
353    /// This will upgrade the replica's capability when passing a `Capability::Write`.
354    /// It is a no-op if `capability` is a Capability::Read`.
355    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
356        self.capability.merge(capability)
357    }
358}
359
360/// Local representation of a mutable, synchronizable key-value store.
361#[derive(derive_more::Debug)]
362pub struct Replica<'a, I = Box<ReplicaInfo>> {
363    pub(crate) store: StoreInstance<'a>,
364    pub(crate) info: I,
365}
366
367impl<'a, I> Replica<'a, I>
368where
369    I: Deref<Target = ReplicaInfo> + DerefMut,
370{
371    /// Create a new replica.
372    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
373        Replica { info, store }
374    }
375
376    /// Insert a new record at the given key.
377    ///
378    /// The entry will by signed by the provided `author`.
379    /// The `len` must be the byte length of the data identified by `hash`.
380    ///
381    /// Returns the number of entries removed as a consequence of this insertion,
382    /// or an error either if the entry failed to validate or if a store operation failed.
383    pub async fn insert(
384        &mut self,
385        key: impl AsRef<[u8]>,
386        author: &Author,
387        hash: Hash,
388        len: u64,
389    ) -> Result<usize, InsertError> {
390        if len == 0 || hash == Hash::EMPTY {
391            return Err(InsertError::EntryIsEmpty);
392        }
393        self.info.ensure_open()?;
394        let id = RecordIdentifier::new(self.id(), author.id(), key);
395        let record = Record::new_current(hash, len);
396        let entry = Entry::new(id, record);
397        let secret = self.secret_key()?;
398        let signed_entry = entry.sign(secret, author);
399        self.insert_entry(signed_entry, InsertOrigin::Local).await
400    }
401
402    /// Delete entries that match the given `author` and key `prefix`.
403    ///
404    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
405    /// entries whose key starts with or is equal to the given `prefix`.
406    ///
407    /// Returns the number of entries deleted.
408    pub async fn delete_prefix(
409        &mut self,
410        prefix: impl AsRef<[u8]>,
411        author: &Author,
412    ) -> Result<usize, InsertError> {
413        self.info.ensure_open()?;
414        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
415        let entry = Entry::new_empty(id);
416        let signed_entry = entry.sign(self.secret_key()?, author);
417        self.insert_entry(signed_entry, InsertOrigin::Local).await
418    }
419
420    /// Insert an entry into this replica which was received from a remote peer.
421    ///
422    /// This will verify both the namespace and author signatures of the entry, emit an `on_insert`
423    /// event, and insert the entry into the replica store.
424    ///
425    /// Returns the number of entries removed as a consequence of this insertion,
426    /// or an error if the entry failed to validate or if a store operation failed.
427    pub async fn insert_remote_entry(
428        &mut self,
429        entry: SignedEntry,
430        received_from: PeerIdBytes,
431        content_status: ContentStatus,
432    ) -> Result<usize, InsertError> {
433        self.info.ensure_open()?;
434        entry.validate_empty()?;
435        let origin = InsertOrigin::Sync {
436            from: received_from,
437            remote_content_status: content_status,
438        };
439        self.insert_entry(entry, origin).await
440    }
441
442    /// Insert a signed entry into the database.
443    ///
444    /// Returns the number of entries removed as a consequence of this insertion.
445    async fn insert_entry(
446        &mut self,
447        entry: SignedEntry,
448        origin: InsertOrigin,
449    ) -> Result<usize, InsertError> {
450        let namespace = self.id();
451
452        let store = &self.store;
453        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
454
455        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
456        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
457
458        let removed_count = match outcome {
459            InsertOutcome::Inserted { removed } => removed,
460            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
461        };
462
463        let insert_event = match origin {
464            InsertOrigin::Local => Event::LocalInsert { namespace, entry },
465            InsertOrigin::Sync {
466                from,
467                remote_content_status,
468            } => {
469                let download_policy = self
470                    .store
471                    .get_download_policy(&self.id())
472                    .unwrap_or_default();
473                let should_download = download_policy.matches(entry.entry());
474                Event::RemoteInsert {
475                    namespace,
476                    entry,
477                    from,
478                    should_download,
479                    remote_content_status,
480                }
481            }
482        };
483
484        self.info.subscribers.send(insert_event).await;
485
486        Ok(removed_count)
487    }
488
489    /// Hashes the given data and inserts it.
490    ///
491    /// This does not store the content, just the record of it.
492    /// Returns the calculated hash.
493    pub async fn hash_and_insert(
494        &mut self,
495        key: impl AsRef<[u8]>,
496        author: &Author,
497        data: impl AsRef<[u8]>,
498    ) -> Result<Hash, InsertError> {
499        self.info.ensure_open()?;
500        let len = data.as_ref().len() as u64;
501        let hash = Hash::new(data);
502        self.insert(key, author, hash, len).await?;
503        Ok(hash)
504    }
505
506    /// Get the identifier for an entry in this replica.
507    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
508        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
509    }
510
511    /// Create the initial message for the set reconciliation flow with a remote peer.
512    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
513        self.info.ensure_open().map_err(anyhow::Error::from)?;
514        self.store.initial_message()
515    }
516
517    /// Process a set reconciliation message from a remote peer.
518    ///
519    /// Returns the next message to be sent to the peer, if any.
520    pub async fn sync_process_message(
521        &mut self,
522        message: crate::ranger::Message<SignedEntry>,
523        from_peer: PeerIdBytes,
524        state: &mut SyncOutcome,
525    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
526        self.info.ensure_open()?;
527        let my_namespace = self.id();
528        let now = system_time_now();
529
530        // update state with incoming data.
531        state.num_recv += message.value_count();
532        for (entry, _content_status) in message.values() {
533            state
534                .heads_received
535                .insert(entry.author(), entry.timestamp());
536        }
537
538        // let subscribers = std::rc::Rc::new(&mut self.subscribers);
539        // l
540        let cb = self.info.content_status_cb.clone();
541        let download_policy = self
542            .store
543            .get_download_policy(&my_namespace)
544            .unwrap_or_default();
545        let reply = self
546            .store
547            .process_message(
548                &Default::default(),
549                message,
550                // validate callback: validate incoming entries, and send to on_insert channel
551                |store, entry, content_status| {
552                    let origin = InsertOrigin::Sync {
553                        from: from_peer,
554                        remote_content_status: content_status,
555                    };
556                    validate_entry(now, store, my_namespace, entry, &origin).is_ok()
557                },
558                // on_insert callback: is called when an entry was actually inserted in the store
559                async |_store, entry, content_status| {
560                    // We use `send_with` to only clone the entry if we have active subscriptions.
561                    self.info
562                        .subscribers
563                        .send_with(|| {
564                            let should_download = download_policy.matches(entry.entry());
565                            Event::RemoteInsert {
566                                from: from_peer,
567                                namespace: my_namespace,
568                                entry: entry.clone(),
569                                should_download,
570                                remote_content_status: content_status,
571                            }
572                        })
573                        .await
574                },
575                // content_status callback: get content status for outgoing entries
576                async move |entry| {
577                    if let Some(cb) = cb.as_ref() {
578                        cb(entry.content_hash()).await
579                    } else {
580                        ContentStatus::Missing
581                    }
582                },
583            )
584            .await?;
585
586        // update state with outgoing data.
587        if let Some(ref reply) = reply {
588            state.num_sent += reply.value_count();
589        }
590
591        Ok(reply)
592    }
593
594    /// Get the namespace identifier for this [`Replica`].
595    pub fn id(&self) -> NamespaceId {
596        self.info.capability.id()
597    }
598
599    /// Get the [`Capability`] of this [`Replica`].
600    pub fn capability(&self) -> &Capability {
601        &self.info.capability
602    }
603
604    /// Get the byte representation of the [`NamespaceSecret`] key for this replica. Will fail if
605    /// the replica is read only
606    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
607        self.info.capability.secret_key()
608    }
609}
610
611/// Error that occurs trying to access the [`NamespaceSecret`] of a read-only [`Capability`].
612#[derive(Debug, thiserror::Error)]
613#[error("Replica allows read access only.")]
614pub struct ReadOnly;
615
616/// Validate a [`SignedEntry`] if it's fit to be inserted.
617///
618/// This validates that
619/// * the entry's author and namespace signatures are correct
620/// * the entry's namespace matches the current replica
621/// * the entry's timestamp is not more than 10 minutes in the future of our system time
622/// * the entry is newer than an existing entry for the same key and author, if such exists.
623fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
624    now: u64,
625    store: &S,
626    expected_namespace: NamespaceId,
627    entry: &SignedEntry,
628    origin: &InsertOrigin,
629) -> Result<(), ValidationFailure> {
630    // Verify the namespace
631    if entry.namespace() != expected_namespace {
632        return Err(ValidationFailure::InvalidNamespace);
633    }
634
635    // Verify signature for non-local entries.
636    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
637        return Err(ValidationFailure::BadSignature);
638    }
639
640    // Verify that the timestamp of the entry is not too far in the future.
641    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
642        return Err(ValidationFailure::TooFarInTheFuture);
643    }
644    Ok(())
645}
646
647/// Error emitted when inserting entries into a [`Replica`] failed
648#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
649pub enum InsertError {
650    /// Storage error
651    #[error("storage error")]
652    Store(anyhow::Error),
653    /// Validation failure
654    #[error("validation failure")]
655    Validation(#[from] ValidationFailure),
656    /// A newer entry exists for either this entry's key or a prefix of the key.
657    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
658    NewerEntryExists,
659    /// Attempted to insert an empty entry.
660    #[error("Attempted to insert an empty entry")]
661    EntryIsEmpty,
662    /// Replica is read only.
663    #[error("Attempted to insert to read only replica")]
664    #[from(ReadOnly)]
665    ReadOnly,
666    /// The replica is closed, no operations may be performed.
667    #[error("replica is closed")]
668    Closed,
669}
670
671/// Reason why entry validation failed
672#[derive(thiserror::Error, Debug)]
673pub enum ValidationFailure {
674    /// Entry namespace does not match the current replica.
675    #[error("Entry namespace does not match the current replica")]
676    InvalidNamespace,
677    /// Entry signature is invalid.
678    #[error("Entry signature is invalid")]
679    BadSignature,
680    /// Entry timestamp is too far in the future.
681    #[error("Entry timestamp is too far in the future.")]
682    TooFarInTheFuture,
683    /// Entry has length 0 but not the empty hash, or the empty hash but not length 0.
684    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
685    InvalidEmptyEntry,
686}
687
688/// A signed entry.
689#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
690pub struct SignedEntry {
691    signature: EntrySignature,
692    entry: Entry,
693}
694
695impl From<SignedEntry> for Entry {
696    fn from(value: SignedEntry) -> Self {
697        value.entry
698    }
699}
700
701impl PartialOrd for SignedEntry {
702    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
703        Some(self.cmp(other))
704    }
705}
706
707impl Ord for SignedEntry {
708    fn cmp(&self, other: &Self) -> Ordering {
709        self.entry.cmp(&other.entry)
710    }
711}
712
713impl PartialOrd for Entry {
714    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
715        Some(self.cmp(other))
716    }
717}
718
719impl Ord for Entry {
720    fn cmp(&self, other: &Self) -> Ordering {
721        self.id
722            .cmp(&other.id)
723            .then_with(|| self.record.cmp(&other.record))
724    }
725}
726
727impl SignedEntry {
728    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
729        SignedEntry { signature, entry }
730    }
731
732    /// Create a new signed entry by signing an entry with the `namespace` and `author`.
733    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
734        let signature = EntrySignature::from_entry(&entry, namespace, author);
735        SignedEntry { signature, entry }
736    }
737
738    /// Create a new signed entries from its parts.
739    pub fn from_parts(
740        namespace: &NamespaceSecret,
741        author: &Author,
742        key: impl AsRef<[u8]>,
743        record: Record,
744    ) -> Self {
745        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
746        let entry = Entry::new(id, record);
747        Self::from_entry(entry, namespace, author)
748    }
749
750    /// Verify the signatures on this entry.
751    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
752        self.signature.verify(
753            &self.entry,
754            &self.entry.namespace().public_key(store)?,
755            &self.entry.author().public_key(store)?,
756        )
757    }
758
759    /// Get the signature.
760    pub fn signature(&self) -> &EntrySignature {
761        &self.signature
762    }
763
764    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
765    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
766        self.entry().validate_empty()
767    }
768
769    /// Get the [`Entry`].
770    pub fn entry(&self) -> &Entry {
771        &self.entry
772    }
773
774    /// Get the content [`struct@Hash`] of the entry.
775    pub fn content_hash(&self) -> Hash {
776        self.entry().content_hash()
777    }
778
779    /// Get the content length of the entry.
780    pub fn content_len(&self) -> u64 {
781        self.entry().content_len()
782    }
783
784    /// Get the author bytes of this entry.
785    pub fn author_bytes(&self) -> AuthorId {
786        self.entry().id().author()
787    }
788
789    /// Get the key of the entry.
790    pub fn key(&self) -> &[u8] {
791        self.entry().id().key()
792    }
793
794    /// Get the timestamp of the entry.
795    pub fn timestamp(&self) -> u64 {
796        self.entry().timestamp()
797    }
798}
799
800impl RangeEntry for SignedEntry {
801    type Key = RecordIdentifier;
802    type Value = Record;
803
804    fn key(&self) -> &Self::Key {
805        &self.entry.id
806    }
807
808    fn value(&self) -> &Self::Value {
809        &self.entry.record
810    }
811
812    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
813        let mut hasher = blake3::Hasher::new();
814        hasher.update(self.namespace().as_ref());
815        hasher.update(self.author_bytes().as_ref());
816        hasher.update(self.key());
817        hasher.update(&self.timestamp().to_be_bytes());
818        hasher.update(self.content_hash().as_bytes());
819        Fingerprint(hasher.finalize().into())
820    }
821}
822
823/// Signature over an entry.
824#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
825pub struct EntrySignature {
826    author_signature: Signature,
827    namespace_signature: Signature,
828}
829
830impl Debug for EntrySignature {
831    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
832        f.debug_struct("EntrySignature")
833            .field(
834                "namespace_signature",
835                &hex::encode(self.namespace_signature.to_bytes()),
836            )
837            .field(
838                "author_signature",
839                &hex::encode(self.author_signature.to_bytes()),
840            )
841            .finish()
842    }
843}
844
845impl EntrySignature {
846    /// Create a new signature by signing an entry with the `namespace` and `author`.
847    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
848        // TODO: this should probably include a namespace prefix
849        // namespace in the cryptographic sense.
850        let bytes = entry.to_vec();
851        let namespace_signature = Signature::from_bytes(&namespace.sign(&bytes).to_bytes());
852        let author_signature = Signature::from_bytes(&author.sign(&bytes).to_bytes());
853
854        EntrySignature {
855            author_signature,
856            namespace_signature,
857        }
858    }
859
860    /// Verify that this signature was created by signing the `entry` with the
861    /// secret keys of the specified `author` and `namespace`.
862    pub fn verify(
863        &self,
864        entry: &Entry,
865        namespace: &NamespacePublicKey,
866        author: &AuthorPublicKey,
867    ) -> Result<(), SignatureError> {
868        let bytes = entry.to_vec();
869        let namespace_signature =
870            ed25519_dalek::Signature::from_bytes(&self.namespace_signature.to_bytes());
871        let author_signature =
872            ed25519_dalek::Signature::from_bytes(&self.author_signature.to_bytes());
873        namespace.verify(&bytes, &namespace_signature)?;
874        author.verify(&bytes, &author_signature)?;
875
876        Ok(())
877    }
878
879    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
880        let namespace_signature = Signature::from_bytes(namespace_sig);
881        let author_signature = Signature::from_bytes(author_sig);
882
883        EntrySignature {
884            author_signature,
885            namespace_signature,
886        }
887    }
888
889    pub(crate) fn author(&self) -> &Signature {
890        &self.author_signature
891    }
892
893    pub(crate) fn namespace(&self) -> &Signature {
894        &self.namespace_signature
895    }
896}
897
898/// A single entry in a [`Replica`]
899///
900/// An entry is identified by a key, its [`Author`], and the [`Replica`]'s
901/// [`NamespaceSecret`]. Its value is the [32-byte BLAKE3 hash](iroh_blobs::Hash)
902/// of the entry's content data, the size of this content data, and a timestamp.
903#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
904pub struct Entry {
905    id: RecordIdentifier,
906    record: Record,
907}
908
909impl Entry {
910    /// Create a new entry
911    pub fn new(id: RecordIdentifier, record: Record) -> Self {
912        Entry { id, record }
913    }
914
915    /// Create a new empty entry with the current timestamp.
916    pub fn new_empty(id: RecordIdentifier) -> Self {
917        Entry {
918            id,
919            record: Record::empty_current(),
920        }
921    }
922
923    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
924    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
925        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
926            (true, true) => Ok(()),
927            (false, false) => Ok(()),
928            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
929            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
930        }
931    }
932
933    /// Get the [`RecordIdentifier`] for this entry.
934    pub fn id(&self) -> &RecordIdentifier {
935        &self.id
936    }
937
938    /// Get the [`NamespaceId`] of this entry.
939    pub fn namespace(&self) -> NamespaceId {
940        self.id.namespace()
941    }
942
943    /// Get the [`AuthorId`] of this entry.
944    pub fn author(&self) -> AuthorId {
945        self.id.author()
946    }
947
948    /// Get the key of this entry.
949    pub fn key(&self) -> &[u8] {
950        self.id.key()
951    }
952
953    /// Get the [`Record`] contained in this entry.
954    pub fn record(&self) -> &Record {
955        &self.record
956    }
957
958    /// Get the content hash of the record.
959    pub fn content_hash(&self) -> Hash {
960        self.record.hash
961    }
962
963    /// Get the content length of the record.
964    pub fn content_len(&self) -> u64 {
965        self.record.len
966    }
967
968    /// Get the timestamp of the record.
969    pub fn timestamp(&self) -> u64 {
970        self.record.timestamp
971    }
972
973    /// Serialize this entry into its canonical byte representation used for signing.
974    pub fn encode(&self, out: &mut Vec<u8>) {
975        self.id.encode(out);
976        self.record.encode(out);
977    }
978
979    /// Serialize this entry into a new vector with its canonical byte representation.
980    pub fn to_vec(&self) -> Vec<u8> {
981        let mut out = Vec::new();
982        self.encode(&mut out);
983        out
984    }
985
986    /// Sign this entry with a [`NamespaceSecret`] and [`Author`].
987    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
988        SignedEntry::from_entry(self, namespace, author)
989    }
990}
991
992const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
993const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
994const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
995
996/// The identifier of a record.
997#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
998pub struct RecordIdentifier(Bytes);
999
1000impl Default for RecordIdentifier {
1001    fn default() -> Self {
1002        Self::new(NamespaceId::default(), AuthorId::default(), b"")
1003    }
1004}
1005
1006impl Debug for RecordIdentifier {
1007    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1008        f.debug_struct("RecordIdentifier")
1009            .field("namespace", &self.namespace())
1010            .field("author", &self.author())
1011            .field("key", &std::string::String::from_utf8_lossy(self.key()))
1012            .finish()
1013    }
1014}
1015
1016impl RangeKey for RecordIdentifier {
1017    #[cfg(test)]
1018    fn is_prefix_of(&self, other: &Self) -> bool {
1019        other.as_ref().starts_with(self.as_ref())
1020    }
1021}
1022
1023fn system_time_now() -> u64 {
1024    SystemTime::now()
1025        .duration_since(SystemTime::UNIX_EPOCH)
1026        .expect("time drift")
1027        .as_micros() as u64
1028}
1029
1030impl RecordIdentifier {
1031    /// Create a new [`RecordIdentifier`].
1032    pub fn new(
1033        namespace: impl Into<NamespaceId>,
1034        author: impl Into<AuthorId>,
1035        key: impl AsRef<[u8]>,
1036    ) -> Self {
1037        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1038        bytes.extend_from_slice(namespace.into().as_bytes());
1039        bytes.extend_from_slice(author.into().as_bytes());
1040        bytes.extend_from_slice(key.as_ref());
1041        Self(bytes.freeze())
1042    }
1043
1044    /// Serialize this [`RecordIdentifier`] into a mutable byte array.
1045    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1046        out.extend_from_slice(&self.0);
1047    }
1048
1049    /// Get this [`RecordIdentifier`] as [Bytes].
1050    pub fn as_bytes(&self) -> Bytes {
1051        self.0.clone()
1052    }
1053
1054    /// Get this [`RecordIdentifier`] as a tuple of byte slices.
1055    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1056        (
1057            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1058            self.0[AUTHOR_BYTES].try_into().unwrap(),
1059            &self.0[KEY_BYTES],
1060        )
1061    }
1062
1063    /// Get this [`RecordIdentifier`] as a tuple of bytes.
1064    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1065        (
1066            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1067            self.0[AUTHOR_BYTES].try_into().unwrap(),
1068            self.0.slice(KEY_BYTES),
1069        )
1070    }
1071
1072    /// Get the key of this record.
1073    pub fn key(&self) -> &[u8] {
1074        &self.0[KEY_BYTES]
1075    }
1076
1077    /// Get the key of this record as [`Bytes`].
1078    pub fn key_bytes(&self) -> Bytes {
1079        self.0.slice(KEY_BYTES)
1080    }
1081
1082    /// Get the [`NamespaceId`] of this record as byte array.
1083    pub fn namespace(&self) -> NamespaceId {
1084        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1085        value.into()
1086    }
1087
1088    /// Get the [`AuthorId`] of this record as byte array.
1089    pub fn author(&self) -> AuthorId {
1090        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1091        value.into()
1092    }
1093}
1094
1095impl AsRef<[u8]> for RecordIdentifier {
1096    fn as_ref(&self) -> &[u8] {
1097        &self.0
1098    }
1099}
1100
1101impl Deref for SignedEntry {
1102    type Target = Entry;
1103    fn deref(&self) -> &Self::Target {
1104        &self.entry
1105    }
1106}
1107
1108impl Deref for Entry {
1109    type Target = Record;
1110    fn deref(&self) -> &Self::Target {
1111        &self.record
1112    }
1113}
1114
1115/// The data part of an entry in a [`Replica`].
1116#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1117pub struct Record {
1118    /// Length of the data referenced by `hash`.
1119    len: u64,
1120    /// Hash of the content data.
1121    hash: Hash,
1122    /// Record creation timestamp. Counted as micros since the Unix epoch.
1123    timestamp: u64,
1124}
1125
1126impl RangeValue for Record {}
1127
1128/// Ordering for entry values.
1129///
1130/// Compares first the timestamp, then the content hash.
1131impl Ord for Record {
1132    fn cmp(&self, other: &Self) -> Ordering {
1133        self.timestamp
1134            .cmp(&other.timestamp)
1135            .then_with(|| self.hash.cmp(&other.hash))
1136    }
1137}
1138
1139impl PartialOrd for Record {
1140    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1141        Some(self.cmp(other))
1142    }
1143}
1144
1145impl Record {
1146    /// Create a new record.
1147    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1148        debug_assert!(
1149            len != 0 || hash == Hash::EMPTY,
1150            "if `len` is 0 then `hash` must be the hash of the empty byte range"
1151        );
1152        Record {
1153            hash,
1154            len,
1155            timestamp,
1156        }
1157    }
1158
1159    /// Create a tombstone record (empty content)
1160    pub fn empty(timestamp: u64) -> Self {
1161        Self::new(Hash::EMPTY, 0, timestamp)
1162    }
1163
1164    /// Create a tombstone record with the timestamp set to now.
1165    pub fn empty_current() -> Self {
1166        Self::new_current(Hash::EMPTY, 0)
1167    }
1168
1169    /// Return `true` if the entry is empty.
1170    pub fn is_empty(&self) -> bool {
1171        self.hash == Hash::EMPTY
1172    }
1173
1174    /// Create a new [`Record`] with the timestamp set to now.
1175    pub fn new_current(hash: Hash, len: u64) -> Self {
1176        let timestamp = system_time_now();
1177        Self::new(hash, len, timestamp)
1178    }
1179
1180    /// Get the length of the data addressed by this record's content hash.
1181    pub fn content_len(&self) -> u64 {
1182        self.len
1183    }
1184
1185    /// Get the [`struct@Hash`] of the content data of this record.
1186    pub fn content_hash(&self) -> Hash {
1187        self.hash
1188    }
1189
1190    /// Get the timestamp of this record.
1191    pub fn timestamp(&self) -> u64 {
1192        self.timestamp
1193    }
1194
1195    #[cfg(test)]
1196    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1197        let len = data.as_ref().len() as u64;
1198        let hash = Hash::new(data);
1199        Self::new_current(hash, len)
1200    }
1201
1202    #[cfg(test)]
1203    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1204        let len = data.as_ref().len() as u64;
1205        let hash = Hash::new(data);
1206        Self::new(hash, len, timestamp)
1207    }
1208
1209    /// Serialize this record into a mutable byte array.
1210    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1211        out.extend_from_slice(&self.len.to_be_bytes());
1212        out.extend_from_slice(self.hash.as_ref());
1213        out.extend_from_slice(&self.timestamp.to_be_bytes())
1214    }
1215}
1216
1217#[cfg(test)]
1218mod tests {
1219    use std::collections::HashSet;
1220
1221    use anyhow::Result;
1222    use rand::SeedableRng;
1223
1224    use super::*;
1225    use crate::{
1226        actor::SyncHandle,
1227        ranger::{Range, Store as _},
1228        store::{OpenError, Query, SortBy, SortDirection, Store},
1229    };
1230
1231    #[tokio::test]
1232    async fn test_basics_memory() -> Result<()> {
1233        let store = store::Store::memory();
1234        test_basics(store).await?;
1235
1236        Ok(())
1237    }
1238
1239    #[tokio::test]
1240    #[cfg(feature = "fs-store")]
1241    async fn test_basics_fs() -> Result<()> {
1242        let dbfile = tempfile::NamedTempFile::new()?;
1243        let store = store::fs::Store::persistent(dbfile.path())?;
1244        test_basics(store).await?;
1245        Ok(())
1246    }
1247
1248    async fn test_basics(mut store: Store) -> Result<()> {
1249        let mut rng = rand::rng();
1250        let alice = Author::new(&mut rng);
1251        let bob = Author::new(&mut rng);
1252        let myspace = NamespaceSecret::new(&mut rng);
1253
1254        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1255        let record = Record::current_from_data(b"this is my cool data");
1256        let entry = Entry::new(record_id, record);
1257        let signed_entry = entry.sign(&myspace, &alice);
1258        signed_entry.verify(&()).expect("failed to verify");
1259
1260        let mut my_replica = store.new_replica(myspace.clone())?;
1261        for i in 0..10 {
1262            my_replica
1263                .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
1264                .await?;
1265        }
1266
1267        for i in 0..10 {
1268            let res = store
1269                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1270                .unwrap();
1271            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1272            assert_eq!(res.entry().record().content_len(), len);
1273            res.verify(&())?;
1274        }
1275
1276        // Test multiple records for the same key
1277        let mut my_replica = store.new_replica(myspace.clone())?;
1278        my_replica
1279            .hash_and_insert("/cool/path", &alice, "round 1")
1280            .await?;
1281        let _entry = store
1282            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1283            .unwrap();
1284        // Second
1285        let mut my_replica = store.new_replica(myspace.clone())?;
1286        my_replica
1287            .hash_and_insert("/cool/path", &alice, "round 2")
1288            .await?;
1289        let _entry = store
1290            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1291            .unwrap();
1292
1293        // Get All by author
1294        let entries: Vec<_> = store
1295            .get_many(myspace.id(), Query::author(alice.id()))?
1296            .collect::<Result<_>>()?;
1297        assert_eq!(entries.len(), 11);
1298
1299        // Get All by author
1300        let entries: Vec<_> = store
1301            .get_many(myspace.id(), Query::author(bob.id()))?
1302            .collect::<Result<_>>()?;
1303        assert_eq!(entries.len(), 0);
1304
1305        // Get All by key
1306        let entries: Vec<_> = store
1307            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1308            .collect::<Result<_>>()?;
1309        assert_eq!(entries.len(), 1);
1310
1311        // Get All
1312        let entries: Vec<_> = store
1313            .get_many(myspace.id(), Query::all())?
1314            .collect::<Result<_>>()?;
1315        assert_eq!(entries.len(), 11);
1316
1317        // insert record from different author
1318        let mut my_replica = store.new_replica(myspace.clone())?;
1319        let _entry = my_replica
1320            .hash_and_insert("/cool/path", &bob, "bob round 1")
1321            .await?;
1322
1323        // Get All by author
1324        let entries: Vec<_> = store
1325            .get_many(myspace.id(), Query::author(alice.id()))?
1326            .collect::<Result<_>>()?;
1327        assert_eq!(entries.len(), 11);
1328
1329        let entries: Vec<_> = store
1330            .get_many(myspace.id(), Query::author(bob.id()))?
1331            .collect::<Result<_>>()?;
1332        assert_eq!(entries.len(), 1);
1333
1334        // Get All by key
1335        let entries: Vec<_> = store
1336            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1337            .collect::<Result<_>>()?;
1338        assert_eq!(entries.len(), 2);
1339
1340        // Get all by prefix
1341        let entries: Vec<_> = store
1342            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1343            .collect::<Result<_>>()?;
1344        assert_eq!(entries.len(), 2);
1345
1346        // Get All by author and prefix
1347        let entries: Vec<_> = store
1348            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1349            .collect::<Result<_>>()?;
1350        assert_eq!(entries.len(), 1);
1351
1352        let entries: Vec<_> = store
1353            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1354            .collect::<Result<_>>()?;
1355        assert_eq!(entries.len(), 1);
1356
1357        // Get All
1358        let entries: Vec<_> = store
1359            .get_many(myspace.id(), Query::all())?
1360            .collect::<Result<_>>()?;
1361        assert_eq!(entries.len(), 12);
1362
1363        // Get Range of all should return all latest
1364        let mut my_replica = store.new_replica(myspace.clone())?;
1365        let entries_second: Vec<_> = my_replica
1366            .store
1367            .get_range(Range::new(
1368                RecordIdentifier::default(),
1369                RecordIdentifier::default(),
1370            ))?
1371            .collect::<Result<_, _>>()?;
1372
1373        assert_eq!(entries_second.len(), 12);
1374        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1375
1376        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1377        store.flush()?;
1378        Ok(())
1379    }
1380
1381    /// Test that [`Store::register_useful_peer`] behaves like a LRUCache of size
1382    /// [`super::store::PEERS_PER_DOC_CACHE_SIZE`].
1383    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1384        /// Helper to verify the store returns the expected peers for the namespace.
1385        #[track_caller]
1386        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1387            assert_eq!(
1388                expected_peers,
1389                &store
1390                    .get_sync_peers(&namespace)
1391                    .unwrap()
1392                    .unwrap()
1393                    .collect::<Vec<_>>(),
1394                "sync peers differ"
1395            );
1396        }
1397
1398        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1399        // expected peers: newest peers are to the front, oldest to the back
1400        let mut expected_peers = Vec::with_capacity(count);
1401        for i in 0..count as u8 {
1402            let peer = [i; 32];
1403            expected_peers.insert(0, peer);
1404            store.register_useful_peer(namespace, peer)?;
1405        }
1406        verify_peers(store, namespace, &expected_peers);
1407
1408        // one more peer should evict the last peer
1409        expected_peers.pop();
1410        let newer_peer = [count as u8; 32];
1411        expected_peers.insert(0, newer_peer);
1412        store.register_useful_peer(namespace, newer_peer)?;
1413        verify_peers(store, namespace, &expected_peers);
1414
1415        // move one existing peer up
1416        let refreshed_peer = expected_peers.remove(2);
1417        expected_peers.insert(0, refreshed_peer);
1418        store.register_useful_peer(namespace, refreshed_peer)?;
1419        verify_peers(store, namespace, &expected_peers);
1420        Ok(())
1421    }
1422
1423    #[tokio::test]
1424    async fn test_content_hashes_iterator_memory() -> Result<()> {
1425        let store = store::Store::memory();
1426        test_content_hashes_iterator(store).await
1427    }
1428
1429    #[tokio::test]
1430    #[cfg(feature = "fs-store")]
1431    async fn test_content_hashes_iterator_fs() -> Result<()> {
1432        let dbfile = tempfile::NamedTempFile::new()?;
1433        let store = store::fs::Store::persistent(dbfile.path())?;
1434        test_content_hashes_iterator(store).await
1435    }
1436
1437    async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1438        let mut rng = rand::rng();
1439        let mut expected = HashSet::new();
1440        let n_replicas = 3;
1441        let n_entries = 4;
1442        for i in 0..n_replicas {
1443            let namespace = NamespaceSecret::new(&mut rng);
1444            let author = store.new_author(&mut rng)?;
1445            let mut replica = store.new_replica(namespace)?;
1446            for j in 0..n_entries {
1447                let key = format!("{j}");
1448                let data = format!("{i}:{j}");
1449                let hash = replica.hash_and_insert(key, &author, data).await?;
1450                expected.insert(hash);
1451            }
1452        }
1453        assert_eq!(expected.len(), n_replicas * n_entries);
1454        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1455        assert_eq!(actual, expected);
1456        Ok(())
1457    }
1458
1459    #[test]
1460    fn test_multikey() {
1461        let mut rng = rand::rng();
1462
1463        let k = ["a", "c", "z"];
1464
1465        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1466        n.sort_by_key(|n| n.id());
1467
1468        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1469        a.sort_by_key(|a| a.id());
1470
1471        // Just key
1472        {
1473            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1474            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1475            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1476
1477            let range = Range::new(ri0.clone(), ri2.clone());
1478            assert!(range.contains(&ri0), "start");
1479            assert!(range.contains(&ri1), "inside");
1480            assert!(!range.contains(&ri2), "end");
1481
1482            assert!(ri0 < ri1);
1483            assert!(ri1 < ri2);
1484        }
1485
1486        // Just namespace
1487        {
1488            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1489            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1490            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1491
1492            let range = Range::new(ri0.clone(), ri2.clone());
1493            assert!(range.contains(&ri0), "start");
1494            assert!(range.contains(&ri1), "inside");
1495            assert!(!range.contains(&ri2), "end");
1496
1497            assert!(ri0 < ri1);
1498            assert!(ri1 < ri2);
1499        }
1500
1501        // Just author
1502        {
1503            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1504            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1505            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1506
1507            let range = Range::new(ri0.clone(), ri2.clone());
1508            assert!(range.contains(&ri0), "start");
1509            assert!(range.contains(&ri1), "inside");
1510            assert!(!range.contains(&ri2), "end");
1511
1512            assert!(ri0 < ri1);
1513            assert!(ri1 < ri2);
1514        }
1515
1516        // Just key and namespace
1517        {
1518            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1519            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1520            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1521
1522            let range = Range::new(ri0.clone(), ri2.clone());
1523            assert!(range.contains(&ri0), "start");
1524            assert!(range.contains(&ri1), "inside");
1525            assert!(!range.contains(&ri2), "end");
1526
1527            assert!(ri0 < ri1);
1528            assert!(ri1 < ri2);
1529        }
1530
1531        // Mixed
1532        {
1533            // Ord should prioritize namespace - author - key
1534
1535            let a0 = a[0].id();
1536            let a1 = a[1].id();
1537            let n0 = n[0].id();
1538            let n1 = n[1].id();
1539            let k0 = k[0];
1540            let k1 = k[1];
1541
1542            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1543            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1544            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1545            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1546        }
1547    }
1548
1549    #[tokio::test]
1550    async fn test_timestamps_memory() -> Result<()> {
1551        let store = store::Store::memory();
1552        test_timestamps(store).await?;
1553
1554        Ok(())
1555    }
1556
1557    #[tokio::test]
1558    #[cfg(feature = "fs-store")]
1559    async fn test_timestamps_fs() -> Result<()> {
1560        let dbfile = tempfile::NamedTempFile::new()?;
1561        let store = store::fs::Store::persistent(dbfile.path())?;
1562        test_timestamps(store).await?;
1563        Ok(())
1564    }
1565
1566    async fn test_timestamps(mut store: Store) -> Result<()> {
1567        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
1568        let namespace = NamespaceSecret::new(&mut rng);
1569        let _replica = store.new_replica(namespace.clone())?;
1570        let author = store.new_author(&mut rng)?;
1571        store.close_replica(namespace.id());
1572        let mut replica = store.open_replica(&namespace.id())?;
1573
1574        let key = b"hello";
1575        let value = b"world";
1576        let entry = {
1577            let timestamp = 2;
1578            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1579            let record = Record::from_data(value, timestamp);
1580            Entry::new(id, record).sign(&namespace, &author)
1581        };
1582
1583        replica
1584            .insert_entry(entry.clone(), InsertOrigin::Local)
1585            .await
1586            .unwrap();
1587        store.close_replica(namespace.id());
1588        let res = store
1589            .get_exact(namespace.id(), author.id(), key, false)?
1590            .unwrap();
1591        assert_eq!(res, entry);
1592
1593        let entry2 = {
1594            let timestamp = 1;
1595            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1596            let record = Record::from_data(value, timestamp);
1597            Entry::new(id, record).sign(&namespace, &author)
1598        };
1599
1600        let mut replica = store.open_replica(&namespace.id())?;
1601        let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1602        store.close_replica(namespace.id());
1603        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1604        let res = store
1605            .get_exact(namespace.id(), author.id(), key, false)?
1606            .unwrap();
1607        assert_eq!(res, entry);
1608        store.flush()?;
1609        Ok(())
1610    }
1611
1612    #[tokio::test]
1613    async fn test_replica_sync_memory() -> Result<()> {
1614        let alice_store = store::Store::memory();
1615        let bob_store = store::Store::memory();
1616
1617        test_replica_sync(alice_store, bob_store).await?;
1618        Ok(())
1619    }
1620
1621    #[tokio::test]
1622    #[cfg(feature = "fs-store")]
1623    async fn test_replica_sync_fs() -> Result<()> {
1624        let alice_dbfile = tempfile::NamedTempFile::new()?;
1625        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1626        let bob_dbfile = tempfile::NamedTempFile::new()?;
1627        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1628        test_replica_sync(alice_store, bob_store).await?;
1629
1630        Ok(())
1631    }
1632
1633    async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1634        let alice_set = ["ape", "eel", "fox", "gnu"];
1635        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1636
1637        let mut rng = rand::rng();
1638        let author = Author::new(&mut rng);
1639        let myspace = NamespaceSecret::new(&mut rng);
1640        let mut alice = alice_store.new_replica(myspace.clone())?;
1641        for el in &alice_set {
1642            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1643        }
1644
1645        let mut bob = bob_store.new_replica(myspace.clone())?;
1646        for el in &bob_set {
1647            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1648        }
1649
1650        let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1651
1652        assert_eq!(alice_out.num_sent, 2);
1653        assert_eq!(bob_out.num_recv, 2);
1654        assert_eq!(alice_out.num_recv, 6);
1655        assert_eq!(bob_out.num_sent, 6);
1656
1657        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1658        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1659        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1660        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1661        alice_store.flush()?;
1662        bob_store.flush()?;
1663        Ok(())
1664    }
1665
1666    #[tokio::test]
1667    async fn test_replica_timestamp_sync_memory() -> Result<()> {
1668        let alice_store = store::Store::memory();
1669        let bob_store = store::Store::memory();
1670
1671        test_replica_timestamp_sync(alice_store, bob_store).await?;
1672        Ok(())
1673    }
1674
1675    #[tokio::test]
1676    #[cfg(feature = "fs-store")]
1677    async fn test_replica_timestamp_sync_fs() -> Result<()> {
1678        let alice_dbfile = tempfile::NamedTempFile::new()?;
1679        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1680        let bob_dbfile = tempfile::NamedTempFile::new()?;
1681        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1682        test_replica_timestamp_sync(alice_store, bob_store).await?;
1683
1684        Ok(())
1685    }
1686
1687    async fn test_replica_timestamp_sync(
1688        mut alice_store: Store,
1689        mut bob_store: Store,
1690    ) -> Result<()> {
1691        let mut rng = rand::rng();
1692        let author = Author::new(&mut rng);
1693        let namespace = NamespaceSecret::new(&mut rng);
1694        let mut alice = alice_store.new_replica(namespace.clone())?;
1695        let mut bob = bob_store.new_replica(namespace.clone())?;
1696
1697        let key = b"key";
1698        let alice_value = b"alice";
1699        let bob_value = b"bob";
1700        let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1701        // system time increased - sync should overwrite
1702        let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1703        sync(&mut alice, &mut bob).await?;
1704        assert_eq!(
1705            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1706            Some(bob_hash)
1707        );
1708        assert_eq!(
1709            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1710            Some(bob_hash)
1711        );
1712
1713        let mut alice = alice_store.new_replica(namespace.clone())?;
1714        let mut bob = bob_store.new_replica(namespace.clone())?;
1715
1716        let alice_value_2 = b"alice2";
1717        // system time increased - sync should overwrite
1718        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1719        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1720        sync(&mut alice, &mut bob).await?;
1721        assert_eq!(
1722            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1723            Some(alice_hash_2)
1724        );
1725        assert_eq!(
1726            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1727            Some(alice_hash_2)
1728        );
1729        alice_store.flush()?;
1730        bob_store.flush()?;
1731        Ok(())
1732    }
1733
1734    #[tokio::test]
1735    async fn test_future_timestamp() -> Result<()> {
1736        let mut rng = rand::rng();
1737        let mut store = store::Store::memory();
1738        let author = Author::new(&mut rng);
1739        let namespace = NamespaceSecret::new(&mut rng);
1740
1741        let mut replica = store.new_replica(namespace.clone())?;
1742        let key = b"hi";
1743        let t = system_time_now();
1744        let record = Record::from_data(b"1", t);
1745        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1746        replica
1747            .insert_entry(entry0.clone(), InsertOrigin::Local)
1748            .await?;
1749
1750        assert_eq!(
1751            get_entry(&mut store, namespace.id(), author.id(), key)?,
1752            entry0
1753        );
1754
1755        let mut replica = store.new_replica(namespace.clone())?;
1756        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1757        let record = Record::from_data(b"2", t);
1758        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1759        replica
1760            .insert_entry(entry1.clone(), InsertOrigin::Local)
1761            .await?;
1762        assert_eq!(
1763            get_entry(&mut store, namespace.id(), author.id(), key)?,
1764            entry1
1765        );
1766
1767        let mut replica = store.new_replica(namespace.clone())?;
1768        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1769        let record = Record::from_data(b"2", t);
1770        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1771        replica
1772            .insert_entry(entry2.clone(), InsertOrigin::Local)
1773            .await?;
1774        assert_eq!(
1775            get_entry(&mut store, namespace.id(), author.id(), key)?,
1776            entry2
1777        );
1778
1779        let mut replica = store.new_replica(namespace.clone())?;
1780        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1781        let record = Record::from_data(b"2", t);
1782        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1783        let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1784        assert!(matches!(
1785            res,
1786            Err(InsertError::Validation(
1787                ValidationFailure::TooFarInTheFuture
1788            ))
1789        ));
1790        assert_eq!(
1791            get_entry(&mut store, namespace.id(), author.id(), key)?,
1792            entry2
1793        );
1794        store.flush()?;
1795        Ok(())
1796    }
1797
1798    /// Regression: `MAX_TIMESTAMP_FUTURE_SHIFT` must be on the same unit scale as
1799    /// `system_time_now()` (Unix epoch **microseconds**). A millisecond-scaled constant
1800    /// only allows ~0.6s of future slack and would reject this insert.
1801    #[tokio::test]
1802    async fn test_future_timestamp_accepts_one_second_skew() -> Result<()> {
1803        let mut rng = rand::rng();
1804        let mut store = store::Store::memory();
1805        let author = Author::new(&mut rng);
1806        let namespace = NamespaceSecret::new(&mut rng);
1807        let mut replica = store.new_replica(namespace.clone())?;
1808        let key = b"skew";
1809        let skew_micros: u64 = Duration::from_secs(1).as_micros() as u64;
1810        let now = system_time_now();
1811        let record = Record::from_data(b"ahead", now + skew_micros);
1812        let entry = SignedEntry::from_parts(&namespace, &author, key, record);
1813        replica
1814            .insert_entry(entry.clone(), InsertOrigin::Local)
1815            .await?;
1816        assert_eq!(
1817            get_entry(&mut store, namespace.id(), author.id(), key)?,
1818            entry
1819        );
1820        store.flush()?;
1821        Ok(())
1822    }
1823
1824    #[tokio::test]
1825    async fn test_insert_empty() -> Result<()> {
1826        let mut store = store::Store::memory();
1827        let mut rng = rand::rng();
1828        let alice = Author::new(&mut rng);
1829        let myspace = NamespaceSecret::new(&mut rng);
1830        let mut replica = store.new_replica(myspace.clone())?;
1831        let hash = Hash::new(b"");
1832        let res = replica.insert(b"foo", &alice, hash, 0).await;
1833        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1834        store.flush()?;
1835        Ok(())
1836    }
1837
1838    #[tokio::test]
1839    async fn test_prefix_delete_memory() -> Result<()> {
1840        let store = store::Store::memory();
1841        test_prefix_delete(store).await?;
1842        Ok(())
1843    }
1844
1845    #[tokio::test]
1846    #[cfg(feature = "fs-store")]
1847    async fn test_prefix_delete_fs() -> Result<()> {
1848        let dbfile = tempfile::NamedTempFile::new()?;
1849        let store = store::fs::Store::persistent(dbfile.path())?;
1850        test_prefix_delete(store).await?;
1851        Ok(())
1852    }
1853
1854    async fn test_prefix_delete(mut store: Store) -> Result<()> {
1855        let mut rng = rand::rng();
1856        let alice = Author::new(&mut rng);
1857        let myspace = NamespaceSecret::new(&mut rng);
1858        let mut replica = store.new_replica(myspace.clone())?;
1859        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1860        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1861
1862        // sanity checks
1863        assert_eq!(
1864            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1865            Some(hash1)
1866        );
1867        assert_eq!(
1868            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1869            Some(hash2)
1870        );
1871
1872        // delete
1873        let mut replica = store.new_replica(myspace.clone())?;
1874        let deleted = replica.delete_prefix(b"foo", &alice).await?;
1875        assert_eq!(deleted, 2);
1876        assert_eq!(
1877            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1878            None
1879        );
1880        assert_eq!(
1881            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1882            None
1883        );
1884        assert_eq!(
1885            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1886            None
1887        );
1888        store.flush()?;
1889        Ok(())
1890    }
1891
1892    #[tokio::test]
1893    async fn test_replica_sync_delete_memory() -> Result<()> {
1894        let alice_store = store::Store::memory();
1895        let bob_store = store::Store::memory();
1896
1897        test_replica_sync_delete(alice_store, bob_store).await
1898    }
1899
1900    #[tokio::test]
1901    #[cfg(feature = "fs-store")]
1902    async fn test_replica_sync_delete_fs() -> Result<()> {
1903        let alice_dbfile = tempfile::NamedTempFile::new()?;
1904        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1905        let bob_dbfile = tempfile::NamedTempFile::new()?;
1906        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1907        test_replica_sync_delete(alice_store, bob_store).await
1908    }
1909
1910    async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1911        let alice_set = ["foot"];
1912        let bob_set = ["fool", "foo", "fog"];
1913
1914        let mut rng = rand::rng();
1915        let author = Author::new(&mut rng);
1916        let myspace = NamespaceSecret::new(&mut rng);
1917        let mut alice = alice_store.new_replica(myspace.clone())?;
1918        for el in &alice_set {
1919            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1920        }
1921
1922        let mut bob = bob_store.new_replica(myspace.clone())?;
1923        for el in &bob_set {
1924            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1925        }
1926
1927        sync(&mut alice, &mut bob).await?;
1928
1929        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1930        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1931        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1932        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1933
1934        let mut alice = alice_store.new_replica(myspace.clone())?;
1935        let mut bob = bob_store.new_replica(myspace.clone())?;
1936        alice.delete_prefix("foo", &author).await?;
1937        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1938            .await?;
1939        sync(&mut alice, &mut bob).await?;
1940        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1941        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1942        alice_store.flush()?;
1943        bob_store.flush()?;
1944        Ok(())
1945    }
1946
1947    #[tokio::test]
1948    async fn test_replica_remove_memory() -> Result<()> {
1949        let alice_store = store::Store::memory();
1950        test_replica_remove(alice_store).await
1951    }
1952
1953    #[tokio::test]
1954    #[cfg(feature = "fs-store")]
1955    async fn test_replica_remove_fs() -> Result<()> {
1956        let alice_dbfile = tempfile::NamedTempFile::new()?;
1957        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1958        test_replica_remove(alice_store).await
1959    }
1960
1961    async fn test_replica_remove(mut store: Store) -> Result<()> {
1962        let mut rng = rand::rng();
1963        let namespace = NamespaceSecret::new(&mut rng);
1964        let author = Author::new(&mut rng);
1965        let mut replica = store.new_replica(namespace.clone())?;
1966
1967        // insert entry
1968        let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1969        let res = store
1970            .get_many(namespace.id(), Query::all())?
1971            .collect::<Vec<_>>();
1972        assert_eq!(res.len(), 1);
1973
1974        // remove replica
1975        let res = store.remove_replica(&namespace.id());
1976        // may not remove replica while still open;
1977        assert!(res.is_err());
1978        store.close_replica(namespace.id());
1979        store.remove_replica(&namespace.id())?;
1980        let res = store
1981            .get_many(namespace.id(), Query::all())?
1982            .collect::<Vec<_>>();
1983        assert_eq!(res.len(), 0);
1984
1985        // may not reopen removed replica
1986        let res = store.load_replica_info(&namespace.id());
1987        assert!(matches!(res, Err(OpenError::NotFound)));
1988
1989        // may recreate replica
1990        let mut replica = store.new_replica(namespace.clone())?;
1991        replica.insert(b"foo", &author, hash, 3).await?;
1992        let res = store
1993            .get_many(namespace.id(), Query::all())?
1994            .collect::<Vec<_>>();
1995        assert_eq!(res.len(), 1);
1996        store.flush()?;
1997        Ok(())
1998    }
1999
2000    #[tokio::test]
2001    async fn test_replica_delete_edge_cases_memory() -> Result<()> {
2002        let store = store::Store::memory();
2003        test_replica_delete_edge_cases(store).await
2004    }
2005
2006    #[tokio::test]
2007    #[cfg(feature = "fs-store")]
2008    async fn test_replica_delete_edge_cases_fs() -> Result<()> {
2009        let dbfile = tempfile::NamedTempFile::new()?;
2010        let store = store::fs::Store::persistent(dbfile.path())?;
2011        test_replica_delete_edge_cases(store).await
2012    }
2013
2014    async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
2015        let mut rng = rand::rng();
2016        let author = Author::new(&mut rng);
2017        let namespace = NamespaceSecret::new(&mut rng);
2018
2019        let edgecases = [0u8, 1u8, 255u8];
2020        let prefixes = [0u8, 255u8];
2021        let hash = Hash::new(b"foo");
2022        let len = 3;
2023        for prefix in prefixes {
2024            let mut expected = vec![];
2025            let mut replica = store.new_replica(namespace.clone())?;
2026            for suffix in edgecases {
2027                let key = [prefix, suffix].to_vec();
2028                expected.push(key.clone());
2029                replica.insert(&key, &author, hash, len).await?;
2030            }
2031            assert_keys(&mut store, namespace.id(), expected);
2032            let mut replica = store.new_replica(namespace.clone())?;
2033            replica.delete_prefix([prefix], &author).await?;
2034            assert_keys(&mut store, namespace.id(), vec![]);
2035        }
2036
2037        let mut replica = store.new_replica(namespace.clone())?;
2038        let key = vec![1u8, 0u8];
2039        replica.insert(key, &author, hash, len).await?;
2040        let key = vec![1u8, 1u8];
2041        replica.insert(key, &author, hash, len).await?;
2042        let key = vec![1u8, 2u8];
2043        replica.insert(key, &author, hash, len).await?;
2044        let prefix = vec![1u8, 1u8];
2045        replica.delete_prefix(prefix, &author).await?;
2046        assert_keys(
2047            &mut store,
2048            namespace.id(),
2049            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2050        );
2051
2052        let mut replica = store.new_replica(namespace.clone())?;
2053        let key = vec![0u8, 255u8];
2054        replica.insert(key, &author, hash, len).await?;
2055        let key = vec![0u8, 0u8];
2056        replica.insert(key, &author, hash, len).await?;
2057        let prefix = vec![0u8];
2058        replica.delete_prefix(prefix, &author).await?;
2059        assert_keys(
2060            &mut store,
2061            namespace.id(),
2062            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2063        );
2064        store.flush()?;
2065        Ok(())
2066    }
2067
2068    #[tokio::test]
2069    async fn test_latest_iter_memory() -> Result<()> {
2070        let store = store::Store::memory();
2071        test_latest_iter(store).await
2072    }
2073
2074    #[tokio::test]
2075    #[cfg(feature = "fs-store")]
2076    async fn test_latest_iter_fs() -> Result<()> {
2077        let dbfile = tempfile::NamedTempFile::new()?;
2078        let store = store::fs::Store::persistent(dbfile.path())?;
2079        test_latest_iter(store).await
2080    }
2081
2082    async fn test_latest_iter(mut store: Store) -> Result<()> {
2083        let mut rng = rand::rng();
2084        let author0 = Author::new(&mut rng);
2085        let author1 = Author::new(&mut rng);
2086        let namespace = NamespaceSecret::new(&mut rng);
2087        let mut replica = store.new_replica(namespace.clone())?;
2088
2089        replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2090        let latest = store
2091            .get_latest_for_each_author(namespace.id())?
2092            .collect::<Result<Vec<_>>>()?;
2093        assert_eq!(latest.len(), 1);
2094        assert_eq!(latest[0].2, b"a0.1".to_vec());
2095
2096        let mut replica = store.new_replica(namespace.clone())?;
2097        replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2098        replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2099        let latest = store
2100            .get_latest_for_each_author(namespace.id())?
2101            .collect::<Result<Vec<_>>>()?;
2102        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2103        latest_keys.sort();
2104        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2105        store.flush()?;
2106        Ok(())
2107    }
2108
2109    #[tokio::test]
2110    async fn test_replica_byte_keys_memory() -> Result<()> {
2111        let store = store::Store::memory();
2112
2113        test_replica_byte_keys(store).await?;
2114        Ok(())
2115    }
2116
2117    #[tokio::test]
2118    #[cfg(feature = "fs-store")]
2119    async fn test_replica_byte_keys_fs() -> Result<()> {
2120        let dbfile = tempfile::NamedTempFile::new()?;
2121        let store = store::fs::Store::persistent(dbfile.path())?;
2122        test_replica_byte_keys(store).await?;
2123
2124        Ok(())
2125    }
2126
2127    async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2128        let mut rng = rand::rng();
2129        let author = Author::new(&mut rng);
2130        let namespace = NamespaceSecret::new(&mut rng);
2131
2132        let hash = Hash::new(b"foo");
2133        let len = 3;
2134
2135        let key = vec![1u8, 0u8];
2136        let mut replica = store.new_replica(namespace.clone())?;
2137        replica.insert(key, &author, hash, len).await?;
2138        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2139        let key = vec![1u8, 2u8];
2140        let mut replica = store.new_replica(namespace.clone())?;
2141        replica.insert(key, &author, hash, len).await?;
2142        assert_keys(
2143            &mut store,
2144            namespace.id(),
2145            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2146        );
2147
2148        let key = vec![0u8, 255u8];
2149        let mut replica = store.new_replica(namespace.clone())?;
2150        replica.insert(key, &author, hash, len).await?;
2151        assert_keys(
2152            &mut store,
2153            namespace.id(),
2154            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2155        );
2156        store.flush()?;
2157        Ok(())
2158    }
2159
2160    #[tokio::test]
2161    async fn test_replica_capability_memory() -> Result<()> {
2162        let store = store::Store::memory();
2163        test_replica_capability(store).await
2164    }
2165
2166    #[tokio::test]
2167    #[cfg(feature = "fs-store")]
2168    async fn test_replica_capability_fs() -> Result<()> {
2169        let dbfile = tempfile::NamedTempFile::new()?;
2170        let store = store::fs::Store::persistent(dbfile.path())?;
2171        test_replica_capability(store).await
2172    }
2173
2174    #[allow(clippy::redundant_pattern_matching)]
2175    async fn test_replica_capability(mut store: Store) -> Result<()> {
2176        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2177        let author = store.new_author(&mut rng)?;
2178        let namespace = NamespaceSecret::new(&mut rng);
2179
2180        // import read capability - insert must fail
2181        let capability = Capability::Read(namespace.id());
2182        store.import_namespace(capability)?;
2183        let mut replica = store.open_replica(&namespace.id())?;
2184        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2185        assert!(matches!(res, Err(InsertError::ReadOnly)));
2186
2187        // import write capability - insert must succeed
2188        let capability = Capability::Write(namespace.clone());
2189        store.import_namespace(capability)?;
2190        let mut replica = store.open_replica(&namespace.id())?;
2191        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2192        assert!(matches!(res, Ok(_)));
2193        store.close_replica(namespace.id());
2194        let mut replica = store.open_replica(&namespace.id())?;
2195        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2196        assert!(res.is_ok());
2197
2198        // import read capability again - insert must still succeed
2199        let capability = Capability::Read(namespace.id());
2200        store.import_namespace(capability)?;
2201        store.close_replica(namespace.id());
2202        let mut replica = store.open_replica(&namespace.id())?;
2203        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2204        assert!(res.is_ok());
2205        store.flush()?;
2206        Ok(())
2207    }
2208
2209    #[tokio::test]
2210    async fn test_actor_capability_memory() -> Result<()> {
2211        let store = store::Store::memory();
2212        test_actor_capability(store).await
2213    }
2214
2215    #[tokio::test]
2216    #[cfg(feature = "fs-store")]
2217    async fn test_actor_capability_fs() -> Result<()> {
2218        let dbfile = tempfile::NamedTempFile::new()?;
2219        let store = store::fs::Store::persistent(dbfile.path())?;
2220        test_actor_capability(store).await
2221    }
2222
2223    async fn test_actor_capability(store: Store) -> Result<()> {
2224        // test with actor
2225        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2226        let author = Author::new(&mut rng);
2227        let handle = SyncHandle::spawn(store, None, "test".into());
2228        let author = handle.import_author(author).await?;
2229        let namespace = NamespaceSecret::new(&mut rng);
2230        let id = namespace.id();
2231
2232        // import read capability - insert must fail
2233        let capability = Capability::Read(namespace.id());
2234        handle.import_namespace(capability).await?;
2235        handle.open(namespace.id(), Default::default()).await?;
2236        let res = handle
2237            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2238            .await;
2239        assert!(res.is_err());
2240
2241        // import write capability - insert must succeed
2242        let capability = Capability::Write(namespace.clone());
2243        handle.import_namespace(capability).await?;
2244        let res = handle
2245            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2246            .await;
2247        assert!(res.is_ok());
2248
2249        // close and reopen - must still succeed
2250        handle.close(namespace.id()).await?;
2251        let res = handle
2252            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2253            .await;
2254        assert!(res.is_err());
2255        handle.open(namespace.id(), Default::default()).await?;
2256        let res = handle
2257            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2258            .await;
2259        assert!(res.is_ok());
2260        Ok(())
2261    }
2262
2263    fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2264        let mut res = vec![];
2265        while let Ok(ev) = events.try_recv() {
2266            res.push(ev);
2267        }
2268        res
2269    }
2270
2271    /// This tests that no events are emitted for entries received during sync which are obsolete
2272    /// (too old) by the time they are actually inserted in the store.
2273    #[tokio::test]
2274    async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2275        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2276        let mut store1 = store::Store::memory();
2277        let mut store2 = store::Store::memory();
2278        let peer1 = [1u8; 32];
2279        let peer2 = [2u8; 32];
2280        let mut state1 = SyncOutcome::default();
2281        let mut state2 = SyncOutcome::default();
2282
2283        let author = Author::new(&mut rng);
2284        let namespace = NamespaceSecret::new(&mut rng);
2285        let mut replica1 = store1.new_replica(namespace.clone())?;
2286        let mut replica2 = store2.new_replica(namespace.clone())?;
2287
2288        let (events1_sender, events1) = async_channel::bounded(32);
2289        let (events2_sender, events2) = async_channel::bounded(32);
2290
2291        replica1.info.subscribe(events1_sender);
2292        replica2.info.subscribe(events2_sender);
2293
2294        replica1.hash_and_insert(b"foo", &author, b"init").await?;
2295
2296        let from1 = replica1.sync_initial_message()?;
2297        let from2 = replica2
2298            .sync_process_message(from1, peer1, &mut state2)
2299            .await
2300            .unwrap()
2301            .unwrap();
2302        let from1 = replica1
2303            .sync_process_message(from2, peer2, &mut state1)
2304            .await
2305            .unwrap()
2306            .unwrap();
2307        // now we will receive the entry from rpelica1. we will insert a newer entry now, while the
2308        // sync is already running. this means the entry from replica1 will be rejected. we make
2309        // sure that no InsertRemote event is emitted for this entry.
2310        replica2.hash_and_insert(b"foo", &author, b"update").await?;
2311        let from2 = replica2
2312            .sync_process_message(from1, peer1, &mut state2)
2313            .await
2314            .unwrap();
2315        assert!(from2.is_none());
2316        let events1 = drain(events1);
2317        let events2 = drain(events2);
2318        assert_eq!(events1.len(), 1);
2319        assert_eq!(events2.len(), 1);
2320        assert!(matches!(events1[0], Event::LocalInsert { .. }));
2321        assert!(matches!(events2[0], Event::LocalInsert { .. }));
2322        assert_eq!(state1.num_sent, 1);
2323        assert_eq!(state1.num_recv, 0);
2324        assert_eq!(state2.num_sent, 0);
2325        assert_eq!(state2.num_recv, 1);
2326        store1.flush()?;
2327        store2.flush()?;
2328        Ok(())
2329    }
2330
2331    #[tokio::test]
2332    async fn test_replica_queries_mem() -> Result<()> {
2333        let store = store::Store::memory();
2334
2335        test_replica_queries(store).await?;
2336        Ok(())
2337    }
2338
2339    #[tokio::test]
2340    #[cfg(feature = "fs-store")]
2341    async fn test_replica_queries_fs() -> Result<()> {
2342        let dbfile = tempfile::NamedTempFile::new()?;
2343        let store = store::fs::Store::persistent(dbfile.path())?;
2344        test_replica_queries(store).await?;
2345
2346        Ok(())
2347    }
2348
2349    async fn test_replica_queries(mut store: Store) -> Result<()> {
2350        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2351        let namespace = NamespaceSecret::new(&mut rng);
2352        let namespace_id = namespace.id();
2353
2354        let a1 = store.new_author(&mut rng)?;
2355        let a2 = store.new_author(&mut rng)?;
2356        let a3 = store.new_author(&mut rng)?;
2357        println!(
2358            "a1 {} a2 {} a3 {}",
2359            a1.id().fmt_short(),
2360            a2.id().fmt_short(),
2361            a3.id().fmt_short()
2362        );
2363
2364        let mut replica = store.new_replica(namespace.clone())?;
2365        replica.hash_and_insert("hi/world", &a2, "a2").await?;
2366        replica.hash_and_insert("hi/world", &a1, "a1").await?;
2367        replica.hash_and_insert("hi/moon", &a2, "a1").await?;
2368        replica.hash_and_insert("hi", &a3, "a3").await?;
2369
2370        struct QueryTester<'a> {
2371            store: &'a mut Store,
2372            namespace: NamespaceId,
2373        }
2374        impl QueryTester<'_> {
2375            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2376                let query = query.into();
2377                let actual = self
2378                    .store
2379                    .get_many(self.namespace, query.clone())
2380                    .unwrap()
2381                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2382                    .collect::<Result<Vec<_>>>()
2383                    .unwrap();
2384                let expected = expected
2385                    .into_iter()
2386                    .map(|(key, author)| (key.to_string(), author.id()))
2387                    .collect::<Vec<_>>();
2388                assert_eq!(actual, expected, "query: {query:#?}")
2389            }
2390        }
2391
2392        let mut qt = QueryTester {
2393            store: &mut store,
2394            namespace: namespace_id,
2395        };
2396
2397        qt.assert(
2398            Query::all(),
2399            vec![
2400                ("hi/world", &a1),
2401                ("hi/moon", &a2),
2402                ("hi/world", &a2),
2403                ("hi", &a3),
2404            ],
2405        );
2406
2407        qt.assert(
2408            Query::single_latest_per_key(),
2409            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2410        );
2411
2412        qt.assert(
2413            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2414            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2415        );
2416
2417        qt.assert(
2418            Query::single_latest_per_key().key_prefix("hi/"),
2419            vec![("hi/moon", &a2), ("hi/world", &a1)],
2420        );
2421
2422        qt.assert(
2423            Query::single_latest_per_key()
2424                .key_prefix("hi/")
2425                .sort_direction(SortDirection::Desc),
2426            vec![("hi/world", &a1), ("hi/moon", &a2)],
2427        );
2428
2429        qt.assert(
2430            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2431            vec![
2432                ("hi", &a3),
2433                ("hi/moon", &a2),
2434                ("hi/world", &a1),
2435                ("hi/world", &a2),
2436            ],
2437        );
2438
2439        qt.assert(
2440            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2441            vec![
2442                ("hi/world", &a2),
2443                ("hi/world", &a1),
2444                ("hi/moon", &a2),
2445                ("hi", &a3),
2446            ],
2447        );
2448
2449        qt.assert(
2450            Query::all().key_prefix("hi/"),
2451            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2452        );
2453
2454        qt.assert(
2455            Query::all().key_prefix("hi/").offset(1).limit(1),
2456            vec![("hi/moon", &a2)],
2457        );
2458
2459        qt.assert(
2460            Query::all()
2461                .key_prefix("hi/")
2462                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2463            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2464        );
2465
2466        qt.assert(
2467            Query::all()
2468                .key_prefix("hi/")
2469                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2470                .offset(1)
2471                .limit(1),
2472            vec![("hi/world", &a1)],
2473        );
2474
2475        qt.assert(
2476            Query::all()
2477                .key_prefix("hi/")
2478                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2479            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2480        );
2481
2482        qt.assert(
2483            Query::all()
2484                .key_prefix("hi/")
2485                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2486            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2487        );
2488
2489        qt.assert(
2490            Query::all()
2491                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2492                .limit(2)
2493                .offset(1),
2494            vec![("hi/moon", &a2), ("hi/world", &a1)],
2495        );
2496
2497        let mut replica = store.new_replica(namespace)?;
2498        replica.delete_prefix("hi/world", &a2).await?;
2499        let mut qt = QueryTester {
2500            store: &mut store,
2501            namespace: namespace_id,
2502        };
2503
2504        qt.assert(
2505            Query::all(),
2506            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2507        );
2508
2509        qt.assert(
2510            Query::all().include_empty(),
2511            vec![
2512                ("hi/world", &a1),
2513                ("hi/moon", &a2),
2514                ("hi/world", &a2),
2515                ("hi", &a3),
2516            ],
2517        );
2518        store.flush()?;
2519        Ok(())
2520    }
2521
2522    #[test]
2523    fn test_dl_policies_mem() -> Result<()> {
2524        let mut store = store::Store::memory();
2525        test_dl_policies(&mut store)
2526    }
2527
2528    #[test]
2529    #[cfg(feature = "fs-store")]
2530    fn test_dl_policies_fs() -> Result<()> {
2531        let dbfile = tempfile::NamedTempFile::new()?;
2532        let mut store = store::fs::Store::persistent(dbfile.path())?;
2533        test_dl_policies(&mut store)
2534    }
2535
2536    fn test_dl_policies(store: &mut Store) -> Result<()> {
2537        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2538        let namespace = NamespaceSecret::new(&mut rng);
2539        let id = namespace.id();
2540
2541        let filter = store::FilterKind::Exact("foo".into());
2542        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2543        store
2544            .set_download_policy(&id, policy.clone())
2545            .expect_err("document dos not exist");
2546
2547        // now create the document
2548        store.new_replica(namespace)?;
2549
2550        store.set_download_policy(&id, policy.clone())?;
2551        let retrieved_policy = store.get_download_policy(&id)?;
2552        assert_eq!(retrieved_policy, policy);
2553        store.flush()?;
2554        Ok(())
2555    }
2556
2557    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2558        expected.sort();
2559        assert_eq!(expected, get_keys_sorted(store, namespace));
2560    }
2561
2562    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2563        let mut res = store
2564            .get_many(namespace, Query::all())
2565            .unwrap()
2566            .map(|e| e.map(|e| e.key().to_vec()))
2567            .collect::<Result<Vec<_>>>()
2568            .unwrap();
2569        res.sort();
2570        res
2571    }
2572
2573    fn get_entry(
2574        store: &mut Store,
2575        namespace: NamespaceId,
2576        author: AuthorId,
2577        key: &[u8],
2578    ) -> anyhow::Result<SignedEntry> {
2579        let entry = store
2580            .get_exact(namespace, author, key, true)?
2581            .ok_or_else(|| anyhow::anyhow!("not found"))?;
2582        Ok(entry)
2583    }
2584
2585    fn get_content_hash(
2586        store: &mut Store,
2587        namespace: NamespaceId,
2588        author: AuthorId,
2589        key: &[u8],
2590    ) -> anyhow::Result<Option<Hash>> {
2591        let hash = store
2592            .get_exact(namespace, author, key, false)?
2593            .map(|e| e.content_hash());
2594        Ok(hash)
2595    }
2596
2597    async fn sync<'a>(
2598        alice: &'a mut Replica<'a>,
2599        bob: &'a mut Replica<'a>,
2600    ) -> Result<(SyncOutcome, SyncOutcome)> {
2601        let alice_peer_id = [1u8; 32];
2602        let bob_peer_id = [2u8; 32];
2603        let mut alice_state = SyncOutcome::default();
2604        let mut bob_state = SyncOutcome::default();
2605        // Sync alice - bob
2606        let mut next_to_bob = Some(alice.sync_initial_message()?);
2607        let mut rounds = 0;
2608        while let Some(msg) = next_to_bob.take() {
2609            assert!(rounds < 100, "too many rounds");
2610            rounds += 1;
2611            println!("round {rounds}");
2612            if let Some(msg) = bob
2613                .sync_process_message(msg, alice_peer_id, &mut bob_state)
2614                .await?
2615            {
2616                next_to_bob = alice
2617                    .sync_process_message(msg, bob_peer_id, &mut alice_state)
2618                    .await?
2619            }
2620        }
2621        assert_eq!(alice_state.num_sent, bob_state.num_recv);
2622        assert_eq!(alice_state.num_recv, bob_state.num_sent);
2623        Ok((alice_state, bob_state))
2624    }
2625
2626    fn check_entries(
2627        store: &mut Store,
2628        namespace: &NamespaceId,
2629        author: &Author,
2630        set: &[&str],
2631    ) -> Result<()> {
2632        for el in set {
2633            store.get_exact(*namespace, author.id(), el, false)?;
2634        }
2635        Ok(())
2636    }
2637
2638    /// Snapshot of the `SignedEntry` postcard wire format used by doc sync.
2639    #[test]
2640    fn test_signed_entry_postcard_snapshot() {
2641        let author = Author::from_bytes(&[0xa1; 32]);
2642        let namespace = NamespaceSecret::from_bytes(&[0xb2; 32]);
2643        let record = Record::new(Hash::EMPTY, 0, 1_700_000_000_000_000u64);
2644        let id = RecordIdentifier::new(namespace.id(), author.id(), b"wire-format-test");
2645        let signed = SignedEntry::from_entry(Entry::new(id, record), &namespace, &author);
2646
2647        let bytes = postcard::to_stdvec(&signed).unwrap();
2648        assert_eq!(
2649            hex::encode(&bytes),
2650            "4b523f1b6d9b00a4779fc9f8f105a9e36f062ceb7d511b632905782042ad30acb6dd07bfced4ecd5f3aa58321e8ace63f48f988ed8461bfdcd8b0e902187a10e228ddc6998329b7faa64875fe80da36406ea8d87e3e57bb048323e9cb66c0b343b60c4e709fb978b878e37d0c362edfc06c8cdc774c8b29d94e48eaa06cca60f5055154f42065ea5a1bea05463826be2684eb92df92c100027aabaae57ca554207bc7cbcb5636375fa1d82434d466724d92377f53b980695dd49d26d0ce12205a5776972652d666f726d61742d7465737400af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f32628080f9c0c1c48203",
2651        );
2652
2653        let decoded: SignedEntry = postcard::from_bytes(&bytes).unwrap();
2654        assert_eq!(decoded, signed);
2655    }
2656
2657    /// Snapshot of the `Author` postcard format.
2658    #[test]
2659    fn test_author_postcard_snapshot() {
2660        let author = Author::from_bytes(&[0xa1; 32]);
2661        let bytes = postcard::to_stdvec(&author).unwrap();
2662        assert_eq!(
2663            hex::encode(&bytes),
2664            "20a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1",
2665        );
2666    }
2667
2668    /// Snapshot of the `NamespaceSecret` postcard format.
2669    #[test]
2670    fn test_namespace_secret_postcard_snapshot() {
2671        let namespace = NamespaceSecret::from_bytes(&[0xb2; 32]);
2672        let bytes = postcard::to_stdvec(&namespace).unwrap();
2673        assert_eq!(
2674            hex::encode(&bytes),
2675            "20b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2",
2676        );
2677    }
2678}