Skip to main content

iroh_docs/
sync.rs

1//! API for iroh-docs replicas
2
3// Names and concepts are roughly based on Willows design at the moment:
4//
5// https://hackmd.io/DTtck8QOQm6tZaQBBtTf7w
6//
7// This is going to change!
8
9use std::{
10    cmp::Ordering,
11    fmt::Debug,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14};
15
16use bytes::{Bytes, BytesMut};
17// `iroh::Signature` is a newtype wrapper around `ed25519_dalek::Signature` with a
18// hand-written, wire-stable `serialize_tuple` serde impl (the same impl every other
19// iroh crate uses for handshake / discovery payloads). Embedding it inside
20// `EntrySignature` — rather than the raw dalek type — keeps the on-wire
21// `SignedEntry` format independent of upstream `ed25519` serde changes.
22use iroh::{KeyParsingError, Signature, SignatureError};
23use iroh_blobs::Hash;
24use n0_future::{
25    time::{Duration, SystemTime},
26    IterExt,
27};
28use serde::{Deserialize, Serialize};
29
30pub use crate::heads::AuthorHeads;
31use crate::{
32    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
33    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
34    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
35};
36
37/// Protocol message for the set reconciliation protocol.
38///
39/// Can be serialized to bytes with [serde] to transfer between peers.
40pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
41
42/// Byte representation of an iroh `EndpointId`.
43// TODO: Consider `iroh::EndpointId` instead of raw bytes (`iroh` re-exports it from `iroh-base`).
44pub type PeerIdBytes = [u8; 32];
45
46/// Max time in the future from our wall clock time that we accept entries for.
47/// Value is 10 minutes.
48pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
49
50/// Callback that may be set on a replica to determine the availability status for a content hash.
51pub type ContentStatusCallback =
52    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
53
54/// Event emitted by sync when entries are added.
55#[derive(derive_more::Debug, Clone)]
56pub enum Event {
57    /// A local entry has been added.
58    LocalInsert {
59        /// Document in which the entry was inserted.
60        namespace: NamespaceId,
61        /// Inserted entry.
62        entry: SignedEntry,
63    },
64    /// A remote entry has been added.
65    RemoteInsert {
66        /// Document in which the entry was inserted.
67        namespace: NamespaceId,
68        /// Inserted entry.
69        entry: SignedEntry,
70        /// Peer that provided the inserted entry.
71        /// Debug matches [`iroh::PublicKey::fmt_short`] (first 5 bytes, lower hex).
72        #[debug("{}", hex::encode(&from[..5]))]
73        from: PeerIdBytes,
74        /// Whether download policies require the content to be downloaded.
75        should_download: bool,
76        /// [`ContentStatus`] for this entry in the remote's replica.
77        remote_content_status: ContentStatus,
78    },
79}
80
81/// Whether an entry was inserted locally or by a remote peer.
82#[derive(derive_more::Debug, Clone)]
83pub enum InsertOrigin {
84    /// The entry was inserted locally.
85    Local,
86    /// The entry was received from the remote node identified by [`PeerIdBytes`].
87    Sync {
88        /// The peer from which we received this entry.
89        /// Debug matches [`iroh::PublicKey::fmt_short`] (first 5 bytes, lower hex).
90        #[debug("{}", hex::encode(&from[..5]))]
91        from: PeerIdBytes,
92        /// Whether the peer claims to have the content blob for this entry.
93        remote_content_status: ContentStatus,
94    },
95}
96
97/// Whether the content status is available on a node.
98#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
99pub enum ContentStatus {
100    /// The content is completely available.
101    Complete,
102    /// The content is partially available.
103    Incomplete,
104    /// The content is missing.
105    Missing,
106}
107
108/// Outcome of a sync operation.
109#[derive(Debug, Clone, Default)]
110pub struct SyncOutcome {
111    /// Timestamp of the latest entry for each author in the set we received.
112    pub heads_received: AuthorHeads,
113    /// Number of entries we received.
114    pub num_recv: usize,
115    /// Number of entries we sent.
116    pub num_sent: usize,
117}
118
119fn get_as_ptr<T>(value: &T) -> Option<usize> {
120    use std::mem;
121    if mem::size_of::<T>() == std::mem::size_of::<usize>()
122        && mem::align_of::<T>() == mem::align_of::<usize>()
123    {
124        // Safe only if size and alignment requirements are met
125        unsafe { Some(mem::transmute_copy(value)) }
126    } else {
127        None
128    }
129}
130
131fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
132    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
133}
134
135#[derive(Debug, Default)]
136struct Subscribers(Vec<async_channel::Sender<Event>>);
137impl Subscribers {
138    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
139        self.0.push(sender)
140    }
141    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
142        self.0.retain(|s| !same_channel(s, sender));
143    }
144    pub async fn send(&mut self, event: Event) {
145        self.0 = std::mem::take(&mut self.0)
146            .into_iter()
147            .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
148            .join_all()
149            .await
150            .into_iter()
151            .flatten()
152            .collect();
153    }
154    pub fn len(&self) -> usize {
155        self.0.len()
156    }
157    pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
158        if !self.0.is_empty() {
159            self.send(f()).await
160        }
161    }
162}
163
164/// Kind of capability of the namespace.
165#[derive(
166    Debug,
167    Clone,
168    Copy,
169    Serialize,
170    Deserialize,
171    num_enum::IntoPrimitive,
172    num_enum::TryFromPrimitive,
173    strum::Display,
174)]
175#[repr(u8)]
176#[strum(serialize_all = "snake_case")]
177pub enum CapabilityKind {
178    /// A writable replica.
179    Write = 1,
180    /// A readable replica.
181    Read = 2,
182}
183
184/// The capability of the namespace.
185#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
186pub enum Capability {
187    /// Write access to the namespace.
188    Write(NamespaceSecret),
189    /// Read only access to the namespace.
190    Read(NamespaceId),
191}
192
193impl Capability {
194    /// Get the [`NamespaceId`] for this [`Capability`].
195    pub fn id(&self) -> NamespaceId {
196        match self {
197            Capability::Write(secret) => secret.id(),
198            Capability::Read(id) => *id,
199        }
200    }
201
202    /// Get the [`NamespaceSecret`] of this [`Capability`].
203    /// Will fail if the [`Capability`] is read only.
204    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
205        match self {
206            Capability::Write(secret) => Ok(secret),
207            Capability::Read(_) => Err(ReadOnly),
208        }
209    }
210
211    /// Get the kind of capability.
212    pub fn kind(&self) -> CapabilityKind {
213        match self {
214            Capability::Write(_) => CapabilityKind::Write,
215            Capability::Read(_) => CapabilityKind::Read,
216        }
217    }
218
219    /// Get the raw representation of this namespace capability.
220    pub fn raw(&self) -> (u8, [u8; 32]) {
221        let capability_repr: u8 = self.kind().into();
222        let bytes = match self {
223            Capability::Write(secret) => secret.to_bytes(),
224            Capability::Read(id) => id.to_bytes(),
225        };
226        (capability_repr, bytes)
227    }
228
229    /// Create a [`Capability`] from its raw representation.
230    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
231        let kind: CapabilityKind = kind.try_into()?;
232        let capability = match kind {
233            CapabilityKind::Write => {
234                let secret = NamespaceSecret::from_bytes(bytes);
235                Capability::Write(secret)
236            }
237            CapabilityKind::Read => {
238                let id = NamespaceId::from(bytes);
239                Capability::Read(id)
240            }
241        };
242        Ok(capability)
243    }
244
245    /// Merge this capability with another capability.
246    ///
247    /// Will return an error if `other` is not a capability for the same namespace.
248    ///
249    /// Returns `true` if the capability was changed, `false` otherwise.
250    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
251        if other.id() != self.id() {
252            return Err(CapabilityError::NamespaceMismatch);
253        }
254
255        // the only capability upgrade is from read-only (self) to writable (other)
256        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
257            let _ = std::mem::replace(self, other);
258            Ok(true)
259        } else {
260            Ok(false)
261        }
262    }
263}
264
265/// Errors for capability operations
266#[derive(Debug, thiserror::Error)]
267pub enum CapabilityError {
268    /// Namespaces are not the same
269    #[error("Namespaces are not the same")]
270    NamespaceMismatch,
271}
272
273/// In memory information about an open replica.
274#[derive(derive_more::Debug)]
275pub struct ReplicaInfo {
276    pub(crate) capability: Capability,
277    subscribers: Subscribers,
278    #[debug("ContentStatusCallback")]
279    content_status_cb: Option<ContentStatusCallback>,
280    closed: bool,
281}
282
283impl ReplicaInfo {
284    /// Create a new replica.
285    pub fn new(capability: Capability) -> Self {
286        Self {
287            capability,
288            subscribers: Default::default(),
289            // on_insert_sender: RwLock::new(None),
290            content_status_cb: None,
291            closed: false,
292        }
293    }
294
295    /// Subscribe to insert events.
296    ///
297    /// When subscribing to a replica, you must ensure that the corresponding [`async_channel::Receiver`] is
298    /// received from in a loop. If not receiving, local and remote inserts will hang waiting for
299    /// the receiver to be received from.
300    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
301        self.subscribers.subscribe(sender)
302    }
303
304    /// Explicitly unsubscribe a sender.
305    ///
306    /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
307    /// multiple replicas, you can use this method to explicitly unsubscribe the sender from
308    /// this replica without having to drop the receiver.
309    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
310        self.subscribers.unsubscribe(sender)
311    }
312
313    /// Get the number of current event subscribers.
314    pub fn subscribers_count(&self) -> usize {
315        self.subscribers.len()
316    }
317
318    /// Set the content status callback.
319    ///
320    /// Only one callback can be active at a time. If a previous callback was registered, this
321    /// will return `false`.
322    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
323        if self.content_status_cb.is_some() {
324            false
325        } else {
326            self.content_status_cb = Some(cb);
327            true
328        }
329    }
330
331    fn ensure_open(&self) -> Result<(), InsertError> {
332        if self.closed() {
333            Err(InsertError::Closed)
334        } else {
335            Ok(())
336        }
337    }
338
339    /// Returns true if the replica is closed.
340    ///
341    /// If a replica is closed, no further operations can be performed. A replica cannot be closed
342    /// manually, it must be closed via [`store::Store::close_replica`] or
343    /// [`store::Store::remove_replica`]
344    pub fn closed(&self) -> bool {
345        self.closed
346    }
347
348    /// Merge a capability.
349    ///
350    /// The capability must refer to the the same namespace, otherwise an error will be returned.
351    ///
352    /// This will upgrade the replica's capability when passing a `Capability::Write`.
353    /// It is a no-op if `capability` is a Capability::Read`.
354    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
355        self.capability.merge(capability)
356    }
357}
358
359/// Local representation of a mutable, synchronizable key-value store.
360#[derive(derive_more::Debug)]
361pub struct Replica<'a, I = Box<ReplicaInfo>> {
362    pub(crate) store: StoreInstance<'a>,
363    pub(crate) info: I,
364}
365
366impl<'a, I> Replica<'a, I>
367where
368    I: Deref<Target = ReplicaInfo> + DerefMut,
369{
370    /// Create a new replica.
371    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
372        Replica { info, store }
373    }
374
375    /// Insert a new record at the given key.
376    ///
377    /// The entry will by signed by the provided `author`.
378    /// The `len` must be the byte length of the data identified by `hash`.
379    ///
380    /// Returns the number of entries removed as a consequence of this insertion,
381    /// or an error either if the entry failed to validate or if a store operation failed.
382    pub async fn insert(
383        &mut self,
384        key: impl AsRef<[u8]>,
385        author: &Author,
386        hash: Hash,
387        len: u64,
388    ) -> Result<usize, InsertError> {
389        if len == 0 || hash == Hash::EMPTY {
390            return Err(InsertError::EntryIsEmpty);
391        }
392        self.info.ensure_open()?;
393        let id = RecordIdentifier::new(self.id(), author.id(), key);
394        let record = Record::new_current(hash, len);
395        let entry = Entry::new(id, record);
396        let secret = self.secret_key()?;
397        let signed_entry = entry.sign(secret, author);
398        self.insert_entry(signed_entry, InsertOrigin::Local).await
399    }
400
401    /// Delete entries that match the given `author` and key `prefix`.
402    ///
403    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
404    /// entries whose key starts with or is equal to the given `prefix`.
405    ///
406    /// Returns the number of entries deleted.
407    pub async fn delete_prefix(
408        &mut self,
409        prefix: impl AsRef<[u8]>,
410        author: &Author,
411    ) -> Result<usize, InsertError> {
412        self.info.ensure_open()?;
413        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
414        let entry = Entry::new_empty(id);
415        let signed_entry = entry.sign(self.secret_key()?, author);
416        self.insert_entry(signed_entry, InsertOrigin::Local).await
417    }
418
419    /// Insert an entry into this replica which was received from a remote peer.
420    ///
421    /// This will verify both the namespace and author signatures of the entry, emit an `on_insert`
422    /// event, and insert the entry into the replica store.
423    ///
424    /// Returns the number of entries removed as a consequence of this insertion,
425    /// or an error if the entry failed to validate or if a store operation failed.
426    pub async fn insert_remote_entry(
427        &mut self,
428        entry: SignedEntry,
429        received_from: PeerIdBytes,
430        content_status: ContentStatus,
431    ) -> Result<usize, InsertError> {
432        self.info.ensure_open()?;
433        entry.validate_empty()?;
434        let origin = InsertOrigin::Sync {
435            from: received_from,
436            remote_content_status: content_status,
437        };
438        self.insert_entry(entry, origin).await
439    }
440
441    /// Insert a signed entry into the database.
442    ///
443    /// Returns the number of entries removed as a consequence of this insertion.
444    async fn insert_entry(
445        &mut self,
446        entry: SignedEntry,
447        origin: InsertOrigin,
448    ) -> Result<usize, InsertError> {
449        let namespace = self.id();
450
451        let store = &self.store;
452        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
453
454        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
455        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
456
457        let removed_count = match outcome {
458            InsertOutcome::Inserted { removed } => removed,
459            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
460        };
461
462        let insert_event = match origin {
463            InsertOrigin::Local => Event::LocalInsert { namespace, entry },
464            InsertOrigin::Sync {
465                from,
466                remote_content_status,
467            } => {
468                let download_policy = self
469                    .store
470                    .get_download_policy(&self.id())
471                    .unwrap_or_default();
472                let should_download = download_policy.matches(entry.entry());
473                Event::RemoteInsert {
474                    namespace,
475                    entry,
476                    from,
477                    should_download,
478                    remote_content_status,
479                }
480            }
481        };
482
483        self.info.subscribers.send(insert_event).await;
484
485        Ok(removed_count)
486    }
487
488    /// Hashes the given data and inserts it.
489    ///
490    /// This does not store the content, just the record of it.
491    /// Returns the calculated hash.
492    pub async fn hash_and_insert(
493        &mut self,
494        key: impl AsRef<[u8]>,
495        author: &Author,
496        data: impl AsRef<[u8]>,
497    ) -> Result<Hash, InsertError> {
498        self.info.ensure_open()?;
499        let len = data.as_ref().len() as u64;
500        let hash = Hash::new(data);
501        self.insert(key, author, hash, len).await?;
502        Ok(hash)
503    }
504
505    /// Get the identifier for an entry in this replica.
506    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
507        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
508    }
509
510    /// Create the initial message for the set reconciliation flow with a remote peer.
511    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
512        self.info.ensure_open().map_err(anyhow::Error::from)?;
513        self.store.initial_message()
514    }
515
516    /// Process a set reconciliation message from a remote peer.
517    ///
518    /// Returns the next message to be sent to the peer, if any.
519    pub async fn sync_process_message(
520        &mut self,
521        message: crate::ranger::Message<SignedEntry>,
522        from_peer: PeerIdBytes,
523        state: &mut SyncOutcome,
524    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
525        self.info.ensure_open()?;
526        let my_namespace = self.id();
527        let now = system_time_now();
528
529        // update state with incoming data.
530        state.num_recv += message.value_count();
531        for (entry, _content_status) in message.values() {
532            state
533                .heads_received
534                .insert(entry.author(), entry.timestamp());
535        }
536
537        // let subscribers = std::rc::Rc::new(&mut self.subscribers);
538        // l
539        let cb = self.info.content_status_cb.clone();
540        let download_policy = self
541            .store
542            .get_download_policy(&my_namespace)
543            .unwrap_or_default();
544        let reply = self
545            .store
546            .process_message(
547                &Default::default(),
548                message,
549                // validate callback: validate incoming entries, and send to on_insert channel
550                |store, entry, content_status| {
551                    let origin = InsertOrigin::Sync {
552                        from: from_peer,
553                        remote_content_status: content_status,
554                    };
555                    validate_entry(now, store, my_namespace, entry, &origin).is_ok()
556                },
557                // on_insert callback: is called when an entry was actually inserted in the store
558                async |_store, entry, content_status| {
559                    // We use `send_with` to only clone the entry if we have active subscriptions.
560                    self.info
561                        .subscribers
562                        .send_with(|| {
563                            let should_download = download_policy.matches(entry.entry());
564                            Event::RemoteInsert {
565                                from: from_peer,
566                                namespace: my_namespace,
567                                entry: entry.clone(),
568                                should_download,
569                                remote_content_status: content_status,
570                            }
571                        })
572                        .await
573                },
574                // content_status callback: get content status for outgoing entries
575                async move |entry| {
576                    if let Some(cb) = cb.as_ref() {
577                        cb(entry.content_hash()).await
578                    } else {
579                        ContentStatus::Missing
580                    }
581                },
582            )
583            .await?;
584
585        // update state with outgoing data.
586        if let Some(ref reply) = reply {
587            state.num_sent += reply.value_count();
588        }
589
590        Ok(reply)
591    }
592
593    /// Get the namespace identifier for this [`Replica`].
594    pub fn id(&self) -> NamespaceId {
595        self.info.capability.id()
596    }
597
598    /// Get the [`Capability`] of this [`Replica`].
599    pub fn capability(&self) -> &Capability {
600        &self.info.capability
601    }
602
603    /// Get the byte representation of the [`NamespaceSecret`] key for this replica. Will fail if
604    /// the replica is read only
605    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
606        self.info.capability.secret_key()
607    }
608}
609
610/// Error that occurs trying to access the [`NamespaceSecret`] of a read-only [`Capability`].
611#[derive(Debug, thiserror::Error)]
612#[error("Replica allows read access only.")]
613pub struct ReadOnly;
614
615/// Validate a [`SignedEntry`] if it's fit to be inserted.
616///
617/// This validates that
618/// * the entry's author and namespace signatures are correct
619/// * the entry's namespace matches the current replica
620/// * the entry's timestamp is not more than 10 minutes in the future of our system time
621/// * the entry is newer than an existing entry for the same key and author, if such exists.
622fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
623    now: u64,
624    store: &S,
625    expected_namespace: NamespaceId,
626    entry: &SignedEntry,
627    origin: &InsertOrigin,
628) -> Result<(), ValidationFailure> {
629    // Verify the namespace
630    if entry.namespace() != expected_namespace {
631        return Err(ValidationFailure::InvalidNamespace);
632    }
633
634    // Verify signature for non-local entries.
635    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
636        return Err(ValidationFailure::BadSignature);
637    }
638
639    // Verify that the timestamp of the entry is not too far in the future.
640    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
641        return Err(ValidationFailure::TooFarInTheFuture);
642    }
643    Ok(())
644}
645
646/// Error emitted when inserting entries into a [`Replica`] failed
647#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
648pub enum InsertError {
649    /// Storage error
650    #[error("storage error")]
651    Store(anyhow::Error),
652    /// Validation failure
653    #[error("validation failure")]
654    Validation(#[from] ValidationFailure),
655    /// A newer entry exists for either this entry's key or a prefix of the key.
656    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
657    NewerEntryExists,
658    /// Attempted to insert an empty entry.
659    #[error("Attempted to insert an empty entry")]
660    EntryIsEmpty,
661    /// Replica is read only.
662    #[error("Attempted to insert to read only replica")]
663    #[from(ReadOnly)]
664    ReadOnly,
665    /// The replica is closed, no operations may be performed.
666    #[error("replica is closed")]
667    Closed,
668}
669
670/// Reason why verifying a [`SignedEntry`] failed.
671#[derive(thiserror::Error, Debug)]
672pub enum SignedEntryVerifyError {
673    /// One of the entry's public key bytes is not a valid ed25519 curve point.
674    #[error(transparent)]
675    KeyParsing(#[from] KeyParsingError),
676    /// One of the entry's signatures failed verification against the recovered key.
677    #[error(transparent)]
678    Signature(#[from] SignatureError),
679}
680
681/// Reason why entry validation failed
682#[derive(thiserror::Error, Debug)]
683pub enum ValidationFailure {
684    /// Entry namespace does not match the current replica.
685    #[error("Entry namespace does not match the current replica")]
686    InvalidNamespace,
687    /// Entry signature is invalid.
688    #[error("Entry signature is invalid")]
689    BadSignature,
690    /// Entry timestamp is too far in the future.
691    #[error("Entry timestamp is too far in the future.")]
692    TooFarInTheFuture,
693    /// Entry has length 0 but not the empty hash, or the empty hash but not length 0.
694    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
695    InvalidEmptyEntry,
696}
697
698/// A signed entry.
699#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
700pub struct SignedEntry {
701    signature: EntrySignature,
702    entry: Entry,
703}
704
705impl From<SignedEntry> for Entry {
706    fn from(value: SignedEntry) -> Self {
707        value.entry
708    }
709}
710
711impl PartialOrd for SignedEntry {
712    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
713        Some(self.cmp(other))
714    }
715}
716
717impl Ord for SignedEntry {
718    fn cmp(&self, other: &Self) -> Ordering {
719        self.entry.cmp(&other.entry)
720    }
721}
722
723impl PartialOrd for Entry {
724    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
725        Some(self.cmp(other))
726    }
727}
728
729impl Ord for Entry {
730    fn cmp(&self, other: &Self) -> Ordering {
731        self.id
732            .cmp(&other.id)
733            .then_with(|| self.record.cmp(&other.record))
734    }
735}
736
737impl SignedEntry {
738    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
739        SignedEntry { signature, entry }
740    }
741
742    /// Create a new signed entry by signing an entry with the `namespace` and `author`.
743    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
744        let signature = EntrySignature::from_entry(&entry, namespace, author);
745        SignedEntry { signature, entry }
746    }
747
748    /// Create a new signed entries from its parts.
749    pub fn from_parts(
750        namespace: &NamespaceSecret,
751        author: &Author,
752        key: impl AsRef<[u8]>,
753        record: Record,
754    ) -> Self {
755        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
756        let entry = Entry::new(id, record);
757        Self::from_entry(entry, namespace, author)
758    }
759
760    /// Verify the signatures on this entry.
761    pub fn verify<S: store::PublicKeyStore>(
762        &self,
763        store: &S,
764    ) -> Result<(), SignedEntryVerifyError> {
765        self.signature.verify(
766            &self.entry,
767            &self.entry.namespace().public_key(store)?,
768            &self.entry.author().public_key(store)?,
769        )?;
770        Ok(())
771    }
772
773    /// Get the signature.
774    pub fn signature(&self) -> &EntrySignature {
775        &self.signature
776    }
777
778    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
779    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
780        self.entry().validate_empty()
781    }
782
783    /// Get the [`Entry`].
784    pub fn entry(&self) -> &Entry {
785        &self.entry
786    }
787
788    /// Get the content [`struct@Hash`] of the entry.
789    pub fn content_hash(&self) -> Hash {
790        self.entry().content_hash()
791    }
792
793    /// Get the content length of the entry.
794    pub fn content_len(&self) -> u64 {
795        self.entry().content_len()
796    }
797
798    /// Get the author bytes of this entry.
799    pub fn author_bytes(&self) -> AuthorId {
800        self.entry().id().author()
801    }
802
803    /// Get the key of the entry.
804    pub fn key(&self) -> &[u8] {
805        self.entry().id().key()
806    }
807
808    /// Get the timestamp of the entry.
809    pub fn timestamp(&self) -> u64 {
810        self.entry().timestamp()
811    }
812}
813
814impl RangeEntry for SignedEntry {
815    type Key = RecordIdentifier;
816    type Value = Record;
817
818    fn key(&self) -> &Self::Key {
819        &self.entry.id
820    }
821
822    fn value(&self) -> &Self::Value {
823        &self.entry.record
824    }
825
826    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
827        let mut hasher = blake3::Hasher::new();
828        hasher.update(self.namespace().as_ref());
829        hasher.update(self.author_bytes().as_ref());
830        hasher.update(self.key());
831        hasher.update(&self.timestamp().to_be_bytes());
832        hasher.update(self.content_hash().as_bytes());
833        Fingerprint(hasher.finalize().into())
834    }
835}
836
837/// Signature over an entry.
838#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
839pub struct EntrySignature {
840    author_signature: Signature,
841    namespace_signature: Signature,
842}
843
844impl Debug for EntrySignature {
845    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
846        f.debug_struct("EntrySignature")
847            .field(
848                "namespace_signature",
849                &hex::encode(self.namespace_signature.to_bytes()),
850            )
851            .field(
852                "author_signature",
853                &hex::encode(self.author_signature.to_bytes()),
854            )
855            .finish()
856    }
857}
858
859impl EntrySignature {
860    /// Create a new signature by signing an entry with the `namespace` and `author`.
861    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
862        // TODO: this should probably include a namespace prefix
863        // namespace in the cryptographic sense.
864        let bytes = entry.to_vec();
865        let namespace_signature = Signature::from_bytes(&namespace.sign(&bytes).to_bytes());
866        let author_signature = Signature::from_bytes(&author.sign(&bytes).to_bytes());
867
868        EntrySignature {
869            author_signature,
870            namespace_signature,
871        }
872    }
873
874    /// Verify that this signature was created by signing the `entry` with the
875    /// secret keys of the specified `author` and `namespace`.
876    pub fn verify(
877        &self,
878        entry: &Entry,
879        namespace: &NamespacePublicKey,
880        author: &AuthorPublicKey,
881    ) -> Result<(), SignatureError> {
882        let bytes = entry.to_vec();
883        namespace.verify(&bytes, &self.namespace_signature)?;
884        author.verify(&bytes, &self.author_signature)?;
885
886        Ok(())
887    }
888
889    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
890        let namespace_signature = Signature::from_bytes(namespace_sig);
891        let author_signature = Signature::from_bytes(author_sig);
892
893        EntrySignature {
894            author_signature,
895            namespace_signature,
896        }
897    }
898
899    pub(crate) fn author(&self) -> &Signature {
900        &self.author_signature
901    }
902
903    pub(crate) fn namespace(&self) -> &Signature {
904        &self.namespace_signature
905    }
906}
907
908/// A single entry in a [`Replica`]
909///
910/// An entry is identified by a key, its [`Author`], and the [`Replica`]'s
911/// [`NamespaceSecret`]. Its value is the [32-byte BLAKE3 hash](iroh_blobs::Hash)
912/// of the entry's content data, the size of this content data, and a timestamp.
913#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
914pub struct Entry {
915    id: RecordIdentifier,
916    record: Record,
917}
918
919impl Entry {
920    /// Create a new entry
921    pub fn new(id: RecordIdentifier, record: Record) -> Self {
922        Entry { id, record }
923    }
924
925    /// Create a new empty entry with the current timestamp.
926    pub fn new_empty(id: RecordIdentifier) -> Self {
927        Entry {
928            id,
929            record: Record::empty_current(),
930        }
931    }
932
933    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
934    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
935        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
936            (true, true) => Ok(()),
937            (false, false) => Ok(()),
938            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
939            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
940        }
941    }
942
943    /// Get the [`RecordIdentifier`] for this entry.
944    pub fn id(&self) -> &RecordIdentifier {
945        &self.id
946    }
947
948    /// Get the [`NamespaceId`] of this entry.
949    pub fn namespace(&self) -> NamespaceId {
950        self.id.namespace()
951    }
952
953    /// Get the [`AuthorId`] of this entry.
954    pub fn author(&self) -> AuthorId {
955        self.id.author()
956    }
957
958    /// Get the key of this entry.
959    pub fn key(&self) -> &[u8] {
960        self.id.key()
961    }
962
963    /// Get the [`Record`] contained in this entry.
964    pub fn record(&self) -> &Record {
965        &self.record
966    }
967
968    /// Get the content hash of the record.
969    pub fn content_hash(&self) -> Hash {
970        self.record.hash
971    }
972
973    /// Get the content length of the record.
974    pub fn content_len(&self) -> u64 {
975        self.record.len
976    }
977
978    /// Get the timestamp of the record.
979    pub fn timestamp(&self) -> u64 {
980        self.record.timestamp
981    }
982
983    /// Serialize this entry into its canonical byte representation used for signing.
984    pub fn encode(&self, out: &mut Vec<u8>) {
985        self.id.encode(out);
986        self.record.encode(out);
987    }
988
989    /// Serialize this entry into a new vector with its canonical byte representation.
990    pub fn to_vec(&self) -> Vec<u8> {
991        let mut out = Vec::new();
992        self.encode(&mut out);
993        out
994    }
995
996    /// Sign this entry with a [`NamespaceSecret`] and [`Author`].
997    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
998        SignedEntry::from_entry(self, namespace, author)
999    }
1000}
1001
1002const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
1003const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
1004const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
1005
1006/// The identifier of a record.
1007#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
1008pub struct RecordIdentifier(Bytes);
1009
1010impl Default for RecordIdentifier {
1011    fn default() -> Self {
1012        Self::new(NamespaceId::default(), AuthorId::default(), b"")
1013    }
1014}
1015
1016impl Debug for RecordIdentifier {
1017    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1018        f.debug_struct("RecordIdentifier")
1019            .field("namespace", &self.namespace())
1020            .field("author", &self.author())
1021            .field("key", &std::string::String::from_utf8_lossy(self.key()))
1022            .finish()
1023    }
1024}
1025
1026impl RangeKey for RecordIdentifier {
1027    #[cfg(test)]
1028    fn is_prefix_of(&self, other: &Self) -> bool {
1029        other.as_ref().starts_with(self.as_ref())
1030    }
1031}
1032
1033fn system_time_now() -> u64 {
1034    SystemTime::now()
1035        .duration_since(SystemTime::UNIX_EPOCH)
1036        .expect("time drift")
1037        .as_micros() as u64
1038}
1039
1040impl RecordIdentifier {
1041    /// Create a new [`RecordIdentifier`].
1042    pub fn new(
1043        namespace: impl Into<NamespaceId>,
1044        author: impl Into<AuthorId>,
1045        key: impl AsRef<[u8]>,
1046    ) -> Self {
1047        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1048        bytes.extend_from_slice(namespace.into().as_bytes());
1049        bytes.extend_from_slice(author.into().as_bytes());
1050        bytes.extend_from_slice(key.as_ref());
1051        Self(bytes.freeze())
1052    }
1053
1054    /// Serialize this [`RecordIdentifier`] into a mutable byte array.
1055    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1056        out.extend_from_slice(&self.0);
1057    }
1058
1059    /// Get this [`RecordIdentifier`] as [Bytes].
1060    pub fn as_bytes(&self) -> Bytes {
1061        self.0.clone()
1062    }
1063
1064    /// Get this [`RecordIdentifier`] as a tuple of byte slices.
1065    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1066        (
1067            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1068            self.0[AUTHOR_BYTES].try_into().unwrap(),
1069            &self.0[KEY_BYTES],
1070        )
1071    }
1072
1073    /// Get this [`RecordIdentifier`] as a tuple of bytes.
1074    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1075        (
1076            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1077            self.0[AUTHOR_BYTES].try_into().unwrap(),
1078            self.0.slice(KEY_BYTES),
1079        )
1080    }
1081
1082    /// Get the key of this record.
1083    pub fn key(&self) -> &[u8] {
1084        &self.0[KEY_BYTES]
1085    }
1086
1087    /// Get the key of this record as [`Bytes`].
1088    pub fn key_bytes(&self) -> Bytes {
1089        self.0.slice(KEY_BYTES)
1090    }
1091
1092    /// Get the [`NamespaceId`] of this record as byte array.
1093    pub fn namespace(&self) -> NamespaceId {
1094        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1095        value.into()
1096    }
1097
1098    /// Get the [`AuthorId`] of this record as byte array.
1099    pub fn author(&self) -> AuthorId {
1100        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1101        value.into()
1102    }
1103}
1104
1105impl AsRef<[u8]> for RecordIdentifier {
1106    fn as_ref(&self) -> &[u8] {
1107        &self.0
1108    }
1109}
1110
1111impl Deref for SignedEntry {
1112    type Target = Entry;
1113    fn deref(&self) -> &Self::Target {
1114        &self.entry
1115    }
1116}
1117
1118impl Deref for Entry {
1119    type Target = Record;
1120    fn deref(&self) -> &Self::Target {
1121        &self.record
1122    }
1123}
1124
1125/// The data part of an entry in a [`Replica`].
1126#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1127pub struct Record {
1128    /// Length of the data referenced by `hash`.
1129    len: u64,
1130    /// Hash of the content data.
1131    hash: Hash,
1132    /// Record creation timestamp. Counted as micros since the Unix epoch.
1133    timestamp: u64,
1134}
1135
1136impl RangeValue for Record {}
1137
1138/// Ordering for entry values.
1139///
1140/// Compares first the timestamp, then the content hash.
1141impl Ord for Record {
1142    fn cmp(&self, other: &Self) -> Ordering {
1143        self.timestamp
1144            .cmp(&other.timestamp)
1145            .then_with(|| self.hash.cmp(&other.hash))
1146    }
1147}
1148
1149impl PartialOrd for Record {
1150    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1151        Some(self.cmp(other))
1152    }
1153}
1154
1155impl Record {
1156    /// Create a new record.
1157    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1158        debug_assert!(
1159            len != 0 || hash == Hash::EMPTY,
1160            "if `len` is 0 then `hash` must be the hash of the empty byte range"
1161        );
1162        Record {
1163            hash,
1164            len,
1165            timestamp,
1166        }
1167    }
1168
1169    /// Create a tombstone record (empty content)
1170    pub fn empty(timestamp: u64) -> Self {
1171        Self::new(Hash::EMPTY, 0, timestamp)
1172    }
1173
1174    /// Create a tombstone record with the timestamp set to now.
1175    pub fn empty_current() -> Self {
1176        Self::new_current(Hash::EMPTY, 0)
1177    }
1178
1179    /// Return `true` if the entry is empty.
1180    pub fn is_empty(&self) -> bool {
1181        self.hash == Hash::EMPTY
1182    }
1183
1184    /// Create a new [`Record`] with the timestamp set to now.
1185    pub fn new_current(hash: Hash, len: u64) -> Self {
1186        let timestamp = system_time_now();
1187        Self::new(hash, len, timestamp)
1188    }
1189
1190    /// Get the length of the data addressed by this record's content hash.
1191    pub fn content_len(&self) -> u64 {
1192        self.len
1193    }
1194
1195    /// Get the [`struct@Hash`] of the content data of this record.
1196    pub fn content_hash(&self) -> Hash {
1197        self.hash
1198    }
1199
1200    /// Get the timestamp of this record.
1201    pub fn timestamp(&self) -> u64 {
1202        self.timestamp
1203    }
1204
1205    #[cfg(test)]
1206    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1207        let len = data.as_ref().len() as u64;
1208        let hash = Hash::new(data);
1209        Self::new_current(hash, len)
1210    }
1211
1212    #[cfg(test)]
1213    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1214        let len = data.as_ref().len() as u64;
1215        let hash = Hash::new(data);
1216        Self::new(hash, len, timestamp)
1217    }
1218
1219    /// Serialize this record into a mutable byte array.
1220    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1221        out.extend_from_slice(&self.len.to_be_bytes());
1222        out.extend_from_slice(self.hash.as_ref());
1223        out.extend_from_slice(&self.timestamp.to_be_bytes())
1224    }
1225}
1226
1227#[cfg(test)]
1228mod tests {
1229    use std::collections::HashSet;
1230
1231    use anyhow::Result;
1232    use rand::SeedableRng;
1233
1234    use super::*;
1235    use crate::{
1236        actor::SyncHandle,
1237        ranger::{Range, Store as _},
1238        store::{OpenError, Query, SortBy, SortDirection, Store},
1239    };
1240
1241    #[tokio::test]
1242    async fn test_basics_memory() -> Result<()> {
1243        let store = store::Store::memory();
1244        test_basics(store).await?;
1245
1246        Ok(())
1247    }
1248
1249    #[tokio::test]
1250    #[cfg(feature = "fs-store")]
1251    async fn test_basics_fs() -> Result<()> {
1252        let dbfile = tempfile::NamedTempFile::new()?;
1253        let store = store::fs::Store::persistent(dbfile.path())?;
1254        test_basics(store).await?;
1255        Ok(())
1256    }
1257
1258    async fn test_basics(mut store: Store) -> Result<()> {
1259        let mut rng = rand::rng();
1260        let alice = Author::new(&mut rng);
1261        let bob = Author::new(&mut rng);
1262        let myspace = NamespaceSecret::new(&mut rng);
1263
1264        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1265        let record = Record::current_from_data(b"this is my cool data");
1266        let entry = Entry::new(record_id, record);
1267        let signed_entry = entry.sign(&myspace, &alice);
1268        signed_entry.verify(&()).expect("failed to verify");
1269
1270        let mut my_replica = store.new_replica(myspace.clone())?;
1271        for i in 0..10 {
1272            my_replica
1273                .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
1274                .await?;
1275        }
1276
1277        for i in 0..10 {
1278            let res = store
1279                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1280                .unwrap();
1281            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1282            assert_eq!(res.entry().record().content_len(), len);
1283            res.verify(&())?;
1284        }
1285
1286        // Test multiple records for the same key
1287        let mut my_replica = store.new_replica(myspace.clone())?;
1288        my_replica
1289            .hash_and_insert("/cool/path", &alice, "round 1")
1290            .await?;
1291        let _entry = store
1292            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1293            .unwrap();
1294        // Second
1295        let mut my_replica = store.new_replica(myspace.clone())?;
1296        my_replica
1297            .hash_and_insert("/cool/path", &alice, "round 2")
1298            .await?;
1299        let _entry = store
1300            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1301            .unwrap();
1302
1303        // Get All by author
1304        let entries: Vec<_> = store
1305            .get_many(myspace.id(), Query::author(alice.id()))?
1306            .collect::<Result<_>>()?;
1307        assert_eq!(entries.len(), 11);
1308
1309        // Get All by author
1310        let entries: Vec<_> = store
1311            .get_many(myspace.id(), Query::author(bob.id()))?
1312            .collect::<Result<_>>()?;
1313        assert_eq!(entries.len(), 0);
1314
1315        // Get All by key
1316        let entries: Vec<_> = store
1317            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1318            .collect::<Result<_>>()?;
1319        assert_eq!(entries.len(), 1);
1320
1321        // Get All
1322        let entries: Vec<_> = store
1323            .get_many(myspace.id(), Query::all())?
1324            .collect::<Result<_>>()?;
1325        assert_eq!(entries.len(), 11);
1326
1327        // insert record from different author
1328        let mut my_replica = store.new_replica(myspace.clone())?;
1329        let _entry = my_replica
1330            .hash_and_insert("/cool/path", &bob, "bob round 1")
1331            .await?;
1332
1333        // Get All by author
1334        let entries: Vec<_> = store
1335            .get_many(myspace.id(), Query::author(alice.id()))?
1336            .collect::<Result<_>>()?;
1337        assert_eq!(entries.len(), 11);
1338
1339        let entries: Vec<_> = store
1340            .get_many(myspace.id(), Query::author(bob.id()))?
1341            .collect::<Result<_>>()?;
1342        assert_eq!(entries.len(), 1);
1343
1344        // Get All by key
1345        let entries: Vec<_> = store
1346            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1347            .collect::<Result<_>>()?;
1348        assert_eq!(entries.len(), 2);
1349
1350        // Get all by prefix
1351        let entries: Vec<_> = store
1352            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1353            .collect::<Result<_>>()?;
1354        assert_eq!(entries.len(), 2);
1355
1356        // Get All by author and prefix
1357        let entries: Vec<_> = store
1358            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1359            .collect::<Result<_>>()?;
1360        assert_eq!(entries.len(), 1);
1361
1362        let entries: Vec<_> = store
1363            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1364            .collect::<Result<_>>()?;
1365        assert_eq!(entries.len(), 1);
1366
1367        // Get All
1368        let entries: Vec<_> = store
1369            .get_many(myspace.id(), Query::all())?
1370            .collect::<Result<_>>()?;
1371        assert_eq!(entries.len(), 12);
1372
1373        // Get Range of all should return all latest
1374        let mut my_replica = store.new_replica(myspace.clone())?;
1375        let entries_second: Vec<_> = my_replica
1376            .store
1377            .get_range(Range::new(
1378                RecordIdentifier::default(),
1379                RecordIdentifier::default(),
1380            ))?
1381            .collect::<Result<_, _>>()?;
1382
1383        assert_eq!(entries_second.len(), 12);
1384        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1385
1386        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1387        store.flush()?;
1388        Ok(())
1389    }
1390
1391    /// Test that [`Store::register_useful_peer`] behaves like a LRUCache of size
1392    /// [`super::store::PEERS_PER_DOC_CACHE_SIZE`].
1393    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1394        /// Helper to verify the store returns the expected peers for the namespace.
1395        #[track_caller]
1396        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1397            assert_eq!(
1398                expected_peers,
1399                &store
1400                    .get_sync_peers(&namespace)
1401                    .unwrap()
1402                    .unwrap()
1403                    .collect::<Vec<_>>(),
1404                "sync peers differ"
1405            );
1406        }
1407
1408        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1409        // expected peers: newest peers are to the front, oldest to the back
1410        let mut expected_peers = Vec::with_capacity(count);
1411        for i in 0..count as u8 {
1412            let peer = [i; 32];
1413            expected_peers.insert(0, peer);
1414            store.register_useful_peer(namespace, peer)?;
1415        }
1416        verify_peers(store, namespace, &expected_peers);
1417
1418        // one more peer should evict the last peer
1419        expected_peers.pop();
1420        let newer_peer = [count as u8; 32];
1421        expected_peers.insert(0, newer_peer);
1422        store.register_useful_peer(namespace, newer_peer)?;
1423        verify_peers(store, namespace, &expected_peers);
1424
1425        // move one existing peer up
1426        let refreshed_peer = expected_peers.remove(2);
1427        expected_peers.insert(0, refreshed_peer);
1428        store.register_useful_peer(namespace, refreshed_peer)?;
1429        verify_peers(store, namespace, &expected_peers);
1430        Ok(())
1431    }
1432
1433    #[tokio::test]
1434    async fn test_content_hashes_iterator_memory() -> Result<()> {
1435        let store = store::Store::memory();
1436        test_content_hashes_iterator(store).await
1437    }
1438
1439    #[tokio::test]
1440    #[cfg(feature = "fs-store")]
1441    async fn test_content_hashes_iterator_fs() -> Result<()> {
1442        let dbfile = tempfile::NamedTempFile::new()?;
1443        let store = store::fs::Store::persistent(dbfile.path())?;
1444        test_content_hashes_iterator(store).await
1445    }
1446
1447    async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1448        let mut rng = rand::rng();
1449        let mut expected = HashSet::new();
1450        let n_replicas = 3;
1451        let n_entries = 4;
1452        for i in 0..n_replicas {
1453            let namespace = NamespaceSecret::new(&mut rng);
1454            let author = store.new_author(&mut rng)?;
1455            let mut replica = store.new_replica(namespace)?;
1456            for j in 0..n_entries {
1457                let key = format!("{j}");
1458                let data = format!("{i}:{j}");
1459                let hash = replica.hash_and_insert(key, &author, data).await?;
1460                expected.insert(hash);
1461            }
1462        }
1463        assert_eq!(expected.len(), n_replicas * n_entries);
1464        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1465        assert_eq!(actual, expected);
1466        Ok(())
1467    }
1468
1469    #[test]
1470    fn test_multikey() {
1471        let mut rng = rand::rng();
1472
1473        let k = ["a", "c", "z"];
1474
1475        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1476        n.sort_by_key(|n| n.id());
1477
1478        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1479        a.sort_by_key(|a| a.id());
1480
1481        // Just key
1482        {
1483            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1484            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1485            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1486
1487            let range = Range::new(ri0.clone(), ri2.clone());
1488            assert!(range.contains(&ri0), "start");
1489            assert!(range.contains(&ri1), "inside");
1490            assert!(!range.contains(&ri2), "end");
1491
1492            assert!(ri0 < ri1);
1493            assert!(ri1 < ri2);
1494        }
1495
1496        // Just namespace
1497        {
1498            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1499            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1500            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1501
1502            let range = Range::new(ri0.clone(), ri2.clone());
1503            assert!(range.contains(&ri0), "start");
1504            assert!(range.contains(&ri1), "inside");
1505            assert!(!range.contains(&ri2), "end");
1506
1507            assert!(ri0 < ri1);
1508            assert!(ri1 < ri2);
1509        }
1510
1511        // Just author
1512        {
1513            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1514            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1515            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1516
1517            let range = Range::new(ri0.clone(), ri2.clone());
1518            assert!(range.contains(&ri0), "start");
1519            assert!(range.contains(&ri1), "inside");
1520            assert!(!range.contains(&ri2), "end");
1521
1522            assert!(ri0 < ri1);
1523            assert!(ri1 < ri2);
1524        }
1525
1526        // Just key and namespace
1527        {
1528            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1529            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1530            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1531
1532            let range = Range::new(ri0.clone(), ri2.clone());
1533            assert!(range.contains(&ri0), "start");
1534            assert!(range.contains(&ri1), "inside");
1535            assert!(!range.contains(&ri2), "end");
1536
1537            assert!(ri0 < ri1);
1538            assert!(ri1 < ri2);
1539        }
1540
1541        // Mixed
1542        {
1543            // Ord should prioritize namespace - author - key
1544
1545            let a0 = a[0].id();
1546            let a1 = a[1].id();
1547            let n0 = n[0].id();
1548            let n1 = n[1].id();
1549            let k0 = k[0];
1550            let k1 = k[1];
1551
1552            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1553            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1554            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1555            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1556        }
1557    }
1558
1559    #[tokio::test]
1560    async fn test_timestamps_memory() -> Result<()> {
1561        let store = store::Store::memory();
1562        test_timestamps(store).await?;
1563
1564        Ok(())
1565    }
1566
1567    #[tokio::test]
1568    #[cfg(feature = "fs-store")]
1569    async fn test_timestamps_fs() -> Result<()> {
1570        let dbfile = tempfile::NamedTempFile::new()?;
1571        let store = store::fs::Store::persistent(dbfile.path())?;
1572        test_timestamps(store).await?;
1573        Ok(())
1574    }
1575
1576    async fn test_timestamps(mut store: Store) -> Result<()> {
1577        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
1578        let namespace = NamespaceSecret::new(&mut rng);
1579        let _replica = store.new_replica(namespace.clone())?;
1580        let author = store.new_author(&mut rng)?;
1581        store.close_replica(namespace.id());
1582        let mut replica = store.open_replica(&namespace.id())?;
1583
1584        let key = b"hello";
1585        let value = b"world";
1586        let entry = {
1587            let timestamp = 2;
1588            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1589            let record = Record::from_data(value, timestamp);
1590            Entry::new(id, record).sign(&namespace, &author)
1591        };
1592
1593        replica
1594            .insert_entry(entry.clone(), InsertOrigin::Local)
1595            .await
1596            .unwrap();
1597        store.close_replica(namespace.id());
1598        let res = store
1599            .get_exact(namespace.id(), author.id(), key, false)?
1600            .unwrap();
1601        assert_eq!(res, entry);
1602
1603        let entry2 = {
1604            let timestamp = 1;
1605            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1606            let record = Record::from_data(value, timestamp);
1607            Entry::new(id, record).sign(&namespace, &author)
1608        };
1609
1610        let mut replica = store.open_replica(&namespace.id())?;
1611        let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1612        store.close_replica(namespace.id());
1613        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1614        let res = store
1615            .get_exact(namespace.id(), author.id(), key, false)?
1616            .unwrap();
1617        assert_eq!(res, entry);
1618        store.flush()?;
1619        Ok(())
1620    }
1621
1622    #[tokio::test]
1623    async fn test_replica_sync_memory() -> Result<()> {
1624        let alice_store = store::Store::memory();
1625        let bob_store = store::Store::memory();
1626
1627        test_replica_sync(alice_store, bob_store).await?;
1628        Ok(())
1629    }
1630
1631    #[tokio::test]
1632    #[cfg(feature = "fs-store")]
1633    async fn test_replica_sync_fs() -> Result<()> {
1634        let alice_dbfile = tempfile::NamedTempFile::new()?;
1635        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1636        let bob_dbfile = tempfile::NamedTempFile::new()?;
1637        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1638        test_replica_sync(alice_store, bob_store).await?;
1639
1640        Ok(())
1641    }
1642
1643    async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1644        let alice_set = ["ape", "eel", "fox", "gnu"];
1645        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1646
1647        let mut rng = rand::rng();
1648        let author = Author::new(&mut rng);
1649        let myspace = NamespaceSecret::new(&mut rng);
1650        let mut alice = alice_store.new_replica(myspace.clone())?;
1651        for el in &alice_set {
1652            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1653        }
1654
1655        let mut bob = bob_store.new_replica(myspace.clone())?;
1656        for el in &bob_set {
1657            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1658        }
1659
1660        let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1661
1662        assert_eq!(alice_out.num_sent, 2);
1663        assert_eq!(bob_out.num_recv, 2);
1664        assert_eq!(alice_out.num_recv, 6);
1665        assert_eq!(bob_out.num_sent, 6);
1666
1667        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1668        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1669        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1670        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1671        alice_store.flush()?;
1672        bob_store.flush()?;
1673        Ok(())
1674    }
1675
1676    #[tokio::test]
1677    async fn test_replica_timestamp_sync_memory() -> Result<()> {
1678        let alice_store = store::Store::memory();
1679        let bob_store = store::Store::memory();
1680
1681        test_replica_timestamp_sync(alice_store, bob_store).await?;
1682        Ok(())
1683    }
1684
1685    #[tokio::test]
1686    #[cfg(feature = "fs-store")]
1687    async fn test_replica_timestamp_sync_fs() -> Result<()> {
1688        let alice_dbfile = tempfile::NamedTempFile::new()?;
1689        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1690        let bob_dbfile = tempfile::NamedTempFile::new()?;
1691        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1692        test_replica_timestamp_sync(alice_store, bob_store).await?;
1693
1694        Ok(())
1695    }
1696
1697    async fn test_replica_timestamp_sync(
1698        mut alice_store: Store,
1699        mut bob_store: Store,
1700    ) -> Result<()> {
1701        let mut rng = rand::rng();
1702        let author = Author::new(&mut rng);
1703        let namespace = NamespaceSecret::new(&mut rng);
1704        let mut alice = alice_store.new_replica(namespace.clone())?;
1705        let mut bob = bob_store.new_replica(namespace.clone())?;
1706
1707        let key = b"key";
1708        let alice_value = b"alice";
1709        let bob_value = b"bob";
1710        let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1711        // system time increased - sync should overwrite
1712        let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1713        sync(&mut alice, &mut bob).await?;
1714        assert_eq!(
1715            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1716            Some(bob_hash)
1717        );
1718        assert_eq!(
1719            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1720            Some(bob_hash)
1721        );
1722
1723        let mut alice = alice_store.new_replica(namespace.clone())?;
1724        let mut bob = bob_store.new_replica(namespace.clone())?;
1725
1726        let alice_value_2 = b"alice2";
1727        // system time increased - sync should overwrite
1728        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1729        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1730        sync(&mut alice, &mut bob).await?;
1731        assert_eq!(
1732            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1733            Some(alice_hash_2)
1734        );
1735        assert_eq!(
1736            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1737            Some(alice_hash_2)
1738        );
1739        alice_store.flush()?;
1740        bob_store.flush()?;
1741        Ok(())
1742    }
1743
1744    #[tokio::test]
1745    async fn test_future_timestamp() -> Result<()> {
1746        let mut rng = rand::rng();
1747        let mut store = store::Store::memory();
1748        let author = Author::new(&mut rng);
1749        let namespace = NamespaceSecret::new(&mut rng);
1750
1751        let mut replica = store.new_replica(namespace.clone())?;
1752        let key = b"hi";
1753        let t = system_time_now();
1754        let record = Record::from_data(b"1", t);
1755        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1756        replica
1757            .insert_entry(entry0.clone(), InsertOrigin::Local)
1758            .await?;
1759
1760        assert_eq!(
1761            get_entry(&mut store, namespace.id(), author.id(), key)?,
1762            entry0
1763        );
1764
1765        let mut replica = store.new_replica(namespace.clone())?;
1766        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1767        let record = Record::from_data(b"2", t);
1768        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1769        replica
1770            .insert_entry(entry1.clone(), InsertOrigin::Local)
1771            .await?;
1772        assert_eq!(
1773            get_entry(&mut store, namespace.id(), author.id(), key)?,
1774            entry1
1775        );
1776
1777        let mut replica = store.new_replica(namespace.clone())?;
1778        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1779        let record = Record::from_data(b"2", t);
1780        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1781        replica
1782            .insert_entry(entry2.clone(), InsertOrigin::Local)
1783            .await?;
1784        assert_eq!(
1785            get_entry(&mut store, namespace.id(), author.id(), key)?,
1786            entry2
1787        );
1788
1789        let mut replica = store.new_replica(namespace.clone())?;
1790        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1791        let record = Record::from_data(b"2", t);
1792        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1793        let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1794        assert!(matches!(
1795            res,
1796            Err(InsertError::Validation(
1797                ValidationFailure::TooFarInTheFuture
1798            ))
1799        ));
1800        assert_eq!(
1801            get_entry(&mut store, namespace.id(), author.id(), key)?,
1802            entry2
1803        );
1804        store.flush()?;
1805        Ok(())
1806    }
1807
1808    /// Regression: `MAX_TIMESTAMP_FUTURE_SHIFT` must be on the same unit scale as
1809    /// `system_time_now()` (Unix epoch **microseconds**). A millisecond-scaled constant
1810    /// only allows ~0.6s of future slack and would reject this insert.
1811    #[tokio::test]
1812    async fn test_future_timestamp_accepts_one_second_skew() -> Result<()> {
1813        let mut rng = rand::rng();
1814        let mut store = store::Store::memory();
1815        let author = Author::new(&mut rng);
1816        let namespace = NamespaceSecret::new(&mut rng);
1817        let mut replica = store.new_replica(namespace.clone())?;
1818        let key = b"skew";
1819        let skew_micros: u64 = Duration::from_secs(1).as_micros() as u64;
1820        let now = system_time_now();
1821        let record = Record::from_data(b"ahead", now + skew_micros);
1822        let entry = SignedEntry::from_parts(&namespace, &author, key, record);
1823        replica
1824            .insert_entry(entry.clone(), InsertOrigin::Local)
1825            .await?;
1826        assert_eq!(
1827            get_entry(&mut store, namespace.id(), author.id(), key)?,
1828            entry
1829        );
1830        store.flush()?;
1831        Ok(())
1832    }
1833
1834    #[tokio::test]
1835    async fn test_insert_empty() -> Result<()> {
1836        let mut store = store::Store::memory();
1837        let mut rng = rand::rng();
1838        let alice = Author::new(&mut rng);
1839        let myspace = NamespaceSecret::new(&mut rng);
1840        let mut replica = store.new_replica(myspace.clone())?;
1841        let hash = Hash::new(b"");
1842        let res = replica.insert(b"foo", &alice, hash, 0).await;
1843        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1844        store.flush()?;
1845        Ok(())
1846    }
1847
1848    #[tokio::test]
1849    async fn test_prefix_delete_memory() -> Result<()> {
1850        let store = store::Store::memory();
1851        test_prefix_delete(store).await?;
1852        Ok(())
1853    }
1854
1855    #[tokio::test]
1856    #[cfg(feature = "fs-store")]
1857    async fn test_prefix_delete_fs() -> Result<()> {
1858        let dbfile = tempfile::NamedTempFile::new()?;
1859        let store = store::fs::Store::persistent(dbfile.path())?;
1860        test_prefix_delete(store).await?;
1861        Ok(())
1862    }
1863
1864    async fn test_prefix_delete(mut store: Store) -> Result<()> {
1865        let mut rng = rand::rng();
1866        let alice = Author::new(&mut rng);
1867        let myspace = NamespaceSecret::new(&mut rng);
1868        let mut replica = store.new_replica(myspace.clone())?;
1869        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1870        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1871
1872        // sanity checks
1873        assert_eq!(
1874            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1875            Some(hash1)
1876        );
1877        assert_eq!(
1878            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1879            Some(hash2)
1880        );
1881
1882        // delete
1883        let mut replica = store.new_replica(myspace.clone())?;
1884        let deleted = replica.delete_prefix(b"foo", &alice).await?;
1885        assert_eq!(deleted, 2);
1886        assert_eq!(
1887            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1888            None
1889        );
1890        assert_eq!(
1891            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1892            None
1893        );
1894        assert_eq!(
1895            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1896            None
1897        );
1898        store.flush()?;
1899        Ok(())
1900    }
1901
1902    #[tokio::test]
1903    async fn test_replica_sync_delete_memory() -> Result<()> {
1904        let alice_store = store::Store::memory();
1905        let bob_store = store::Store::memory();
1906
1907        test_replica_sync_delete(alice_store, bob_store).await
1908    }
1909
1910    #[tokio::test]
1911    #[cfg(feature = "fs-store")]
1912    async fn test_replica_sync_delete_fs() -> Result<()> {
1913        let alice_dbfile = tempfile::NamedTempFile::new()?;
1914        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1915        let bob_dbfile = tempfile::NamedTempFile::new()?;
1916        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1917        test_replica_sync_delete(alice_store, bob_store).await
1918    }
1919
1920    async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1921        let alice_set = ["foot"];
1922        let bob_set = ["fool", "foo", "fog"];
1923
1924        let mut rng = rand::rng();
1925        let author = Author::new(&mut rng);
1926        let myspace = NamespaceSecret::new(&mut rng);
1927        let mut alice = alice_store.new_replica(myspace.clone())?;
1928        for el in &alice_set {
1929            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1930        }
1931
1932        let mut bob = bob_store.new_replica(myspace.clone())?;
1933        for el in &bob_set {
1934            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1935        }
1936
1937        sync(&mut alice, &mut bob).await?;
1938
1939        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1940        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1941        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1942        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1943
1944        let mut alice = alice_store.new_replica(myspace.clone())?;
1945        let mut bob = bob_store.new_replica(myspace.clone())?;
1946        alice.delete_prefix("foo", &author).await?;
1947        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1948            .await?;
1949        sync(&mut alice, &mut bob).await?;
1950        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1951        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1952        alice_store.flush()?;
1953        bob_store.flush()?;
1954        Ok(())
1955    }
1956
1957    #[tokio::test]
1958    async fn test_replica_remove_memory() -> Result<()> {
1959        let alice_store = store::Store::memory();
1960        test_replica_remove(alice_store).await
1961    }
1962
1963    #[tokio::test]
1964    #[cfg(feature = "fs-store")]
1965    async fn test_replica_remove_fs() -> Result<()> {
1966        let alice_dbfile = tempfile::NamedTempFile::new()?;
1967        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1968        test_replica_remove(alice_store).await
1969    }
1970
1971    async fn test_replica_remove(mut store: Store) -> Result<()> {
1972        let mut rng = rand::rng();
1973        let namespace = NamespaceSecret::new(&mut rng);
1974        let author = Author::new(&mut rng);
1975        let mut replica = store.new_replica(namespace.clone())?;
1976
1977        // insert entry
1978        let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1979        let res = store
1980            .get_many(namespace.id(), Query::all())?
1981            .collect::<Vec<_>>();
1982        assert_eq!(res.len(), 1);
1983
1984        // remove replica
1985        let res = store.remove_replica(&namespace.id());
1986        // may not remove replica while still open;
1987        assert!(res.is_err());
1988        store.close_replica(namespace.id());
1989        store.remove_replica(&namespace.id())?;
1990        let res = store
1991            .get_many(namespace.id(), Query::all())?
1992            .collect::<Vec<_>>();
1993        assert_eq!(res.len(), 0);
1994
1995        // may not reopen removed replica
1996        let res = store.load_replica_info(&namespace.id());
1997        assert!(matches!(res, Err(OpenError::NotFound)));
1998
1999        // may recreate replica
2000        let mut replica = store.new_replica(namespace.clone())?;
2001        replica.insert(b"foo", &author, hash, 3).await?;
2002        let res = store
2003            .get_many(namespace.id(), Query::all())?
2004            .collect::<Vec<_>>();
2005        assert_eq!(res.len(), 1);
2006        store.flush()?;
2007        Ok(())
2008    }
2009
2010    #[tokio::test]
2011    async fn test_replica_delete_edge_cases_memory() -> Result<()> {
2012        let store = store::Store::memory();
2013        test_replica_delete_edge_cases(store).await
2014    }
2015
2016    #[tokio::test]
2017    #[cfg(feature = "fs-store")]
2018    async fn test_replica_delete_edge_cases_fs() -> Result<()> {
2019        let dbfile = tempfile::NamedTempFile::new()?;
2020        let store = store::fs::Store::persistent(dbfile.path())?;
2021        test_replica_delete_edge_cases(store).await
2022    }
2023
2024    async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
2025        let mut rng = rand::rng();
2026        let author = Author::new(&mut rng);
2027        let namespace = NamespaceSecret::new(&mut rng);
2028
2029        let edgecases = [0u8, 1u8, 255u8];
2030        let prefixes = [0u8, 255u8];
2031        let hash = Hash::new(b"foo");
2032        let len = 3;
2033        for prefix in prefixes {
2034            let mut expected = vec![];
2035            let mut replica = store.new_replica(namespace.clone())?;
2036            for suffix in edgecases {
2037                let key = [prefix, suffix].to_vec();
2038                expected.push(key.clone());
2039                replica.insert(&key, &author, hash, len).await?;
2040            }
2041            assert_keys(&mut store, namespace.id(), expected);
2042            let mut replica = store.new_replica(namespace.clone())?;
2043            replica.delete_prefix([prefix], &author).await?;
2044            assert_keys(&mut store, namespace.id(), vec![]);
2045        }
2046
2047        let mut replica = store.new_replica(namespace.clone())?;
2048        let key = vec![1u8, 0u8];
2049        replica.insert(key, &author, hash, len).await?;
2050        let key = vec![1u8, 1u8];
2051        replica.insert(key, &author, hash, len).await?;
2052        let key = vec![1u8, 2u8];
2053        replica.insert(key, &author, hash, len).await?;
2054        let prefix = vec![1u8, 1u8];
2055        replica.delete_prefix(prefix, &author).await?;
2056        assert_keys(
2057            &mut store,
2058            namespace.id(),
2059            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2060        );
2061
2062        let mut replica = store.new_replica(namespace.clone())?;
2063        let key = vec![0u8, 255u8];
2064        replica.insert(key, &author, hash, len).await?;
2065        let key = vec![0u8, 0u8];
2066        replica.insert(key, &author, hash, len).await?;
2067        let prefix = vec![0u8];
2068        replica.delete_prefix(prefix, &author).await?;
2069        assert_keys(
2070            &mut store,
2071            namespace.id(),
2072            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2073        );
2074        store.flush()?;
2075        Ok(())
2076    }
2077
2078    #[tokio::test]
2079    async fn test_latest_iter_memory() -> Result<()> {
2080        let store = store::Store::memory();
2081        test_latest_iter(store).await
2082    }
2083
2084    #[tokio::test]
2085    #[cfg(feature = "fs-store")]
2086    async fn test_latest_iter_fs() -> Result<()> {
2087        let dbfile = tempfile::NamedTempFile::new()?;
2088        let store = store::fs::Store::persistent(dbfile.path())?;
2089        test_latest_iter(store).await
2090    }
2091
2092    async fn test_latest_iter(mut store: Store) -> Result<()> {
2093        let mut rng = rand::rng();
2094        let author0 = Author::new(&mut rng);
2095        let author1 = Author::new(&mut rng);
2096        let namespace = NamespaceSecret::new(&mut rng);
2097        let mut replica = store.new_replica(namespace.clone())?;
2098
2099        replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2100        let latest = store
2101            .get_latest_for_each_author(namespace.id())?
2102            .collect::<Result<Vec<_>>>()?;
2103        assert_eq!(latest.len(), 1);
2104        assert_eq!(latest[0].2, b"a0.1".to_vec());
2105
2106        let mut replica = store.new_replica(namespace.clone())?;
2107        replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2108        replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2109        let latest = store
2110            .get_latest_for_each_author(namespace.id())?
2111            .collect::<Result<Vec<_>>>()?;
2112        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2113        latest_keys.sort();
2114        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2115        store.flush()?;
2116        Ok(())
2117    }
2118
2119    #[tokio::test]
2120    async fn test_replica_byte_keys_memory() -> Result<()> {
2121        let store = store::Store::memory();
2122
2123        test_replica_byte_keys(store).await?;
2124        Ok(())
2125    }
2126
2127    #[tokio::test]
2128    #[cfg(feature = "fs-store")]
2129    async fn test_replica_byte_keys_fs() -> Result<()> {
2130        let dbfile = tempfile::NamedTempFile::new()?;
2131        let store = store::fs::Store::persistent(dbfile.path())?;
2132        test_replica_byte_keys(store).await?;
2133
2134        Ok(())
2135    }
2136
2137    async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2138        let mut rng = rand::rng();
2139        let author = Author::new(&mut rng);
2140        let namespace = NamespaceSecret::new(&mut rng);
2141
2142        let hash = Hash::new(b"foo");
2143        let len = 3;
2144
2145        let key = vec![1u8, 0u8];
2146        let mut replica = store.new_replica(namespace.clone())?;
2147        replica.insert(key, &author, hash, len).await?;
2148        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2149        let key = vec![1u8, 2u8];
2150        let mut replica = store.new_replica(namespace.clone())?;
2151        replica.insert(key, &author, hash, len).await?;
2152        assert_keys(
2153            &mut store,
2154            namespace.id(),
2155            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2156        );
2157
2158        let key = vec![0u8, 255u8];
2159        let mut replica = store.new_replica(namespace.clone())?;
2160        replica.insert(key, &author, hash, len).await?;
2161        assert_keys(
2162            &mut store,
2163            namespace.id(),
2164            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2165        );
2166        store.flush()?;
2167        Ok(())
2168    }
2169
2170    #[tokio::test]
2171    async fn test_replica_capability_memory() -> Result<()> {
2172        let store = store::Store::memory();
2173        test_replica_capability(store).await
2174    }
2175
2176    #[tokio::test]
2177    #[cfg(feature = "fs-store")]
2178    async fn test_replica_capability_fs() -> Result<()> {
2179        let dbfile = tempfile::NamedTempFile::new()?;
2180        let store = store::fs::Store::persistent(dbfile.path())?;
2181        test_replica_capability(store).await
2182    }
2183
2184    #[allow(clippy::redundant_pattern_matching)]
2185    async fn test_replica_capability(mut store: Store) -> Result<()> {
2186        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2187        let author = store.new_author(&mut rng)?;
2188        let namespace = NamespaceSecret::new(&mut rng);
2189
2190        // import read capability - insert must fail
2191        let capability = Capability::Read(namespace.id());
2192        store.import_namespace(capability)?;
2193        let mut replica = store.open_replica(&namespace.id())?;
2194        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2195        assert!(matches!(res, Err(InsertError::ReadOnly)));
2196
2197        // import write capability - insert must succeed
2198        let capability = Capability::Write(namespace.clone());
2199        store.import_namespace(capability)?;
2200        let mut replica = store.open_replica(&namespace.id())?;
2201        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2202        assert!(matches!(res, Ok(_)));
2203        store.close_replica(namespace.id());
2204        let mut replica = store.open_replica(&namespace.id())?;
2205        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2206        assert!(res.is_ok());
2207
2208        // import read capability again - insert must still succeed
2209        let capability = Capability::Read(namespace.id());
2210        store.import_namespace(capability)?;
2211        store.close_replica(namespace.id());
2212        let mut replica = store.open_replica(&namespace.id())?;
2213        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2214        assert!(res.is_ok());
2215        store.flush()?;
2216        Ok(())
2217    }
2218
2219    #[tokio::test]
2220    async fn test_actor_capability_memory() -> Result<()> {
2221        let store = store::Store::memory();
2222        test_actor_capability(store).await
2223    }
2224
2225    #[tokio::test]
2226    #[cfg(feature = "fs-store")]
2227    async fn test_actor_capability_fs() -> Result<()> {
2228        let dbfile = tempfile::NamedTempFile::new()?;
2229        let store = store::fs::Store::persistent(dbfile.path())?;
2230        test_actor_capability(store).await
2231    }
2232
2233    async fn test_actor_capability(store: Store) -> Result<()> {
2234        // test with actor
2235        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2236        let author = Author::new(&mut rng);
2237        let handle = SyncHandle::spawn(store, None, "test".into());
2238        let author = handle.import_author(author).await?;
2239        let namespace = NamespaceSecret::new(&mut rng);
2240        let id = namespace.id();
2241
2242        // import read capability - insert must fail
2243        let capability = Capability::Read(namespace.id());
2244        handle.import_namespace(capability).await?;
2245        handle.open(namespace.id(), Default::default()).await?;
2246        let res = handle
2247            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2248            .await;
2249        assert!(res.is_err());
2250
2251        // import write capability - insert must succeed
2252        let capability = Capability::Write(namespace.clone());
2253        handle.import_namespace(capability).await?;
2254        let res = handle
2255            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2256            .await;
2257        assert!(res.is_ok());
2258
2259        // close and reopen - must still succeed
2260        handle.close(namespace.id()).await?;
2261        let res = handle
2262            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2263            .await;
2264        assert!(res.is_err());
2265        handle.open(namespace.id(), Default::default()).await?;
2266        let res = handle
2267            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2268            .await;
2269        assert!(res.is_ok());
2270        Ok(())
2271    }
2272
2273    fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2274        let mut res = vec![];
2275        while let Ok(ev) = events.try_recv() {
2276            res.push(ev);
2277        }
2278        res
2279    }
2280
2281    /// This tests that no events are emitted for entries received during sync which are obsolete
2282    /// (too old) by the time they are actually inserted in the store.
2283    #[tokio::test]
2284    async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2285        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2286        let mut store1 = store::Store::memory();
2287        let mut store2 = store::Store::memory();
2288        let peer1 = [1u8; 32];
2289        let peer2 = [2u8; 32];
2290        let mut state1 = SyncOutcome::default();
2291        let mut state2 = SyncOutcome::default();
2292
2293        let author = Author::new(&mut rng);
2294        let namespace = NamespaceSecret::new(&mut rng);
2295        let mut replica1 = store1.new_replica(namespace.clone())?;
2296        let mut replica2 = store2.new_replica(namespace.clone())?;
2297
2298        let (events1_sender, events1) = async_channel::bounded(32);
2299        let (events2_sender, events2) = async_channel::bounded(32);
2300
2301        replica1.info.subscribe(events1_sender);
2302        replica2.info.subscribe(events2_sender);
2303
2304        replica1.hash_and_insert(b"foo", &author, b"init").await?;
2305
2306        let from1 = replica1.sync_initial_message()?;
2307        let from2 = replica2
2308            .sync_process_message(from1, peer1, &mut state2)
2309            .await
2310            .unwrap()
2311            .unwrap();
2312        let from1 = replica1
2313            .sync_process_message(from2, peer2, &mut state1)
2314            .await
2315            .unwrap()
2316            .unwrap();
2317        // now we will receive the entry from rpelica1. we will insert a newer entry now, while the
2318        // sync is already running. this means the entry from replica1 will be rejected. we make
2319        // sure that no InsertRemote event is emitted for this entry.
2320        replica2.hash_and_insert(b"foo", &author, b"update").await?;
2321        let from2 = replica2
2322            .sync_process_message(from1, peer1, &mut state2)
2323            .await
2324            .unwrap();
2325        assert!(from2.is_none());
2326        let events1 = drain(events1);
2327        let events2 = drain(events2);
2328        assert_eq!(events1.len(), 1);
2329        assert_eq!(events2.len(), 1);
2330        assert!(matches!(events1[0], Event::LocalInsert { .. }));
2331        assert!(matches!(events2[0], Event::LocalInsert { .. }));
2332        assert_eq!(state1.num_sent, 1);
2333        assert_eq!(state1.num_recv, 0);
2334        assert_eq!(state2.num_sent, 0);
2335        assert_eq!(state2.num_recv, 1);
2336        store1.flush()?;
2337        store2.flush()?;
2338        Ok(())
2339    }
2340
2341    #[tokio::test]
2342    async fn test_replica_queries_mem() -> Result<()> {
2343        let store = store::Store::memory();
2344
2345        test_replica_queries(store).await?;
2346        Ok(())
2347    }
2348
2349    #[tokio::test]
2350    #[cfg(feature = "fs-store")]
2351    async fn test_replica_queries_fs() -> Result<()> {
2352        let dbfile = tempfile::NamedTempFile::new()?;
2353        let store = store::fs::Store::persistent(dbfile.path())?;
2354        test_replica_queries(store).await?;
2355
2356        Ok(())
2357    }
2358
2359    async fn test_replica_queries(mut store: Store) -> Result<()> {
2360        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2361        let namespace = NamespaceSecret::new(&mut rng);
2362        let namespace_id = namespace.id();
2363
2364        let a1 = store.new_author(&mut rng)?;
2365        let a2 = store.new_author(&mut rng)?;
2366        let a3 = store.new_author(&mut rng)?;
2367        println!(
2368            "a1 {} a2 {} a3 {}",
2369            a1.id().fmt_short(),
2370            a2.id().fmt_short(),
2371            a3.id().fmt_short()
2372        );
2373
2374        let mut replica = store.new_replica(namespace.clone())?;
2375        replica.hash_and_insert("hi/world", &a2, "a2").await?;
2376        replica.hash_and_insert("hi/world", &a1, "a1").await?;
2377        replica.hash_and_insert("hi/moon", &a2, "a1").await?;
2378        replica.hash_and_insert("hi", &a3, "a3").await?;
2379
2380        struct QueryTester<'a> {
2381            store: &'a mut Store,
2382            namespace: NamespaceId,
2383        }
2384        impl QueryTester<'_> {
2385            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2386                let query = query.into();
2387                let actual = self
2388                    .store
2389                    .get_many(self.namespace, query.clone())
2390                    .unwrap()
2391                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2392                    .collect::<Result<Vec<_>>>()
2393                    .unwrap();
2394                let expected = expected
2395                    .into_iter()
2396                    .map(|(key, author)| (key.to_string(), author.id()))
2397                    .collect::<Vec<_>>();
2398                assert_eq!(actual, expected, "query: {query:#?}")
2399            }
2400        }
2401
2402        let mut qt = QueryTester {
2403            store: &mut store,
2404            namespace: namespace_id,
2405        };
2406
2407        qt.assert(
2408            Query::all(),
2409            vec![
2410                ("hi/world", &a1),
2411                ("hi/moon", &a2),
2412                ("hi/world", &a2),
2413                ("hi", &a3),
2414            ],
2415        );
2416
2417        qt.assert(
2418            Query::single_latest_per_key(),
2419            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2420        );
2421
2422        qt.assert(
2423            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2424            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2425        );
2426
2427        qt.assert(
2428            Query::single_latest_per_key().key_prefix("hi/"),
2429            vec![("hi/moon", &a2), ("hi/world", &a1)],
2430        );
2431
2432        qt.assert(
2433            Query::single_latest_per_key()
2434                .key_prefix("hi/")
2435                .sort_direction(SortDirection::Desc),
2436            vec![("hi/world", &a1), ("hi/moon", &a2)],
2437        );
2438
2439        qt.assert(
2440            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2441            vec![
2442                ("hi", &a3),
2443                ("hi/moon", &a2),
2444                ("hi/world", &a1),
2445                ("hi/world", &a2),
2446            ],
2447        );
2448
2449        qt.assert(
2450            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2451            vec![
2452                ("hi/world", &a2),
2453                ("hi/world", &a1),
2454                ("hi/moon", &a2),
2455                ("hi", &a3),
2456            ],
2457        );
2458
2459        qt.assert(
2460            Query::all().key_prefix("hi/"),
2461            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2462        );
2463
2464        qt.assert(
2465            Query::all().key_prefix("hi/").offset(1).limit(1),
2466            vec![("hi/moon", &a2)],
2467        );
2468
2469        qt.assert(
2470            Query::all()
2471                .key_prefix("hi/")
2472                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2473            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2474        );
2475
2476        qt.assert(
2477            Query::all()
2478                .key_prefix("hi/")
2479                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2480                .offset(1)
2481                .limit(1),
2482            vec![("hi/world", &a1)],
2483        );
2484
2485        qt.assert(
2486            Query::all()
2487                .key_prefix("hi/")
2488                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2489            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2490        );
2491
2492        qt.assert(
2493            Query::all()
2494                .key_prefix("hi/")
2495                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2496            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2497        );
2498
2499        qt.assert(
2500            Query::all()
2501                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2502                .limit(2)
2503                .offset(1),
2504            vec![("hi/moon", &a2), ("hi/world", &a1)],
2505        );
2506
2507        let mut replica = store.new_replica(namespace)?;
2508        replica.delete_prefix("hi/world", &a2).await?;
2509        let mut qt = QueryTester {
2510            store: &mut store,
2511            namespace: namespace_id,
2512        };
2513
2514        qt.assert(
2515            Query::all(),
2516            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2517        );
2518
2519        qt.assert(
2520            Query::all().include_empty(),
2521            vec![
2522                ("hi/world", &a1),
2523                ("hi/moon", &a2),
2524                ("hi/world", &a2),
2525                ("hi", &a3),
2526            ],
2527        );
2528        store.flush()?;
2529        Ok(())
2530    }
2531
2532    #[test]
2533    fn test_dl_policies_mem() -> Result<()> {
2534        let mut store = store::Store::memory();
2535        test_dl_policies(&mut store)
2536    }
2537
2538    #[test]
2539    #[cfg(feature = "fs-store")]
2540    fn test_dl_policies_fs() -> Result<()> {
2541        let dbfile = tempfile::NamedTempFile::new()?;
2542        let mut store = store::fs::Store::persistent(dbfile.path())?;
2543        test_dl_policies(&mut store)
2544    }
2545
2546    fn test_dl_policies(store: &mut Store) -> Result<()> {
2547        let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2548        let namespace = NamespaceSecret::new(&mut rng);
2549        let id = namespace.id();
2550
2551        let filter = store::FilterKind::Exact("foo".into());
2552        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2553        store
2554            .set_download_policy(&id, policy.clone())
2555            .expect_err("document dos not exist");
2556
2557        // now create the document
2558        store.new_replica(namespace)?;
2559
2560        store.set_download_policy(&id, policy.clone())?;
2561        let retrieved_policy = store.get_download_policy(&id)?;
2562        assert_eq!(retrieved_policy, policy);
2563        store.flush()?;
2564        Ok(())
2565    }
2566
2567    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2568        expected.sort();
2569        assert_eq!(expected, get_keys_sorted(store, namespace));
2570    }
2571
2572    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2573        let mut res = store
2574            .get_many(namespace, Query::all())
2575            .unwrap()
2576            .map(|e| e.map(|e| e.key().to_vec()))
2577            .collect::<Result<Vec<_>>>()
2578            .unwrap();
2579        res.sort();
2580        res
2581    }
2582
2583    fn get_entry(
2584        store: &mut Store,
2585        namespace: NamespaceId,
2586        author: AuthorId,
2587        key: &[u8],
2588    ) -> anyhow::Result<SignedEntry> {
2589        let entry = store
2590            .get_exact(namespace, author, key, true)?
2591            .ok_or_else(|| anyhow::anyhow!("not found"))?;
2592        Ok(entry)
2593    }
2594
2595    fn get_content_hash(
2596        store: &mut Store,
2597        namespace: NamespaceId,
2598        author: AuthorId,
2599        key: &[u8],
2600    ) -> anyhow::Result<Option<Hash>> {
2601        let hash = store
2602            .get_exact(namespace, author, key, false)?
2603            .map(|e| e.content_hash());
2604        Ok(hash)
2605    }
2606
2607    async fn sync<'a>(
2608        alice: &'a mut Replica<'a>,
2609        bob: &'a mut Replica<'a>,
2610    ) -> Result<(SyncOutcome, SyncOutcome)> {
2611        let alice_peer_id = [1u8; 32];
2612        let bob_peer_id = [2u8; 32];
2613        let mut alice_state = SyncOutcome::default();
2614        let mut bob_state = SyncOutcome::default();
2615        // Sync alice - bob
2616        let mut next_to_bob = Some(alice.sync_initial_message()?);
2617        let mut rounds = 0;
2618        while let Some(msg) = next_to_bob.take() {
2619            assert!(rounds < 100, "too many rounds");
2620            rounds += 1;
2621            println!("round {rounds}");
2622            if let Some(msg) = bob
2623                .sync_process_message(msg, alice_peer_id, &mut bob_state)
2624                .await?
2625            {
2626                next_to_bob = alice
2627                    .sync_process_message(msg, bob_peer_id, &mut alice_state)
2628                    .await?
2629            }
2630        }
2631        assert_eq!(alice_state.num_sent, bob_state.num_recv);
2632        assert_eq!(alice_state.num_recv, bob_state.num_sent);
2633        Ok((alice_state, bob_state))
2634    }
2635
2636    fn check_entries(
2637        store: &mut Store,
2638        namespace: &NamespaceId,
2639        author: &Author,
2640        set: &[&str],
2641    ) -> Result<()> {
2642        for el in set {
2643            store.get_exact(*namespace, author.id(), el, false)?;
2644        }
2645        Ok(())
2646    }
2647
2648    /// Snapshot of the `SignedEntry` postcard wire format used by doc sync.
2649    #[test]
2650    fn test_signed_entry_postcard_snapshot() {
2651        let author = Author::from_bytes(&[0xa1; 32]);
2652        let namespace = NamespaceSecret::from_bytes(&[0xb2; 32]);
2653        let record = Record::new(Hash::EMPTY, 0, 1_700_000_000_000_000u64);
2654        let id = RecordIdentifier::new(namespace.id(), author.id(), b"wire-format-test");
2655        let signed = SignedEntry::from_entry(Entry::new(id, record), &namespace, &author);
2656
2657        let bytes = postcard::to_stdvec(&signed).unwrap();
2658        assert_eq!(
2659            hex::encode(&bytes),
2660            "4b523f1b6d9b00a4779fc9f8f105a9e36f062ceb7d511b632905782042ad30acb6dd07bfced4ecd5f3aa58321e8ace63f48f988ed8461bfdcd8b0e902187a10e228ddc6998329b7faa64875fe80da36406ea8d87e3e57bb048323e9cb66c0b343b60c4e709fb978b878e37d0c362edfc06c8cdc774c8b29d94e48eaa06cca60f5055154f42065ea5a1bea05463826be2684eb92df92c100027aabaae57ca554207bc7cbcb5636375fa1d82434d466724d92377f53b980695dd49d26d0ce12205a5776972652d666f726d61742d7465737400af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f32628080f9c0c1c48203",
2661        );
2662
2663        let decoded: SignedEntry = postcard::from_bytes(&bytes).unwrap();
2664        assert_eq!(decoded, signed);
2665    }
2666
2667    /// Snapshot of the `Author` postcard format.
2668    #[test]
2669    fn test_author_postcard_snapshot() {
2670        let author = Author::from_bytes(&[0xa1; 32]);
2671        let bytes = postcard::to_stdvec(&author).unwrap();
2672        assert_eq!(
2673            hex::encode(&bytes),
2674            "20a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1",
2675        );
2676    }
2677
2678    /// Snapshot of the `NamespaceSecret` postcard format.
2679    #[test]
2680    fn test_namespace_secret_postcard_snapshot() {
2681        let namespace = NamespaceSecret::from_bytes(&[0xb2; 32]);
2682        let bytes = postcard::to_stdvec(&namespace).unwrap();
2683        assert_eq!(
2684            hex::encode(&bytes),
2685            "20b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2",
2686        );
2687    }
2688}