iroh_docs/
sync.rs

1//! API for iroh-docs replicas
2
// Names and concepts are roughly based on Willow's design at the moment:
4//
5// https://hackmd.io/DTtck8QOQm6tZaQBBtTf7w
6//
7// This is going to change!
8
9use std::{
10    cmp::Ordering,
11    fmt::Debug,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14    time::{Duration, SystemTime},
15};
16
17use bytes::{Bytes, BytesMut};
18use ed25519_dalek::{Signature, SignatureError};
19use iroh_blobs::Hash;
20use serde::{Deserialize, Serialize};
21
22pub use crate::heads::AuthorHeads;
23use crate::{
24    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
25    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
26    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
27};
28
29/// Protocol message for the set reconciliation protocol.
30///
31/// Can be serialized to bytes with [serde] to transfer between peers.
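///
/// # Example
///
/// A minimal sketch of encoding a message for the wire, assuming the `postcard` crate as the
/// serde format (any serde-compatible format works; the choice here is illustrative only):
///
/// ```ignore
/// fn encode(msg: &ProtocolMessage) -> postcard::Result<Vec<u8>> {
///     postcard::to_stdvec(msg)
/// }
///
/// fn decode(bytes: &[u8]) -> postcard::Result<ProtocolMessage> {
///     postcard::from_bytes(bytes)
/// }
/// ```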
32pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
33
34/// Byte representation of a `PeerId` from `iroh-net`.
35// TODO: PeerId is in iroh-net which iroh-docs doesn't depend on. Add iroh-base crate with `PeerId`.
36pub type PeerIdBytes = [u8; 32];
37
/// Max time in the future from our wall clock time that we accept entries for.
/// Value is 10 minutes. Entry timestamps are counted in microseconds since the Unix epoch.
pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
41
42/// Callback that may be set on a replica to determine the availability status for a content hash.
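///
/// # Example
///
/// A minimal sketch of a callback that reports every hash as missing; a real callback would
/// query its blob store for the hash instead (the closure below is illustrative only):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let cb: ContentStatusCallback = Arc::new(|_hash: Hash| ContentStatus::Missing);
/// ```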
43pub type ContentStatusCallback = Arc<dyn Fn(Hash) -> ContentStatus + Send + Sync + 'static>;
44
45/// Event emitted by sync when entries are added.
46#[derive(Debug, Clone)]
47pub enum Event {
48    /// A local entry has been added.
49    LocalInsert {
50        /// Document in which the entry was inserted.
51        namespace: NamespaceId,
52        /// Inserted entry.
53        entry: SignedEntry,
54    },
55    /// A remote entry has been added.
56    RemoteInsert {
57        /// Document in which the entry was inserted.
58        namespace: NamespaceId,
59        /// Inserted entry.
60        entry: SignedEntry,
61        /// Peer that provided the inserted entry.
62        from: PeerIdBytes,
63        /// Whether download policies require the content to be downloaded.
64        should_download: bool,
65        /// [`ContentStatus`] for this entry in the remote's replica.
66        remote_content_status: ContentStatus,
67    },
68}
69
70/// Whether an entry was inserted locally or by a remote peer.
71#[derive(Debug, Clone)]
72pub enum InsertOrigin {
73    /// The entry was inserted locally.
74    Local,
75    /// The entry was received from the remote node identified by [`PeerIdBytes`].
76    Sync {
77        /// The peer from which we received this entry.
78        from: PeerIdBytes,
79        /// Whether the peer claims to have the content blob for this entry.
80        remote_content_status: ContentStatus,
81    },
82}
83
/// Whether the content is available on a node.
85#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
86pub enum ContentStatus {
87    /// The content is completely available.
88    Complete,
89    /// The content is partially available.
90    Incomplete,
91    /// The content is missing.
92    Missing,
93}
94
95/// Outcome of a sync operation.
96#[derive(Debug, Clone, Default)]
97pub struct SyncOutcome {
98    /// Timestamp of the latest entry for each author in the set we received.
99    pub heads_received: AuthorHeads,
100    /// Number of entries we received.
101    pub num_recv: usize,
102    /// Number of entries we sent.
103    pub num_sent: usize,
104}
105
/// Reinterpret a pointer-sized, pointer-aligned value as a `usize`.
///
/// Used by `same_channel` to compare two senders by identity. Returns `None` if the size or
/// alignment of `T` does not match `usize`.
fn get_as_ptr<T>(value: &T) -> Option<usize> {
    use std::mem;
    if mem::size_of::<T>() == mem::size_of::<usize>()
        && mem::align_of::<T>() == mem::align_of::<usize>()
    {
        // SAFETY: `T` has the same size and alignment as `usize`, checked above.
        unsafe { Some(mem::transmute_copy(value)) }
    } else {
        None
    }
}
117
118fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
119    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
120}
121
122#[derive(Debug, Default)]
123struct Subscribers(Vec<async_channel::Sender<Event>>);
124impl Subscribers {
125    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
126        self.0.push(sender)
127    }
128    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
129        self.0.retain(|s| !same_channel(s, sender));
130    }
131    pub fn send(&mut self, event: Event) {
132        self.0
133            .retain(|sender| sender.send_blocking(event.clone()).is_ok())
134    }
135    pub fn len(&self) -> usize {
136        self.0.len()
137    }
138    pub fn send_with(&mut self, f: impl FnOnce() -> Event) {
139        if !self.0.is_empty() {
140            self.send(f())
141        }
142    }
143}
144
145/// Kind of capability of the namespace.
146#[derive(
147    Debug,
148    Clone,
149    Copy,
150    Serialize,
151    Deserialize,
152    num_enum::IntoPrimitive,
153    num_enum::TryFromPrimitive,
154    strum::Display,
155)]
156#[repr(u8)]
157#[strum(serialize_all = "snake_case")]
158pub enum CapabilityKind {
159    /// A writable replica.
160    Write = 1,
161    /// A readable replica.
162    Read = 2,
163}
164
165/// The capability of the namespace.
166#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
167pub enum Capability {
168    /// Write access to the namespace.
169    Write(NamespaceSecret),
170    /// Read only access to the namespace.
171    Read(NamespaceId),
172}
173
174impl Capability {
175    /// Get the [`NamespaceId`] for this [`Capability`].
176    pub fn id(&self) -> NamespaceId {
177        match self {
178            Capability::Write(secret) => secret.id(),
179            Capability::Read(id) => *id,
180        }
181    }
182
183    /// Get the [`NamespaceSecret`] of this [`Capability`].
184    /// Will fail if the [`Capability`] is read only.
185    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
186        match self {
187            Capability::Write(secret) => Ok(secret),
188            Capability::Read(_) => Err(ReadOnly),
189        }
190    }
191
192    /// Get the kind of capability.
193    pub fn kind(&self) -> CapabilityKind {
194        match self {
195            Capability::Write(_) => CapabilityKind::Write,
196            Capability::Read(_) => CapabilityKind::Read,
197        }
198    }
199
200    /// Get the raw representation of this namespace capability.
201    pub fn raw(&self) -> (u8, [u8; 32]) {
202        let capability_repr: u8 = self.kind().into();
203        let bytes = match self {
204            Capability::Write(secret) => secret.to_bytes(),
205            Capability::Read(id) => id.to_bytes(),
206        };
207        (capability_repr, bytes)
208    }
209
210    /// Create a [`Capability`] from its raw representation.
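    ///
    /// # Example
    ///
    /// A minimal sketch of persisting a capability and restoring it later, assuming a
    /// capability named `cap` (how the two raw values are stored is up to the caller):
    ///
    /// ```ignore
    /// let (kind, bytes) = cap.raw();
    /// let restored = Capability::from_raw(kind, &bytes)?;
    /// assert_eq!(restored.id(), cap.id());
    /// ```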
211    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
212        let kind: CapabilityKind = kind.try_into()?;
213        let capability = match kind {
214            CapabilityKind::Write => {
215                let secret = NamespaceSecret::from_bytes(bytes);
216                Capability::Write(secret)
217            }
218            CapabilityKind::Read => {
219                let id = NamespaceId::from(bytes);
220                Capability::Read(id)
221            }
222        };
223        Ok(capability)
224    }
225
226    /// Merge this capability with another capability.
227    ///
228    /// Will return an error if `other` is not a capability for the same namespace.
229    ///
230    /// Returns `true` if the capability was changed, `false` otherwise.
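    ///
    /// # Example
    ///
    /// A minimal sketch of upgrading read access to write access, assuming a
    /// [`NamespaceSecret`] named `secret`:
    ///
    /// ```ignore
    /// let mut cap = Capability::Read(secret.id());
    /// let changed = cap.merge(Capability::Write(secret.clone()))?;
    /// assert!(changed);
    /// assert!(matches!(cap, Capability::Write(_)));
    /// ```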
231    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
232        if other.id() != self.id() {
233            return Err(CapabilityError::NamespaceMismatch);
234        }
235
236        // the only capability upgrade is from read-only (self) to writable (other)
237        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
238            let _ = std::mem::replace(self, other);
239            Ok(true)
240        } else {
241            Ok(false)
242        }
243    }
244}
245
246/// Errors for capability operations
247#[derive(Debug, thiserror::Error)]
248pub enum CapabilityError {
249    /// Namespaces are not the same
250    #[error("Namespaces are not the same")]
251    NamespaceMismatch,
252}
253
254/// In memory information about an open replica.
255#[derive(derive_more::Debug)]
256pub struct ReplicaInfo {
257    pub(crate) capability: Capability,
258    subscribers: Subscribers,
259    #[debug("ContentStatusCallback")]
260    content_status_cb: Option<ContentStatusCallback>,
261    closed: bool,
262}
263
264impl ReplicaInfo {
265    /// Create a new replica.
266    pub fn new(capability: Capability) -> Self {
267        Self {
268            capability,
269            subscribers: Default::default(),
270            // on_insert_sender: RwLock::new(None),
271            content_status_cb: None,
272            closed: false,
273        }
274    }
275
276    /// Subscribe to insert events.
277    ///
    /// When subscribing to a replica, you must ensure that the corresponding
    /// [`async_channel::Receiver`] is drained in a loop. If events are not received, local and
    /// remote inserts will block as soon as the channel is full.
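    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a mutable [`ReplicaInfo`] named `info` and a tokio runtime
    /// for the draining task (channel capacity and runtime are illustrative only):
    ///
    /// ```ignore
    /// let (tx, rx) = async_channel::bounded(64);
    /// info.subscribe(tx);
    /// tokio::spawn(async move {
    ///     while let Ok(event) = rx.recv().await {
    ///         match event {
    ///             Event::LocalInsert { entry, .. } => println!("local insert: {:?}", entry.key()),
    ///             Event::RemoteInsert { entry, from, .. } => {
    ///                 println!("remote insert from {:?}: {:?}", from, entry.key());
    ///             }
    ///         }
    ///     }
    /// });
    /// ```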
281    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
282        self.subscribers.subscribe(sender)
283    }
284
285    /// Explicitly unsubscribe a sender.
286    ///
287    /// Simply dropping the receiver is fine too. If you cloned a single sender to subscribe to
288    /// multiple replicas, you can use this method to explicitly unsubscribe the sender from
289    /// this replica without having to drop the receiver.
290    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
291        self.subscribers.unsubscribe(sender)
292    }
293
294    /// Get the number of current event subscribers.
295    pub fn subscribers_count(&self) -> usize {
296        self.subscribers.len()
297    }
298
299    /// Set the content status callback.
300    ///
    /// Only one callback can be active at a time. If a callback was already registered, the new
    /// callback is not set and this will return `false`.
303    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
304        if self.content_status_cb.is_some() {
305            false
306        } else {
307            self.content_status_cb = Some(cb);
308            true
309        }
310    }
311
312    fn ensure_open(&self) -> Result<(), InsertError> {
313        if self.closed() {
314            Err(InsertError::Closed)
315        } else {
316            Ok(())
317        }
318    }
319
320    /// Returns true if the replica is closed.
321    ///
    /// If a replica is closed, no further operations can be performed. A replica cannot be closed
    /// manually; it must be closed via [`store::Store::close_replica`] or
    /// [`store::Store::remove_replica`].
325    pub fn closed(&self) -> bool {
326        self.closed
327    }
328
329    /// Merge a capability.
330    ///
    /// The capability must refer to the same namespace, otherwise an error will be returned.
332    ///
333    /// This will upgrade the replica's capability when passing a `Capability::Write`.
    /// It is a no-op if `capability` is a `Capability::Read`.
335    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
336        self.capability.merge(capability)
337    }
338}
339
340/// Local representation of a mutable, synchronizable key-value store.
341#[derive(derive_more::Debug)]
342pub struct Replica<'a, I = Box<ReplicaInfo>> {
343    pub(crate) store: StoreInstance<'a>,
344    pub(crate) info: I,
345}
346
347impl<'a, I> Replica<'a, I>
348where
349    I: Deref<Target = ReplicaInfo> + DerefMut,
350{
351    /// Create a new replica.
352    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
353        Replica { info, store }
354    }
355
356    /// Insert a new record at the given key.
357    ///
    /// The entry will be signed by the provided `author`.
359    /// The `len` must be the byte length of the data identified by `hash`.
360    ///
361    /// Returns the number of entries removed as a consequence of this insertion,
362    /// or an error either if the entry failed to validate or if a store operation failed.
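    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an open writable replica named `replica`, an [`Author`]
    /// named `author`, and content that is hashed and stored elsewhere:
    ///
    /// ```ignore
    /// let data = b"hello world";
    /// let hash = Hash::new(data);
    /// let removed = replica.insert("greeting", &author, hash, data.len() as u64)?;
    /// println!("insertion replaced {removed} older entries");
    /// ```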
363    pub fn insert(
364        &mut self,
365        key: impl AsRef<[u8]>,
366        author: &Author,
367        hash: Hash,
368        len: u64,
369    ) -> Result<usize, InsertError> {
370        if len == 0 || hash == Hash::EMPTY {
371            return Err(InsertError::EntryIsEmpty);
372        }
373        self.info.ensure_open()?;
374        let id = RecordIdentifier::new(self.id(), author.id(), key);
375        let record = Record::new_current(hash, len);
376        let entry = Entry::new(id, record);
377        let secret = self.secret_key()?;
378        let signed_entry = entry.sign(secret, author);
379        self.insert_entry(signed_entry, InsertOrigin::Local)
380    }
381
382    /// Delete entries that match the given `author` and key `prefix`.
383    ///
384    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
385    /// entries whose key starts with or is equal to the given `prefix`.
386    ///
387    /// Returns the number of entries deleted.
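    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an open writable replica named `replica` and an [`Author`]
    /// named `author`:
    ///
    /// ```ignore
    /// // Clears `/posts/1`, `/posts/2`, ... by inserting an empty entry at `/posts/`.
    /// let deleted = replica.delete_prefix("/posts/", &author)?;
    /// ```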
388    pub fn delete_prefix(
389        &mut self,
390        prefix: impl AsRef<[u8]>,
391        author: &Author,
392    ) -> Result<usize, InsertError> {
393        self.info.ensure_open()?;
394        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
395        let entry = Entry::new_empty(id);
396        let signed_entry = entry.sign(self.secret_key()?, author);
397        self.insert_entry(signed_entry, InsertOrigin::Local)
398    }
399
400    /// Insert an entry into this replica which was received from a remote peer.
401    ///
402    /// This will verify both the namespace and author signatures of the entry, emit an `on_insert`
403    /// event, and insert the entry into the replica store.
404    ///
405    /// Returns the number of entries removed as a consequence of this insertion,
406    /// or an error if the entry failed to validate or if a store operation failed.
407    pub fn insert_remote_entry(
408        &mut self,
409        entry: SignedEntry,
410        received_from: PeerIdBytes,
411        content_status: ContentStatus,
412    ) -> Result<usize, InsertError> {
413        self.info.ensure_open()?;
414        entry.validate_empty()?;
415        let origin = InsertOrigin::Sync {
416            from: received_from,
417            remote_content_status: content_status,
418        };
419        self.insert_entry(entry, origin)
420    }
421
422    /// Insert a signed entry into the database.
423    ///
424    /// Returns the number of entries removed as a consequence of this insertion.
425    fn insert_entry(
426        &mut self,
427        entry: SignedEntry,
428        origin: InsertOrigin,
429    ) -> Result<usize, InsertError> {
430        let namespace = self.id();
431
432        let store = &self.store;
433        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
434
435        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
436        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
437
438        let removed_count = match outcome {
439            InsertOutcome::Inserted { removed } => removed,
440            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
441        };
442
443        let insert_event = match origin {
444            InsertOrigin::Local => Event::LocalInsert { namespace, entry },
445            InsertOrigin::Sync {
446                from,
447                remote_content_status,
448            } => {
449                let download_policy = self
450                    .store
451                    .get_download_policy(&self.id())
452                    .unwrap_or_default();
453                let should_download = download_policy.matches(entry.entry());
454                Event::RemoteInsert {
455                    namespace,
456                    entry,
457                    from,
458                    should_download,
459                    remote_content_status,
460                }
461            }
462        };
463
464        self.info.subscribers.send(insert_event);
465
466        Ok(removed_count)
467    }
468
469    /// Hashes the given data and inserts it.
470    ///
471    /// This does not store the content, just the record of it.
472    /// Returns the calculated hash.
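    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an open writable replica named `replica` and an [`Author`]
    /// named `author` (the content bytes still have to be stored in a blob store separately):
    ///
    /// ```ignore
    /// let hash = replica.hash_and_insert("greeting", &author, b"hello world")?;
    /// ```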
473    pub fn hash_and_insert(
474        &mut self,
475        key: impl AsRef<[u8]>,
476        author: &Author,
477        data: impl AsRef<[u8]>,
478    ) -> Result<Hash, InsertError> {
479        self.info.ensure_open()?;
480        let len = data.as_ref().len() as u64;
481        let hash = Hash::new(data);
482        self.insert(key, author, hash, len)?;
483        Ok(hash)
484    }
485
486    /// Get the identifier for an entry in this replica.
487    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
488        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
489    }
490
491    /// Create the initial message for the set reconciliation flow with a remote peer.
492    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
493        self.info.ensure_open().map_err(anyhow::Error::from)?;
494        self.store.initial_message()
495    }
496
497    /// Process a set reconciliation message from a remote peer.
498    ///
499    /// Returns the next message to be sent to the peer, if any.
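    ///
    /// # Example
    ///
    /// A minimal sketch of running the reconciliation loop between two locally opened replicas
    /// `alice` and `bob` with placeholder peer ids (in a real deployment each message is sent
    /// over the network to the other peer instead):
    ///
    /// ```ignore
    /// let mut alice_state = SyncOutcome::default();
    /// let mut bob_state = SyncOutcome::default();
    /// let mut next = Some(alice.sync_initial_message()?);
    /// let mut bobs_turn = true;
    /// while let Some(msg) = next {
    ///     next = if bobs_turn {
    ///         bob.sync_process_message(msg, [1u8; 32], &mut bob_state)?
    ///     } else {
    ///         alice.sync_process_message(msg, [2u8; 32], &mut alice_state)?
    ///     };
    ///     bobs_turn = !bobs_turn;
    /// }
    /// ```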
500    pub fn sync_process_message(
501        &mut self,
502        message: crate::ranger::Message<SignedEntry>,
503        from_peer: PeerIdBytes,
504        state: &mut SyncOutcome,
505    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
506        self.info.ensure_open()?;
507        let my_namespace = self.id();
508        let now = system_time_now();
509
510        // update state with incoming data.
511        state.num_recv += message.value_count();
512        for (entry, _content_status) in message.values() {
513            state
514                .heads_received
515                .insert(entry.author(), entry.timestamp());
516        }
517
520        let cb = self.info.content_status_cb.clone();
521        let download_policy = self
522            .store
523            .get_download_policy(&my_namespace)
524            .unwrap_or_default();
525        let reply = self.store.process_message(
526            &Default::default(),
527            message,
            // validate callback: validate incoming entries before inserting them
529            |store, entry, content_status| {
530                let origin = InsertOrigin::Sync {
531                    from: from_peer,
532                    remote_content_status: content_status,
533                };
534                validate_entry(now, store, my_namespace, entry, &origin).is_ok()
535            },
536            // on_insert callback: is called when an entry was actually inserted in the store
537            |_store, entry, content_status| {
538                // We use `send_with` to only clone the entry if we have active subscriptions.
539                self.info.subscribers.send_with(|| {
540                    let should_download = download_policy.matches(entry.entry());
541                    Event::RemoteInsert {
542                        from: from_peer,
543                        namespace: my_namespace,
544                        entry: entry.clone(),
545                        should_download,
546                        remote_content_status: content_status,
547                    }
548                })
549            },
550            // content_status callback: get content status for outgoing entries
551            |_store, entry| {
552                if let Some(cb) = cb.as_ref() {
553                    cb(entry.content_hash())
554                } else {
555                    ContentStatus::Missing
556                }
557            },
558        )?;
559
560        // update state with outgoing data.
561        if let Some(ref reply) = reply {
562            state.num_sent += reply.value_count();
563        }
564
565        Ok(reply)
566    }
567
568    /// Get the namespace identifier for this [`Replica`].
569    pub fn id(&self) -> NamespaceId {
570        self.info.capability.id()
571    }
572
573    /// Get the [`Capability`] of this [`Replica`].
574    pub fn capability(&self) -> &Capability {
575        &self.info.capability
576    }
577
    /// Get the [`NamespaceSecret`] key for this replica. Will fail if
    /// the replica is read only.
580    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
581        self.info.capability.secret_key()
582    }
583}
584
585/// Error that occurs trying to access the [`NamespaceSecret`] of a read-only [`Capability`].
586#[derive(Debug, thiserror::Error)]
587#[error("Replica allows read access only.")]
588pub struct ReadOnly;
589
/// Validate whether a [`SignedEntry`] is fit to be inserted.
///
/// This validates that
/// * the entry's namespace matches the current replica
/// * the entry's author and namespace signatures are correct (only checked for remote entries)
/// * the entry's timestamp is not more than 10 minutes in the future of our system time.
///
/// Whether the entry is newer than an existing entry for the same key and author is checked
/// separately by the store when the entry is inserted.
597fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
598    now: u64,
599    store: &S,
600    expected_namespace: NamespaceId,
601    entry: &SignedEntry,
602    origin: &InsertOrigin,
603) -> Result<(), ValidationFailure> {
604    // Verify the namespace
605    if entry.namespace() != expected_namespace {
606        return Err(ValidationFailure::InvalidNamespace);
607    }
608
609    // Verify signature for non-local entries.
610    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
611        return Err(ValidationFailure::BadSignature);
612    }
613
614    // Verify that the timestamp of the entry is not too far in the future.
615    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
616        return Err(ValidationFailure::TooFarInTheFuture);
617    }
618    Ok(())
619}
620
621/// Error emitted when inserting entries into a [`Replica`] failed
622#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
623pub enum InsertError {
624    /// Storage error
625    #[error("storage error")]
626    Store(anyhow::Error),
627    /// Validation failure
628    #[error("validation failure")]
629    Validation(#[from] ValidationFailure),
630    /// A newer entry exists for either this entry's key or a prefix of the key.
631    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
632    NewerEntryExists,
633    /// Attempted to insert an empty entry.
634    #[error("Attempted to insert an empty entry")]
635    EntryIsEmpty,
636    /// Replica is read only.
637    #[error("Attempted to insert to read only replica")]
638    #[from(ReadOnly)]
639    ReadOnly,
640    /// The replica is closed, no operations may be performed.
641    #[error("replica is closed")]
642    Closed,
643}
644
645/// Reason why entry validation failed
646#[derive(thiserror::Error, Debug)]
647pub enum ValidationFailure {
648    /// Entry namespace does not match the current replica.
649    #[error("Entry namespace does not match the current replica")]
650    InvalidNamespace,
651    /// Entry signature is invalid.
652    #[error("Entry signature is invalid")]
653    BadSignature,
654    /// Entry timestamp is too far in the future.
655    #[error("Entry timestamp is too far in the future.")]
656    TooFarInTheFuture,
657    /// Entry has length 0 but not the empty hash, or the empty hash but not length 0.
658    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
659    InvalidEmptyEntry,
660}
661
662/// A signed entry.
663#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
664pub struct SignedEntry {
665    signature: EntrySignature,
666    entry: Entry,
667}
668
669impl From<SignedEntry> for Entry {
670    fn from(value: SignedEntry) -> Self {
671        value.entry
672    }
673}
674
675impl PartialOrd for SignedEntry {
676    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
677        Some(self.cmp(other))
678    }
679}
680
681impl Ord for SignedEntry {
682    fn cmp(&self, other: &Self) -> Ordering {
683        self.entry.cmp(&other.entry)
684    }
685}
686
687impl PartialOrd for Entry {
688    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
689        Some(self.cmp(other))
690    }
691}
692
693impl Ord for Entry {
694    fn cmp(&self, other: &Self) -> Ordering {
695        self.id
696            .cmp(&other.id)
697            .then_with(|| self.record.cmp(&other.record))
698    }
699}
700
701impl SignedEntry {
702    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
703        SignedEntry { signature, entry }
704    }
705
706    /// Create a new signed entry by signing an entry with the `namespace` and `author`.
707    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
708        let signature = EntrySignature::from_entry(&entry, namespace, author);
709        SignedEntry { signature, entry }
710    }
711
    /// Create a new signed entry from its parts.
713    pub fn from_parts(
714        namespace: &NamespaceSecret,
715        author: &Author,
716        key: impl AsRef<[u8]>,
717        record: Record,
718    ) -> Self {
719        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
720        let entry = Entry::new(id, record);
721        Self::from_entry(entry, namespace, author)
722    }
723
724    /// Verify the signatures on this entry.
725    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
726        self.signature.verify(
727            &self.entry,
728            &self.entry.namespace().public_key(store)?,
729            &self.entry.author().public_key(store)?,
730        )
731    }
732
733    /// Get the signature.
734    pub fn signature(&self) -> &EntrySignature {
735        &self.signature
736    }
737
738    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
739    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
740        self.entry().validate_empty()
741    }
742
743    /// Get the [`Entry`].
744    pub fn entry(&self) -> &Entry {
745        &self.entry
746    }
747
748    /// Get the content [`struct@Hash`] of the entry.
749    pub fn content_hash(&self) -> Hash {
750        self.entry().content_hash()
751    }
752
753    /// Get the content length of the entry.
754    pub fn content_len(&self) -> u64 {
755        self.entry().content_len()
756    }
757
758    /// Get the author bytes of this entry.
759    pub fn author_bytes(&self) -> AuthorId {
760        self.entry().id().author()
761    }
762
763    /// Get the key of the entry.
764    pub fn key(&self) -> &[u8] {
765        self.entry().id().key()
766    }
767
768    /// Get the timestamp of the entry.
769    pub fn timestamp(&self) -> u64 {
770        self.entry().timestamp()
771    }
772}
773
774impl RangeEntry for SignedEntry {
775    type Key = RecordIdentifier;
776    type Value = Record;
777
778    fn key(&self) -> &Self::Key {
779        &self.entry.id
780    }
781
782    fn value(&self) -> &Self::Value {
783        &self.entry.record
784    }
785
786    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
787        let mut hasher = blake3::Hasher::new();
788        hasher.update(self.namespace().as_ref());
789        hasher.update(self.author_bytes().as_ref());
790        hasher.update(self.key());
791        hasher.update(&self.timestamp().to_be_bytes());
792        hasher.update(self.content_hash().as_bytes());
793        Fingerprint(hasher.finalize().into())
794    }
795}
796
797/// Signature over an entry.
798#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
799pub struct EntrySignature {
800    author_signature: Signature,
801    namespace_signature: Signature,
802}
803
804impl Debug for EntrySignature {
805    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
806        f.debug_struct("EntrySignature")
807            .field(
808                "namespace_signature",
809                &hex::encode(self.namespace_signature.to_bytes()),
810            )
811            .field(
812                "author_signature",
813                &hex::encode(self.author_signature.to_bytes()),
814            )
815            .finish()
816    }
817}
818
819impl EntrySignature {
820    /// Create a new signature by signing an entry with the `namespace` and `author`.
821    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
822        // TODO: this should probably include a namespace prefix
823        // namespace in the cryptographic sense.
824        let bytes = entry.to_vec();
825        let namespace_signature = namespace.sign(&bytes);
826        let author_signature = author.sign(&bytes);
827
828        EntrySignature {
829            author_signature,
830            namespace_signature,
831        }
832    }
833
834    /// Verify that this signature was created by signing the `entry` with the
835    /// secret keys of the specified `author` and `namespace`.
836    pub fn verify(
837        &self,
838        entry: &Entry,
839        namespace: &NamespacePublicKey,
840        author: &AuthorPublicKey,
841    ) -> Result<(), SignatureError> {
842        let bytes = entry.to_vec();
843        namespace.verify(&bytes, &self.namespace_signature)?;
844        author.verify(&bytes, &self.author_signature)?;
845
846        Ok(())
847    }
848
849    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
850        let namespace_signature = Signature::from_bytes(namespace_sig);
851        let author_signature = Signature::from_bytes(author_sig);
852
853        EntrySignature {
854            author_signature,
855            namespace_signature,
856        }
857    }
858
859    pub(crate) fn author(&self) -> &Signature {
860        &self.author_signature
861    }
862
863    pub(crate) fn namespace(&self) -> &Signature {
864        &self.namespace_signature
865    }
866}
867
868/// A single entry in a [`Replica`]
869///
870/// An entry is identified by a key, its [`Author`], and the [`Replica`]'s
871/// [`NamespaceSecret`]. Its value is the [32-byte BLAKE3 hash](iroh_blobs::Hash)
872/// of the entry's content data, the size of this content data, and a timestamp.
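///
/// # Example
///
/// A minimal sketch of building and signing an entry by hand, assuming a [`NamespaceSecret`]
/// named `namespace` and an [`Author`] named `author` (normally [`Replica::insert`] does this
/// for you):
///
/// ```ignore
/// let id = RecordIdentifier::new(namespace.id(), author.id(), "/my/key");
/// let record = Record::new_current(Hash::new(b"hello"), 5);
/// let signed = Entry::new(id, record).sign(&namespace, &author);
/// signed.verify(&())?;
/// ```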
873#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
874pub struct Entry {
875    id: RecordIdentifier,
876    record: Record,
877}
878
879impl Entry {
880    /// Create a new entry
881    pub fn new(id: RecordIdentifier, record: Record) -> Self {
882        Entry { id, record }
883    }
884
885    /// Create a new empty entry with the current timestamp.
886    pub fn new_empty(id: RecordIdentifier) -> Self {
887        Entry {
888            id,
889            record: Record::empty_current(),
890        }
891    }
892
893    /// Validate that the entry has the empty hash if the length is 0, or a non-zero length.
894    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
895        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
896            (true, true) => Ok(()),
897            (false, false) => Ok(()),
898            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
899            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
900        }
901    }
902
903    /// Get the [`RecordIdentifier`] for this entry.
904    pub fn id(&self) -> &RecordIdentifier {
905        &self.id
906    }
907
908    /// Get the [`NamespaceId`] of this entry.
909    pub fn namespace(&self) -> NamespaceId {
910        self.id.namespace()
911    }
912
913    /// Get the [`AuthorId`] of this entry.
914    pub fn author(&self) -> AuthorId {
915        self.id.author()
916    }
917
918    /// Get the key of this entry.
919    pub fn key(&self) -> &[u8] {
920        self.id.key()
921    }
922
923    /// Get the [`Record`] contained in this entry.
924    pub fn record(&self) -> &Record {
925        &self.record
926    }
927
928    /// Get the content hash of the record.
929    pub fn content_hash(&self) -> Hash {
930        self.record.hash
931    }
932
933    /// Get the content length of the record.
934    pub fn content_len(&self) -> u64 {
935        self.record.len
936    }
937
938    /// Get the timestamp of the record.
939    pub fn timestamp(&self) -> u64 {
940        self.record.timestamp
941    }
942
943    /// Serialize this entry into its canonical byte representation used for signing.
944    pub fn encode(&self, out: &mut Vec<u8>) {
945        self.id.encode(out);
946        self.record.encode(out);
947    }
948
949    /// Serialize this entry into a new vector with its canonical byte representation.
950    pub fn to_vec(&self) -> Vec<u8> {
951        let mut out = Vec::new();
952        self.encode(&mut out);
953        out
954    }
955
956    /// Sign this entry with a [`NamespaceSecret`] and [`Author`].
957    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
958        SignedEntry::from_entry(self, namespace, author)
959    }
960}
961
962const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
963const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
964const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
965
966/// The identifier of a record.
967#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
968pub struct RecordIdentifier(Bytes);
969
970impl Default for RecordIdentifier {
971    fn default() -> Self {
972        Self::new(NamespaceId::default(), AuthorId::default(), b"")
973    }
974}
975
976impl Debug for RecordIdentifier {
977    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
978        f.debug_struct("RecordIdentifier")
979            .field("namespace", &self.namespace())
980            .field("author", &self.author())
981            .field("key", &std::string::String::from_utf8_lossy(self.key()))
982            .finish()
983    }
984}
985
986impl RangeKey for RecordIdentifier {
987    #[cfg(test)]
988    fn is_prefix_of(&self, other: &Self) -> bool {
989        other.as_ref().starts_with(self.as_ref())
990    }
991}
992
993fn system_time_now() -> u64 {
994    SystemTime::now()
995        .duration_since(SystemTime::UNIX_EPOCH)
996        .expect("time drift")
997        .as_micros() as u64
998}
999
1000impl RecordIdentifier {
1001    /// Create a new [`RecordIdentifier`].
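    ///
    /// # Example
    ///
    /// A minimal sketch showing the byte layout (32 namespace bytes, then 32 author bytes, then
    /// the key), assuming a [`NamespaceSecret`] named `namespace` and an [`Author`] named
    /// `author`:
    ///
    /// ```ignore
    /// let id = RecordIdentifier::new(namespace.id(), author.id(), "/my/key");
    /// assert_eq!(id.key(), b"/my/key");
    /// assert_eq!(id.as_ref().len(), 32 + 32 + "/my/key".len());
    /// ```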
1002    pub fn new(
1003        namespace: impl Into<NamespaceId>,
1004        author: impl Into<AuthorId>,
1005        key: impl AsRef<[u8]>,
1006    ) -> Self {
1007        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1008        bytes.extend_from_slice(namespace.into().as_bytes());
1009        bytes.extend_from_slice(author.into().as_bytes());
1010        bytes.extend_from_slice(key.as_ref());
1011        Self(bytes.freeze())
1012    }
1013
1014    /// Serialize this [`RecordIdentifier`] into a mutable byte array.
1015    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1016        out.extend_from_slice(&self.0);
1017    }
1018
1019    /// Get this [`RecordIdentifier`] as [Bytes].
1020    pub fn as_bytes(&self) -> Bytes {
1021        self.0.clone()
1022    }
1023
1024    /// Get this [`RecordIdentifier`] as a tuple of byte slices.
1025    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1026        (
1027            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1028            self.0[AUTHOR_BYTES].try_into().unwrap(),
1029            &self.0[KEY_BYTES],
1030        )
1031    }
1032
1033    /// Get this [`RecordIdentifier`] as a tuple of bytes.
1034    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1035        (
1036            self.0[NAMESPACE_BYTES].try_into().unwrap(),
1037            self.0[AUTHOR_BYTES].try_into().unwrap(),
1038            self.0.slice(KEY_BYTES),
1039        )
1040    }
1041
1042    /// Get the key of this record.
1043    pub fn key(&self) -> &[u8] {
1044        &self.0[KEY_BYTES]
1045    }
1046
1047    /// Get the key of this record as [`Bytes`].
1048    pub fn key_bytes(&self) -> Bytes {
1049        self.0.slice(KEY_BYTES)
1050    }
1051
1052    /// Get the [`NamespaceId`] of this record as byte array.
1053    pub fn namespace(&self) -> NamespaceId {
1054        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1055        value.into()
1056    }
1057
1058    /// Get the [`AuthorId`] of this record as byte array.
1059    pub fn author(&self) -> AuthorId {
1060        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1061        value.into()
1062    }
1063}
1064
1065impl AsRef<[u8]> for RecordIdentifier {
1066    fn as_ref(&self) -> &[u8] {
1067        &self.0
1068    }
1069}
1070
1071impl Deref for SignedEntry {
1072    type Target = Entry;
1073    fn deref(&self) -> &Self::Target {
1074        &self.entry
1075    }
1076}
1077
1078impl Deref for Entry {
1079    type Target = Record;
1080    fn deref(&self) -> &Self::Target {
1081        &self.record
1082    }
1083}
1084
1085/// The data part of an entry in a [`Replica`].
1086#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1087pub struct Record {
1088    /// Length of the data referenced by `hash`.
1089    len: u64,
1090    /// Hash of the content data.
1091    hash: Hash,
1092    /// Record creation timestamp. Counted as micros since the Unix epoch.
1093    timestamp: u64,
1094}
1095
1096impl RangeValue for Record {}
1097
1098/// Ordering for entry values.
1099///
1100/// Compares first the timestamp, then the content hash.
1101impl Ord for Record {
1102    fn cmp(&self, other: &Self) -> Ordering {
1103        self.timestamp
1104            .cmp(&other.timestamp)
1105            .then_with(|| self.hash.cmp(&other.hash))
1106    }
1107}
1108
1109impl PartialOrd for Record {
1110    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1111        Some(self.cmp(other))
1112    }
1113}
1114
1115impl Record {
1116    /// Create a new record.
1117    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1118        debug_assert!(
1119            len != 0 || hash == Hash::EMPTY,
1120            "if `len` is 0 then `hash` must be the hash of the empty byte range"
1121        );
1122        Record {
1123            hash,
1124            len,
1125            timestamp,
1126        }
1127    }
1128
1129    /// Create a tombstone record (empty content)
1130    pub fn empty(timestamp: u64) -> Self {
1131        Self::new(Hash::EMPTY, 0, timestamp)
1132    }
1133
1134    /// Create a tombstone record with the timestamp set to now.
1135    pub fn empty_current() -> Self {
1136        Self::new_current(Hash::EMPTY, 0)
1137    }
1138
1139    /// Return `true` if the entry is empty.
1140    pub fn is_empty(&self) -> bool {
1141        self.hash == Hash::EMPTY
1142    }
1143
1144    /// Create a new [`Record`] with the timestamp set to now.
1145    pub fn new_current(hash: Hash, len: u64) -> Self {
1146        let timestamp = system_time_now();
1147        Self::new(hash, len, timestamp)
1148    }
1149
1150    /// Get the length of the data addressed by this record's content hash.
1151    pub fn content_len(&self) -> u64 {
1152        self.len
1153    }
1154
1155    /// Get the [`struct@Hash`] of the content data of this record.
1156    pub fn content_hash(&self) -> Hash {
1157        self.hash
1158    }
1159
1160    /// Get the timestamp of this record.
1161    pub fn timestamp(&self) -> u64 {
1162        self.timestamp
1163    }
1164
1165    #[cfg(test)]
1166    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1167        let len = data.as_ref().len() as u64;
1168        let hash = Hash::new(data);
1169        Self::new_current(hash, len)
1170    }
1171
1172    #[cfg(test)]
1173    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1174        let len = data.as_ref().len() as u64;
1175        let hash = Hash::new(data);
1176        Self::new(hash, len, timestamp)
1177    }
1178
1179    /// Serialize this record into a mutable byte array.
1180    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1181        out.extend_from_slice(&self.len.to_be_bytes());
1182        out.extend_from_slice(self.hash.as_ref());
1183        out.extend_from_slice(&self.timestamp.to_be_bytes())
1184    }
1185}
1186
1187#[cfg(test)]
1188mod tests {
1189    use std::collections::HashSet;
1190
1191    use anyhow::Result;
1192    use rand_core::SeedableRng;
1193
1194    use super::*;
1195    use crate::{
1196        actor::SyncHandle,
1197        ranger::{Range, Store as _},
1198        store::{OpenError, Query, SortBy, SortDirection, Store},
1199    };
1200
1201    #[test]
1202    fn test_basics_memory() -> Result<()> {
1203        let store = store::Store::memory();
1204        test_basics(store)?;
1205
1206        Ok(())
1207    }
1208
1209    #[test]
1210    fn test_basics_fs() -> Result<()> {
1211        let dbfile = tempfile::NamedTempFile::new()?;
1212        let store = store::fs::Store::persistent(dbfile.path())?;
1213        test_basics(store)?;
1214        Ok(())
1215    }
1216
1217    fn test_basics(mut store: Store) -> Result<()> {
1218        let mut rng = rand::thread_rng();
1219        let alice = Author::new(&mut rng);
1220        let bob = Author::new(&mut rng);
1221        let myspace = NamespaceSecret::new(&mut rng);
1222
1223        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1224        let record = Record::current_from_data(b"this is my cool data");
1225        let entry = Entry::new(record_id, record);
1226        let signed_entry = entry.sign(&myspace, &alice);
1227        signed_entry.verify(&()).expect("failed to verify");
1228
1229        let mut my_replica = store.new_replica(myspace.clone())?;
1230        for i in 0..10 {
1231            my_replica.hash_and_insert(
1232                format!("/{i}"),
1233                &alice,
1234                format!("{i}: hello from alice"),
1235            )?;
1236        }
1237
1238        for i in 0..10 {
1239            let res = store
1240                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1241                .unwrap();
1242            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1243            assert_eq!(res.entry().record().content_len(), len);
1244            res.verify(&())?;
1245        }
1246
1247        // Test multiple records for the same key
1248        let mut my_replica = store.new_replica(myspace.clone())?;
1249        my_replica.hash_and_insert("/cool/path", &alice, "round 1")?;
1250        let _entry = store
1251            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1252            .unwrap();
1253        // Second
1254        let mut my_replica = store.new_replica(myspace.clone())?;
1255        my_replica.hash_and_insert("/cool/path", &alice, "round 2")?;
1256        let _entry = store
1257            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1258            .unwrap();
1259
1260        // Get All by author
1261        let entries: Vec<_> = store
1262            .get_many(myspace.id(), Query::author(alice.id()))?
1263            .collect::<Result<_>>()?;
1264        assert_eq!(entries.len(), 11);
1265
1266        // Get All by author
1267        let entries: Vec<_> = store
1268            .get_many(myspace.id(), Query::author(bob.id()))?
1269            .collect::<Result<_>>()?;
1270        assert_eq!(entries.len(), 0);
1271
1272        // Get All by key
1273        let entries: Vec<_> = store
1274            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1275            .collect::<Result<_>>()?;
1276        assert_eq!(entries.len(), 1);
1277
1278        // Get All
1279        let entries: Vec<_> = store
1280            .get_many(myspace.id(), Query::all())?
1281            .collect::<Result<_>>()?;
1282        assert_eq!(entries.len(), 11);
1283
1284        // insert record from different author
1285        let mut my_replica = store.new_replica(myspace.clone())?;
1286        let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;
1287
1288        // Get All by author
1289        let entries: Vec<_> = store
1290            .get_many(myspace.id(), Query::author(alice.id()))?
1291            .collect::<Result<_>>()?;
1292        assert_eq!(entries.len(), 11);
1293
1294        let entries: Vec<_> = store
1295            .get_many(myspace.id(), Query::author(bob.id()))?
1296            .collect::<Result<_>>()?;
1297        assert_eq!(entries.len(), 1);
1298
1299        // Get All by key
1300        let entries: Vec<_> = store
1301            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1302            .collect::<Result<_>>()?;
1303        assert_eq!(entries.len(), 2);
1304
1305        // Get all by prefix
1306        let entries: Vec<_> = store
1307            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1308            .collect::<Result<_>>()?;
1309        assert_eq!(entries.len(), 2);
1310
1311        // Get All by author and prefix
1312        let entries: Vec<_> = store
1313            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1314            .collect::<Result<_>>()?;
1315        assert_eq!(entries.len(), 1);
1316
1317        let entries: Vec<_> = store
1318            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1319            .collect::<Result<_>>()?;
1320        assert_eq!(entries.len(), 1);
1321
1322        // Get All
1323        let entries: Vec<_> = store
1324            .get_many(myspace.id(), Query::all())?
1325            .collect::<Result<_>>()?;
1326        assert_eq!(entries.len(), 12);
1327
1328        // Get Range of all should return all latest
1329        let mut my_replica = store.new_replica(myspace.clone())?;
1330        let entries_second: Vec<_> = my_replica
1331            .store
1332            .get_range(Range::new(
1333                RecordIdentifier::default(),
1334                RecordIdentifier::default(),
1335            ))?
1336            .collect::<Result<_, _>>()?;
1337
1338        assert_eq!(entries_second.len(), 12);
1339        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1340
1341        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1342        store.flush()?;
1343        Ok(())
1344    }
1345
    /// Test that [`Store::register_useful_peer`] behaves like an LRU cache of size
1347    /// [`super::store::PEERS_PER_DOC_CACHE_SIZE`].
1348    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1349        /// Helper to verify the store returns the expected peers for the namespace.
1350        #[track_caller]
1351        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1352            assert_eq!(
1353                expected_peers,
1354                &store
1355                    .get_sync_peers(&namespace)
1356                    .unwrap()
1357                    .unwrap()
1358                    .collect::<Vec<_>>(),
1359                "sync peers differ"
1360            );
1361        }
1362
1363        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1364        // expected peers: newest peers are to the front, oldest to the back
1365        let mut expected_peers = Vec::with_capacity(count);
1366        for i in 0..count as u8 {
1367            let peer = [i; 32];
1368            expected_peers.insert(0, peer);
1369            store.register_useful_peer(namespace, peer)?;
1370        }
1371        verify_peers(store, namespace, &expected_peers);
1372
1373        // one more peer should evict the last peer
1374        expected_peers.pop();
1375        let newer_peer = [count as u8; 32];
1376        expected_peers.insert(0, newer_peer);
1377        store.register_useful_peer(namespace, newer_peer)?;
1378        verify_peers(store, namespace, &expected_peers);
1379
1380        // move one existing peer up
1381        let refreshed_peer = expected_peers.remove(2);
1382        expected_peers.insert(0, refreshed_peer);
1383        store.register_useful_peer(namespace, refreshed_peer)?;
1384        verify_peers(store, namespace, &expected_peers);
1385        Ok(())
1386    }
1387
1388    #[test]
1389    fn test_content_hashes_iterator_memory() -> Result<()> {
1390        let store = store::Store::memory();
1391        test_content_hashes_iterator(store)
1392    }
1393
1394    #[test]
1395    fn test_content_hashes_iterator_fs() -> Result<()> {
1396        let dbfile = tempfile::NamedTempFile::new()?;
1397        let store = store::fs::Store::persistent(dbfile.path())?;
1398        test_content_hashes_iterator(store)
1399    }
1400
1401    fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1402        let mut rng = rand::thread_rng();
1403        let mut expected = HashSet::new();
1404        let n_replicas = 3;
1405        let n_entries = 4;
1406        for i in 0..n_replicas {
1407            let namespace = NamespaceSecret::new(&mut rng);
1408            let author = store.new_author(&mut rng)?;
1409            let mut replica = store.new_replica(namespace)?;
1410            for j in 0..n_entries {
1411                let key = format!("{j}");
1412                let data = format!("{i}:{j}");
1413                let hash = replica.hash_and_insert(key, &author, data)?;
1414                expected.insert(hash);
1415            }
1416        }
1417        assert_eq!(expected.len(), n_replicas * n_entries);
1418        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1419        assert_eq!(actual, expected);
1420        Ok(())
1421    }
1422
1423    #[test]
1424    fn test_multikey() {
1425        let mut rng = rand::thread_rng();
1426
1427        let k = ["a", "c", "z"];
1428
1429        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1430        n.sort_by_key(|n| n.id());
1431
1432        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1433        a.sort_by_key(|a| a.id());
1434
1435        // Just key
1436        {
1437            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1438            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1439            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1440
1441            let range = Range::new(ri0.clone(), ri2.clone());
1442            assert!(range.contains(&ri0), "start");
1443            assert!(range.contains(&ri1), "inside");
1444            assert!(!range.contains(&ri2), "end");
1445
1446            assert!(ri0 < ri1);
1447            assert!(ri1 < ri2);
1448        }
1449
1450        // Just namespace
1451        {
1452            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1453            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1454            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1455
1456            let range = Range::new(ri0.clone(), ri2.clone());
1457            assert!(range.contains(&ri0), "start");
1458            assert!(range.contains(&ri1), "inside");
1459            assert!(!range.contains(&ri2), "end");
1460
1461            assert!(ri0 < ri1);
1462            assert!(ri1 < ri2);
1463        }
1464
1465        // Just author
1466        {
1467            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1468            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1469            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1470
1471            let range = Range::new(ri0.clone(), ri2.clone());
1472            assert!(range.contains(&ri0), "start");
1473            assert!(range.contains(&ri1), "inside");
1474            assert!(!range.contains(&ri2), "end");
1475
1476            assert!(ri0 < ri1);
1477            assert!(ri1 < ri2);
1478        }
1479
1480        // Just key and namespace
1481        {
1482            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1483            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1484            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1485
1486            let range = Range::new(ri0.clone(), ri2.clone());
1487            assert!(range.contains(&ri0), "start");
1488            assert!(range.contains(&ri1), "inside");
1489            assert!(!range.contains(&ri2), "end");
1490
1491            assert!(ri0 < ri1);
1492            assert!(ri1 < ri2);
1493        }
1494
1495        // Mixed
1496        {
1497            // Ord should prioritize namespace - author - key
1498
1499            let a0 = a[0].id();
1500            let a1 = a[1].id();
1501            let n0 = n[0].id();
1502            let n1 = n[1].id();
1503            let k0 = k[0];
1504            let k1 = k[1];
1505
1506            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1507            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1508            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1509            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1510        }
1511    }
1512
1513    #[test]
1514    fn test_timestamps_memory() -> Result<()> {
1515        let store = store::Store::memory();
1516        test_timestamps(store)?;
1517
1518        Ok(())
1519    }
1520
1521    #[test]
1522    fn test_timestamps_fs() -> Result<()> {
1523        let dbfile = tempfile::NamedTempFile::new()?;
1524        let store = store::fs::Store::persistent(dbfile.path())?;
1525        test_timestamps(store)?;
1526        Ok(())
1527    }
1528
1529    fn test_timestamps(mut store: Store) -> Result<()> {
1530        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1531        let namespace = NamespaceSecret::new(&mut rng);
1532        let _replica = store.new_replica(namespace.clone())?;
1533        let author = store.new_author(&mut rng)?;
1534        store.close_replica(namespace.id());
1535        let mut replica = store.open_replica(&namespace.id())?;
1536
1537        let key = b"hello";
1538        let value = b"world";
1539        let entry = {
1540            let timestamp = 2;
1541            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1542            let record = Record::from_data(value, timestamp);
1543            Entry::new(id, record).sign(&namespace, &author)
1544        };
1545
1546        replica
1547            .insert_entry(entry.clone(), InsertOrigin::Local)
1548            .unwrap();
1549        store.close_replica(namespace.id());
1550        let res = store
1551            .get_exact(namespace.id(), author.id(), key, false)?
1552            .unwrap();
1553        assert_eq!(res, entry);
1554
1555        let entry2 = {
1556            let timestamp = 1;
1557            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1558            let record = Record::from_data(value, timestamp);
1559            Entry::new(id, record).sign(&namespace, &author)
1560        };
1561
1562        let mut replica = store.open_replica(&namespace.id())?;
1563        let res = replica.insert_entry(entry2, InsertOrigin::Local);
1564        store.close_replica(namespace.id());
1565        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1566        let res = store
1567            .get_exact(namespace.id(), author.id(), key, false)?
1568            .unwrap();
1569        assert_eq!(res, entry);
1570        store.flush()?;
1571        Ok(())
1572    }
1573
1574    #[test]
1575    fn test_replica_sync_memory() -> Result<()> {
1576        let alice_store = store::Store::memory();
1577        let bob_store = store::Store::memory();
1578
1579        test_replica_sync(alice_store, bob_store)?;
1580        Ok(())
1581    }
1582
1583    #[test]
1584    fn test_replica_sync_fs() -> Result<()> {
1585        let alice_dbfile = tempfile::NamedTempFile::new()?;
1586        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1587        let bob_dbfile = tempfile::NamedTempFile::new()?;
1588        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1589        test_replica_sync(alice_store, bob_store)?;
1590
1591        Ok(())
1592    }
1593
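    /// Syncs two replicas with partially overlapping key sets and checks that afterwards
    /// both stores contain the union of both sets.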
1594    fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1595        let alice_set = ["ape", "eel", "fox", "gnu"];
1596        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1597
1598        let mut rng = rand::thread_rng();
1599        let author = Author::new(&mut rng);
1600        let myspace = NamespaceSecret::new(&mut rng);
1601        let mut alice = alice_store.new_replica(myspace.clone())?;
1602        for el in &alice_set {
1603            alice.hash_and_insert(el, &author, el.as_bytes())?;
1604        }
1605
1606        let mut bob = bob_store.new_replica(myspace.clone())?;
1607        for el in &bob_set {
1608            bob.hash_and_insert(el, &author, el.as_bytes())?;
1609        }
1610
1611        let (alice_out, bob_out) = sync(&mut alice, &mut bob)?;
1612
1613        assert_eq!(alice_out.num_sent, 2);
1614        assert_eq!(bob_out.num_recv, 2);
1615        assert_eq!(alice_out.num_recv, 6);
1616        assert_eq!(bob_out.num_sent, 6);
1617
1618        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1619        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1620        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1621        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1622        alice_store.flush()?;
1623        bob_store.flush()?;
1624        Ok(())
1625    }
1626
1627    #[test]
1628    fn test_replica_timestamp_sync_memory() -> Result<()> {
1629        let alice_store = store::Store::memory();
1630        let bob_store = store::Store::memory();
1631
1632        test_replica_timestamp_sync(alice_store, bob_store)?;
1633        Ok(())
1634    }
1635
1636    #[test]
1637    fn test_replica_timestamp_sync_fs() -> Result<()> {
1638        let alice_dbfile = tempfile::NamedTempFile::new()?;
1639        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1640        let bob_dbfile = tempfile::NamedTempFile::new()?;
1641        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1642        test_replica_timestamp_sync(alice_store, bob_store)?;
1643
1644        Ok(())
1645    }
1646
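    /// Writes to the same key on both replicas and checks that sync converges both sides
    /// on the entry that was inserted later.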
1647    fn test_replica_timestamp_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1648        let mut rng = rand::thread_rng();
1649        let author = Author::new(&mut rng);
1650        let namespace = NamespaceSecret::new(&mut rng);
1651        let mut alice = alice_store.new_replica(namespace.clone())?;
1652        let mut bob = bob_store.new_replica(namespace.clone())?;
1653
1654        let key = b"key";
1655        let alice_value = b"alice";
1656        let bob_value = b"bob";
1657        let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
1658        // system time increased - sync should overwrite
1659        let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
1660        sync(&mut alice, &mut bob)?;
1661        assert_eq!(
1662            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1663            Some(bob_hash)
1664        );
1665        assert_eq!(
1666            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1667            Some(bob_hash)
1668        );
1669
1670        let mut alice = alice_store.new_replica(namespace.clone())?;
1671        let mut bob = bob_store.new_replica(namespace.clone())?;
1672
1673        let alice_value_2 = b"alice2";
1674        // system time increased - sync should overwrite
1675        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
1676        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
1677        sync(&mut alice, &mut bob)?;
1678        assert_eq!(
1679            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1680            Some(alice_hash_2)
1681        );
1682        assert_eq!(
1683            get_content_hash(&mut bob_store, namespace.id(), author.id(), key)?,
1684            Some(alice_hash_2)
1685        );
1686        alice_store.flush()?;
1687        bob_store.flush()?;
1688        Ok(())
1689    }
1690
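    /// Checks that entries timestamped up to `MAX_TIMESTAMP_FUTURE_SHIFT` into the future
    /// are accepted, while anything beyond that fails with `ValidationFailure::TooFarInTheFuture`.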
1691    #[test]
1692    fn test_future_timestamp() -> Result<()> {
1693        let mut rng = rand::thread_rng();
1694        let mut store = store::Store::memory();
1695        let author = Author::new(&mut rng);
1696        let namespace = NamespaceSecret::new(&mut rng);
1697
1698        let mut replica = store.new_replica(namespace.clone())?;
1699        let key = b"hi";
1700        let t = system_time_now();
1701        let record = Record::from_data(b"1", t);
1702        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1703        replica.insert_entry(entry0.clone(), InsertOrigin::Local)?;
1704
1705        assert_eq!(
1706            get_entry(&mut store, namespace.id(), author.id(), key)?,
1707            entry0
1708        );
1709
1710        let mut replica = store.new_replica(namespace.clone())?;
1711        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1712        let record = Record::from_data(b"2", t);
1713        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1714        replica.insert_entry(entry1.clone(), InsertOrigin::Local)?;
1715        assert_eq!(
1716            get_entry(&mut store, namespace.id(), author.id(), key)?,
1717            entry1
1718        );
1719
1720        let mut replica = store.new_replica(namespace.clone())?;
1721        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1722        let record = Record::from_data(b"2", t);
1723        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1724        replica.insert_entry(entry2.clone(), InsertOrigin::Local)?;
1725        assert_eq!(
1726            get_entry(&mut store, namespace.id(), author.id(), key)?,
1727            entry2
1728        );
1729
1730        let mut replica = store.new_replica(namespace.clone())?;
1731        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1732        let record = Record::from_data(b"2", t);
1733        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1734        let res = replica.insert_entry(entry3, InsertOrigin::Local);
1735        assert!(matches!(
1736            res,
1737            Err(InsertError::Validation(
1738                ValidationFailure::TooFarInTheFuture
1739            ))
1740        ));
1741        assert_eq!(
1742            get_entry(&mut store, namespace.id(), author.id(), key)?,
1743            entry2
1744        );
1745        store.flush()?;
1746        Ok(())
1747    }
1748
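    /// Checks that inserting an entry with zero-length content fails with `InsertError::EntryIsEmpty`.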
1749    #[test]
1750    fn test_insert_empty() -> Result<()> {
1751        let mut store = store::Store::memory();
1752        let mut rng = rand::thread_rng();
1753        let alice = Author::new(&mut rng);
1754        let myspace = NamespaceSecret::new(&mut rng);
1755        let mut replica = store.new_replica(myspace.clone())?;
1756        let hash = Hash::new(b"");
1757        let res = replica.insert(b"foo", &alice, hash, 0);
1758        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1759        store.flush()?;
1760        Ok(())
1761    }
1762
1763    #[test]
1764    fn test_prefix_delete_memory() -> Result<()> {
1765        let store = store::Store::memory();
1766        test_prefix_delete(store)?;
1767        Ok(())
1768    }
1769
1770    #[test]
1771    fn test_prefix_delete_fs() -> Result<()> {
1772        let dbfile = tempfile::NamedTempFile::new()?;
1773        let store = store::fs::Store::persistent(dbfile.path())?;
1774        test_prefix_delete(store)?;
1775        Ok(())
1776    }
1777
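    /// Inserts two entries sharing a prefix and checks that `delete_prefix` removes both.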
1778    fn test_prefix_delete(mut store: Store) -> Result<()> {
1779        let mut rng = rand::thread_rng();
1780        let alice = Author::new(&mut rng);
1781        let myspace = NamespaceSecret::new(&mut rng);
1782        let mut replica = store.new_replica(myspace.clone())?;
1783        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
1784        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
1785
1786        // sanity checks
1787        assert_eq!(
1788            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1789            Some(hash1)
1790        );
1791        assert_eq!(
1792            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1793            Some(hash2)
1794        );
1795
1796        // delete
1797        let mut replica = store.new_replica(myspace.clone())?;
1798        let deleted = replica.delete_prefix(b"foo", &alice)?;
1799        assert_eq!(deleted, 2);
1800        assert_eq!(
1801            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1802            None
1803        );
1804        assert_eq!(
1805            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1806            None
1807        );
1808        assert_eq!(
1809            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1810            None
1811        );
1812        store.flush()?;
1813        Ok(())
1814    }
1815
1816    #[test]
1817    fn test_replica_sync_delete_memory() -> Result<()> {
1818        let alice_store = store::Store::memory();
1819        let bob_store = store::Store::memory();
1820
1821        test_replica_sync_delete(alice_store, bob_store)
1822    }
1823
1824    #[test]
1825    fn test_replica_sync_delete_fs() -> Result<()> {
1826        let alice_dbfile = tempfile::NamedTempFile::new()?;
1827        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1828        let bob_dbfile = tempfile::NamedTempFile::new()?;
1829        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1830        test_replica_sync_delete(alice_store, bob_store)
1831    }
1832
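    /// Syncs two replicas, then deletes a prefix on one side while inserting on the other,
    /// and checks that a second sync propagates both changes.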
1833    fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1834        let alice_set = ["foot"];
1835        let bob_set = ["fool", "foo", "fog"];
1836
1837        let mut rng = rand::thread_rng();
1838        let author = Author::new(&mut rng);
1839        let myspace = NamespaceSecret::new(&mut rng);
1840        let mut alice = alice_store.new_replica(myspace.clone())?;
1841        for el in &alice_set {
1842            alice.hash_and_insert(el, &author, el.as_bytes())?;
1843        }
1844
1845        let mut bob = bob_store.new_replica(myspace.clone())?;
1846        for el in &bob_set {
1847            bob.hash_and_insert(el, &author, el.as_bytes())?;
1848        }
1849
1850        sync(&mut alice, &mut bob)?;
1851
1852        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1853        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1854        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1855        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1856
1857        let mut alice = alice_store.new_replica(myspace.clone())?;
1858        let mut bob = bob_store.new_replica(myspace.clone())?;
1859        alice.delete_prefix("foo", &author)?;
1860        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
1861        sync(&mut alice, &mut bob)?;
1862        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1863        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1864        alice_store.flush()?;
1865        bob_store.flush()?;
1866        Ok(())
1867    }
1868
1869    #[test]
1870    fn test_replica_remove_memory() -> Result<()> {
1871        let alice_store = store::Store::memory();
1872        test_replica_remove(alice_store)
1873    }
1874
1875    #[test]
1876    fn test_replica_remove_fs() -> Result<()> {
1877        let alice_dbfile = tempfile::NamedTempFile::new()?;
1878        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1879        test_replica_remove(alice_store)
1880    }
1881
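    /// Checks that a replica can only be removed after it has been closed, and that it can
    /// be recreated (starting out empty) afterwards.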
1882    fn test_replica_remove(mut store: Store) -> Result<()> {
1883        let mut rng = rand::thread_rng();
1884        let namespace = NamespaceSecret::new(&mut rng);
1885        let author = Author::new(&mut rng);
1886        let mut replica = store.new_replica(namespace.clone())?;
1887
1888        // insert entry
1889        let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
1890        let res = store
1891            .get_many(namespace.id(), Query::all())?
1892            .collect::<Vec<_>>();
1893        assert_eq!(res.len(), 1);
1894
1895        // remove replica
1896        let res = store.remove_replica(&namespace.id());
1897        // may not remove a replica while it is still open
1898        assert!(res.is_err());
1899        store.close_replica(namespace.id());
1900        store.remove_replica(&namespace.id())?;
1901        let res = store
1902            .get_many(namespace.id(), Query::all())?
1903            .collect::<Vec<_>>();
1904        assert_eq!(res.len(), 0);
1905
1906        // may not reopen removed replica
1907        let res = store.load_replica_info(&namespace.id());
1908        assert!(matches!(res, Err(OpenError::NotFound)));
1909
1910        // may recreate replica
1911        let mut replica = store.new_replica(namespace.clone())?;
1912        replica.insert(b"foo", &author, hash, 3)?;
1913        let res = store
1914            .get_many(namespace.id(), Query::all())?
1915            .collect::<Vec<_>>();
1916        assert_eq!(res.len(), 1);
1917        store.flush()?;
1918        Ok(())
1919    }
1920
1921    #[test]
1922    fn test_replica_delete_edge_cases_memory() -> Result<()> {
1923        let store = store::Store::memory();
1924        test_replica_delete_edge_cases(store)
1925    }
1926
1927    #[test]
1928    fn test_replica_delete_edge_cases_fs() -> Result<()> {
1929        let dbfile = tempfile::NamedTempFile::new()?;
1930        let store = store::fs::Store::persistent(dbfile.path())?;
1931        test_replica_delete_edge_cases(store)
1932    }
1933
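    /// Exercises insertion and prefix deletion with boundary byte values (`0x00`, `0xff`)
    /// in keys and prefixes.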
1934    fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1935        let mut rng = rand::thread_rng();
1936        let author = Author::new(&mut rng);
1937        let namespace = NamespaceSecret::new(&mut rng);
1938
1939        let edgecases = [0u8, 1u8, 255u8];
1940        let prefixes = [0u8, 255u8];
1941        let hash = Hash::new(b"foo");
1942        let len = 3;
1943        for prefix in prefixes {
1944            let mut expected = vec![];
1945            let mut replica = store.new_replica(namespace.clone())?;
1946            for suffix in edgecases {
1947                let key = [prefix, suffix].to_vec();
1948                expected.push(key.clone());
1949                replica.insert(&key, &author, hash, len)?;
1950            }
1951            assert_keys(&mut store, namespace.id(), expected);
1952            let mut replica = store.new_replica(namespace.clone())?;
1953            replica.delete_prefix([prefix], &author)?;
1954            assert_keys(&mut store, namespace.id(), vec![]);
1955        }
1956
1957        let mut replica = store.new_replica(namespace.clone())?;
1958        let key = vec![1u8, 0u8];
1959        replica.insert(key, &author, hash, len)?;
1960        let key = vec![1u8, 1u8];
1961        replica.insert(key, &author, hash, len)?;
1962        let key = vec![1u8, 2u8];
1963        replica.insert(key, &author, hash, len)?;
1964        let prefix = vec![1u8, 1u8];
1965        replica.delete_prefix(prefix, &author)?;
1966        assert_keys(
1967            &mut store,
1968            namespace.id(),
1969            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1970        );
1971
1972        let mut replica = store.new_replica(namespace.clone())?;
1973        let key = vec![0u8, 255u8];
1974        replica.insert(key, &author, hash, len)?;
1975        let key = vec![0u8, 0u8];
1976        replica.insert(key, &author, hash, len)?;
1977        let prefix = vec![0u8];
1978        replica.delete_prefix(prefix, &author)?;
1979        assert_keys(
1980            &mut store,
1981            namespace.id(),
1982            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1983        );
1984        store.flush()?;
1985        Ok(())
1986    }
1987
1988    #[test]
1989    fn test_latest_iter_memory() -> Result<()> {
1990        let store = store::Store::memory();
1991        test_latest_iter(store)
1992    }
1993
1994    #[test]
1995    fn test_latest_iter_fs() -> Result<()> {
1996        let dbfile = tempfile::NamedTempFile::new()?;
1997        let store = store::fs::Store::persistent(dbfile.path())?;
1998        test_latest_iter(store)
1999    }
2000
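    /// Checks that `get_latest_for_each_author` returns the most recently inserted key per author.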
2001    fn test_latest_iter(mut store: Store) -> Result<()> {
2002        let mut rng = rand::thread_rng();
2003        let author0 = Author::new(&mut rng);
2004        let author1 = Author::new(&mut rng);
2005        let namespace = NamespaceSecret::new(&mut rng);
2006        let mut replica = store.new_replica(namespace.clone())?;
2007
2008        replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
2009        let latest = store
2010            .get_latest_for_each_author(namespace.id())?
2011            .collect::<Result<Vec<_>>>()?;
2012        assert_eq!(latest.len(), 1);
2013        assert_eq!(latest[0].2, b"a0.1".to_vec());
2014
2015        let mut replica = store.new_replica(namespace.clone())?;
2016        replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
2017        replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
2018        let latest = store
2019            .get_latest_for_each_author(namespace.id())?
2020            .collect::<Result<Vec<_>>>()?;
2021        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2022        latest_keys.sort();
2023        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2024        store.flush()?;
2025        Ok(())
2026    }
2027
2028    #[test]
2029    fn test_replica_byte_keys_memory() -> Result<()> {
2030        let store = store::Store::memory();
2031
2032        test_replica_byte_keys(store)?;
2033        Ok(())
2034    }
2035
2036    #[test]
2037    fn test_replica_byte_keys_fs() -> Result<()> {
2038        let dbfile = tempfile::NamedTempFile::new()?;
2039        let store = store::fs::Store::persistent(dbfile.path())?;
2040        test_replica_byte_keys(store)?;
2041
2042        Ok(())
2043    }
2044
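    /// Checks that arbitrary byte keys (including non-UTF-8 bytes) are stored and listed correctly.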
2045    fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2046        let mut rng = rand::thread_rng();
2047        let author = Author::new(&mut rng);
2048        let namespace = NamespaceSecret::new(&mut rng);
2049
2050        let hash = Hash::new(b"foo");
2051        let len = 3;
2052
2053        let key = vec![1u8, 0u8];
2054        let mut replica = store.new_replica(namespace.clone())?;
2055        replica.insert(key, &author, hash, len)?;
2056        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2057        let key = vec![1u8, 2u8];
2058        let mut replica = store.new_replica(namespace.clone())?;
2059        replica.insert(key, &author, hash, len)?;
2060        assert_keys(
2061            &mut store,
2062            namespace.id(),
2063            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2064        );
2065
2066        let key = vec![0u8, 255u8];
2067        let mut replica = store.new_replica(namespace.clone())?;
2068        replica.insert(key, &author, hash, len)?;
2069        assert_keys(
2070            &mut store,
2071            namespace.id(),
2072            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2073        );
2074        store.flush()?;
2075        Ok(())
2076    }
2077
2078    #[test]
2079    fn test_replica_capability_memory() -> Result<()> {
2080        let store = store::Store::memory();
2081        test_replica_capability(store)
2082    }
2083
2084    #[test]
2085    fn test_replica_capability_fs() -> Result<()> {
2086        let dbfile = tempfile::NamedTempFile::new()?;
2087        let store = store::fs::Store::persistent(dbfile.path())?;
2088        test_replica_capability(store)
2089    }
2090
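    /// Checks that a read capability rejects inserts, a write capability allows them, and that
    /// importing a read capability again does not downgrade an existing write capability.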
2091    #[allow(clippy::redundant_pattern_matching)]
2092    fn test_replica_capability(mut store: Store) -> Result<()> {
2093        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2094        let author = store.new_author(&mut rng)?;
2095        let namespace = NamespaceSecret::new(&mut rng);
2096
2097        // import read capability - insert must fail
2098        let capability = Capability::Read(namespace.id());
2099        store.import_namespace(capability)?;
2100        let mut replica = store.open_replica(&namespace.id())?;
2101        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2102        assert!(matches!(res, Err(InsertError::ReadOnly)));
2103
2104        // import write capability - insert must succeed
2105        let capability = Capability::Write(namespace.clone());
2106        store.import_namespace(capability)?;
2107        let mut replica = store.open_replica(&namespace.id())?;
2108        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2109        assert!(matches!(res, Ok(_)));
2110        store.close_replica(namespace.id());
2111        let mut replica = store.open_replica(&namespace.id())?;
2112        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2113        assert!(res.is_ok());
2114
2115        // import read capability again - insert must still succeed
2116        let capability = Capability::Read(namespace.id());
2117        store.import_namespace(capability)?;
2118        store.close_replica(namespace.id());
2119        let mut replica = store.open_replica(&namespace.id())?;
2120        let res = replica.hash_and_insert(b"foo", &author, b"bar");
2121        assert!(res.is_ok());
2122        store.flush()?;
2123        Ok(())
2124    }
2125
2126    #[tokio::test]
2127    async fn test_actor_capability_memory() -> Result<()> {
2128        let store = store::Store::memory();
2129        test_actor_capability(store).await
2130    }
2131
2132    #[tokio::test]
2133    async fn test_actor_capability_fs() -> Result<()> {
2134        let dbfile = tempfile::NamedTempFile::new()?;
2135        let store = store::fs::Store::persistent(dbfile.path())?;
2136        test_actor_capability(store).await
2137    }
2138
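    /// Runs the capability checks through the `SyncHandle` actor: a read capability rejects
    /// inserts, a write capability allows them, and inserts fail while the document is closed.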
2139    async fn test_actor_capability(store: Store) -> Result<()> {
2140        // test with actor
2141        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2142        let author = Author::new(&mut rng);
2143        let handle = SyncHandle::spawn(store, None, "test".into());
2144        let author = handle.import_author(author).await?;
2145        let namespace = NamespaceSecret::new(&mut rng);
2146        let id = namespace.id();
2147
2148        // import read capability - insert must fail
2149        let capability = Capability::Read(namespace.id());
2150        handle.import_namespace(capability).await?;
2151        handle.open(namespace.id(), Default::default()).await?;
2152        let res = handle
2153            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2154            .await;
2155        assert!(res.is_err());
2156
2157        // import write capability - insert must succeed
2158        let capability = Capability::Write(namespace.clone());
2159        handle.import_namespace(capability).await?;
2160        let res = handle
2161            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2162            .await;
2163        assert!(res.is_ok());
2164
2165        // while closed, insert must fail; after reopening it must succeed again
2166        handle.close(namespace.id()).await?;
2167        let res = handle
2168            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2169            .await;
2170        assert!(res.is_err());
2171        handle.open(namespace.id(), Default::default()).await?;
2172        let res = handle
2173            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2174            .await;
2175        assert!(res.is_ok());
2176        Ok(())
2177    }
2178
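    /// Drains all events currently buffered in the channel without blocking.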
2179    fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2180        let mut res = vec![];
2181        while let Ok(ev) = events.try_recv() {
2182            res.push(ev);
2183        }
2184        res
2185    }
2186
2187    /// This tests that no events are emitted for entries received during sync which are obsolete
2188    /// (too old) by the time they are actually inserted in the store.
2189    #[test]
2190    fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2191        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2192        let mut store1 = store::Store::memory();
2193        let mut store2 = store::Store::memory();
2194        let peer1 = [1u8; 32];
2195        let peer2 = [2u8; 32];
2196        let mut state1 = SyncOutcome::default();
2197        let mut state2 = SyncOutcome::default();
2198
2199        let author = Author::new(&mut rng);
2200        let namespace = NamespaceSecret::new(&mut rng);
2201        let mut replica1 = store1.new_replica(namespace.clone())?;
2202        let mut replica2 = store2.new_replica(namespace.clone())?;
2203
2204        let (events1_sender, events1) = async_channel::bounded(32);
2205        let (events2_sender, events2) = async_channel::bounded(32);
2206
2207        replica1.info.subscribe(events1_sender);
2208        replica2.info.subscribe(events2_sender);
2209
2210        replica1.hash_and_insert(b"foo", &author, b"init")?;
2211
2212        let from1 = replica1.sync_initial_message()?;
2213        let from2 = replica2
2214            .sync_process_message(from1, peer1, &mut state2)
2215            .unwrap()
2216            .unwrap();
2217        let from1 = replica1
2218            .sync_process_message(from2, peer2, &mut state1)
2219            .unwrap()
2220            .unwrap();
2221        // Now we will receive the entry from replica1. We insert a newer entry while the
2222        // sync is already running, so the entry from replica1 will be rejected. Make sure
2223        // that no `RemoteInsert` event is emitted for this entry.
2224        replica2.hash_and_insert(b"foo", &author, b"update")?;
2225        let from2 = replica2
2226            .sync_process_message(from1, peer1, &mut state2)
2227            .unwrap();
2228        assert!(from2.is_none());
2229        let events1 = drain(events1);
2230        let events2 = drain(events2);
2231        assert_eq!(events1.len(), 1);
2232        assert_eq!(events2.len(), 1);
2233        assert!(matches!(events1[0], Event::LocalInsert { .. }));
2234        assert!(matches!(events2[0], Event::LocalInsert { .. }));
2235        assert_eq!(state1.num_sent, 1);
2236        assert_eq!(state1.num_recv, 0);
2237        assert_eq!(state2.num_sent, 0);
2238        assert_eq!(state2.num_recv, 1);
2239        store1.flush()?;
2240        store2.flush()?;
2241        Ok(())
2242    }
2243
2244    #[test]
2245    fn test_replica_queries_mem() -> Result<()> {
2246        let store = store::Store::memory();
2247
2248        test_replica_queries(store)?;
2249        Ok(())
2250    }
2251
2252    #[test]
2253    fn test_replica_queries_fs() -> Result<()> {
2254        let dbfile = tempfile::NamedTempFile::new()?;
2255        let store = store::fs::Store::persistent(dbfile.path())?;
2256        test_replica_queries(store)?;
2257
2258        Ok(())
2259    }
2260
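    /// Exercises the query API (sorting, key prefixes, offsets, limits, empty entries) against
    /// a small fixed data set.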
2261    fn test_replica_queries(mut store: Store) -> Result<()> {
2262        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2263        let namespace = NamespaceSecret::new(&mut rng);
2264        let namespace_id = namespace.id();
2265
2266        let a1 = store.new_author(&mut rng)?;
2267        let a2 = store.new_author(&mut rng)?;
2268        let a3 = store.new_author(&mut rng)?;
2269        println!(
2270            "a1 {} a2 {} a3 {}",
2271            a1.id().fmt_short(),
2272            a2.id().fmt_short(),
2273            a3.id().fmt_short()
2274        );
2275
2276        let mut replica = store.new_replica(namespace.clone())?;
2277        replica.hash_and_insert("hi/world", &a2, "a2")?;
2278        replica.hash_and_insert("hi/world", &a1, "a1")?;
2279        replica.hash_and_insert("hi/moon", &a2, "a1")?;
2280        replica.hash_and_insert("hi", &a3, "a3")?;
2281
2282        struct QueryTester<'a> {
2283            store: &'a mut Store,
2284            namespace: NamespaceId,
2285        }
2286        impl QueryTester<'_> {
2287            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2288                let query = query.into();
2289                let actual = self
2290                    .store
2291                    .get_many(self.namespace, query.clone())
2292                    .unwrap()
2293                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2294                    .collect::<Result<Vec<_>>>()
2295                    .unwrap();
2296                let expected = expected
2297                    .into_iter()
2298                    .map(|(key, author)| (key.to_string(), author.id()))
2299                    .collect::<Vec<_>>();
2300                assert_eq!(actual, expected, "query: {query:#?}")
2301            }
2302        }
2303
2304        let mut qt = QueryTester {
2305            store: &mut store,
2306            namespace: namespace_id,
2307        };
2308
2309        qt.assert(
2310            Query::all(),
2311            vec![
2312                ("hi/world", &a1),
2313                ("hi/moon", &a2),
2314                ("hi/world", &a2),
2315                ("hi", &a3),
2316            ],
2317        );
2318
2319        qt.assert(
2320            Query::single_latest_per_key(),
2321            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2322        );
2323
2324        qt.assert(
2325            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2326            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2327        );
2328
2329        qt.assert(
2330            Query::single_latest_per_key().key_prefix("hi/"),
2331            vec![("hi/moon", &a2), ("hi/world", &a1)],
2332        );
2333
2334        qt.assert(
2335            Query::single_latest_per_key()
2336                .key_prefix("hi/")
2337                .sort_direction(SortDirection::Desc),
2338            vec![("hi/world", &a1), ("hi/moon", &a2)],
2339        );
2340
2341        qt.assert(
2342            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2343            vec![
2344                ("hi", &a3),
2345                ("hi/moon", &a2),
2346                ("hi/world", &a1),
2347                ("hi/world", &a2),
2348            ],
2349        );
2350
2351        qt.assert(
2352            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2353            vec![
2354                ("hi/world", &a2),
2355                ("hi/world", &a1),
2356                ("hi/moon", &a2),
2357                ("hi", &a3),
2358            ],
2359        );
2360
2361        qt.assert(
2362            Query::all().key_prefix("hi/"),
2363            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2364        );
2365
2366        qt.assert(
2367            Query::all().key_prefix("hi/").offset(1).limit(1),
2368            vec![("hi/moon", &a2)],
2369        );
2370
2371        qt.assert(
2372            Query::all()
2373                .key_prefix("hi/")
2374                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2375            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2376        );
2377
2378        qt.assert(
2379            Query::all()
2380                .key_prefix("hi/")
2381                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2382                .offset(1)
2383                .limit(1),
2384            vec![("hi/world", &a1)],
2385        );
2386
2387        qt.assert(
2388            Query::all()
2389                .key_prefix("hi/")
2390                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2391            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2392        );
2393
2394        qt.assert(
2395            Query::all()
2396                .key_prefix("hi/")
2397                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2398            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2399        );
2400
2401        qt.assert(
2402            Query::all()
2403                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2404                .limit(2)
2405                .offset(1),
2406            vec![("hi/moon", &a2), ("hi/world", &a1)],
2407        );
2408
2409        let mut replica = store.new_replica(namespace)?;
2410        replica.delete_prefix("hi/world", &a2)?;
2411        let mut qt = QueryTester {
2412            store: &mut store,
2413            namespace: namespace_id,
2414        };
2415
2416        qt.assert(
2417            Query::all(),
2418            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2419        );
2420
2421        qt.assert(
2422            Query::all().include_empty(),
2423            vec![
2424                ("hi/world", &a1),
2425                ("hi/moon", &a2),
2426                ("hi/world", &a2),
2427                ("hi", &a3),
2428            ],
2429        );
2430        store.flush()?;
2431        Ok(())
2432    }
2433
2434    #[test]
2435    fn test_dl_policies_mem() -> Result<()> {
2436        let mut store = store::Store::memory();
2437        test_dl_policies(&mut store)
2438    }
2439
2440    #[test]
2441    fn test_dl_policies_fs() -> Result<()> {
2442        let dbfile = tempfile::NamedTempFile::new()?;
2443        let mut store = store::fs::Store::persistent(dbfile.path())?;
2444        test_dl_policies(&mut store)
2445    }
2446
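    /// Checks that a download policy can only be set for an existing document and that it can
    /// be read back unchanged.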
2447    fn test_dl_policies(store: &mut Store) -> Result<()> {
2448        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2449        let namespace = NamespaceSecret::new(&mut rng);
2450        let id = namespace.id();
2451
2452        let filter = store::FilterKind::Exact("foo".into());
2453        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2454        store
2455            .set_download_policy(&id, policy.clone())
2456            .expect_err("document does not exist");
2457
2458        // now create the document
2459        store.new_replica(namespace)?;
2460
2461        store.set_download_policy(&id, policy.clone())?;
2462        let retrieved_policy = store.get_download_policy(&id)?;
2463        assert_eq!(retrieved_policy, policy);
2464        store.flush()?;
2465        Ok(())
2466    }
2467
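    /// Asserts that the store contains exactly the expected keys for the namespace,
    /// compared in sorted order.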
2468    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2469        expected.sort();
2470        assert_eq!(expected, get_keys_sorted(store, namespace));
2471    }
2472
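    /// Returns all keys stored for the namespace, sorted.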
2473    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2474        let mut res = store
2475            .get_many(namespace, Query::all())
2476            .unwrap()
2477            .map(|e| e.map(|e| e.key().to_vec()))
2478            .collect::<Result<Vec<_>>>()
2479            .unwrap();
2480        res.sort();
2481        res
2482    }
2483
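    /// Fetches the entry for the given key, or errors if it does not exist.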
2484    fn get_entry(
2485        store: &mut Store,
2486        namespace: NamespaceId,
2487        author: AuthorId,
2488        key: &[u8],
2489    ) -> anyhow::Result<SignedEntry> {
2490        let entry = store
2491            .get_exact(namespace, author, key, true)?
2492            .ok_or_else(|| anyhow::anyhow!("not found"))?;
2493        Ok(entry)
2494    }
2495
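    /// Returns the content hash of the entry for the given key, if present.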
2496    fn get_content_hash(
2497        store: &mut Store,
2498        namespace: NamespaceId,
2499        author: AuthorId,
2500        key: &[u8],
2501    ) -> anyhow::Result<Option<Hash>> {
2502        let hash = store
2503            .get_exact(namespace, author, key, false)?
2504            .map(|e| e.content_hash());
2505        Ok(hash)
2506    }
2507
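    /// Runs the set-reconciliation loop between two replicas until no further messages are
    /// exchanged, then returns the [`SyncOutcome`] for both sides.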
2508    fn sync(alice: &mut Replica, bob: &mut Replica) -> Result<(SyncOutcome, SyncOutcome)> {
2509        let alice_peer_id = [1u8; 32];
2510        let bob_peer_id = [2u8; 32];
2511        let mut alice_state = SyncOutcome::default();
2512        let mut bob_state = SyncOutcome::default();
2513        // Sync alice - bob
2514        let mut next_to_bob = Some(alice.sync_initial_message()?);
2515        let mut rounds = 0;
2516        while let Some(msg) = next_to_bob.take() {
2517            assert!(rounds < 100, "too many rounds");
2518            rounds += 1;
2519            println!("round {}", rounds);
2520            if let Some(msg) = bob.sync_process_message(msg, alice_peer_id, &mut bob_state)? {
2521                next_to_bob = alice.sync_process_message(msg, bob_peer_id, &mut alice_state)?
2522            }
2523        }
2524        assert_eq!(alice_state.num_sent, bob_state.num_recv);
2525        assert_eq!(alice_state.num_recv, bob_state.num_sent);
2526        Ok((alice_state, bob_state))
2527    }
2528
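    /// Checks that the store contains an entry for every key in `set` by the given author.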
2529    fn check_entries(
2530        store: &mut Store,
2531        namespace: &NamespaceId,
2532        author: &Author,
2533        set: &[&str],
2534    ) -> Result<()> {
2535        for el in set {
2536            let entry = store.get_exact(*namespace, author.id(), el, false)?;
            anyhow::ensure!(entry.is_some(), "missing entry for key {el}");
2537        }
2538        Ok(())
2539    }
2540}