1use std::{
10 cmp::Ordering,
11 fmt::Debug,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use bytes::{Bytes, BytesMut};
17use ed25519_dalek::SignatureError;
18use iroh::Signature;
24use iroh_blobs::Hash;
25use n0_future::{
26 time::{Duration, SystemTime},
27 IterExt,
28};
29use serde::{Deserialize, Serialize};
30
31pub use crate::heads::AuthorHeads;
32use crate::{
33 keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
34 ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
35 store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
36};
37
38pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
42
43pub type PeerIdBytes = [u8; 32];
46
47pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
50
51pub type ContentStatusCallback =
53 Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
54
55#[derive(derive_more::Debug, Clone)]
57pub enum Event {
58 LocalInsert {
60 namespace: NamespaceId,
62 entry: SignedEntry,
64 },
65 RemoteInsert {
67 namespace: NamespaceId,
69 entry: SignedEntry,
71 #[debug("{}", hex::encode(&from[..5]))]
74 from: PeerIdBytes,
75 should_download: bool,
77 remote_content_status: ContentStatus,
79 },
80}
81
82#[derive(derive_more::Debug, Clone)]
84pub enum InsertOrigin {
85 Local,
87 Sync {
89 #[debug("{}", hex::encode(&from[..5]))]
92 from: PeerIdBytes,
93 remote_content_status: ContentStatus,
95 },
96}
97
98#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
100pub enum ContentStatus {
101 Complete,
103 Incomplete,
105 Missing,
107}
108
109#[derive(Debug, Clone, Default)]
111pub struct SyncOutcome {
112 pub heads_received: AuthorHeads,
114 pub num_recv: usize,
116 pub num_sent: usize,
118}
119
120fn get_as_ptr<T>(value: &T) -> Option<usize> {
121 use std::mem;
122 if mem::size_of::<T>() == std::mem::size_of::<usize>()
123 && mem::align_of::<T>() == mem::align_of::<usize>()
124 {
125 unsafe { Some(mem::transmute_copy(value)) }
127 } else {
128 None
129 }
130}
131
132fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
133 get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
134}
135
136#[derive(Debug, Default)]
137struct Subscribers(Vec<async_channel::Sender<Event>>);
138impl Subscribers {
139 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
140 self.0.push(sender)
141 }
142 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
143 self.0.retain(|s| !same_channel(s, sender));
144 }
145 pub async fn send(&mut self, event: Event) {
146 self.0 = std::mem::take(&mut self.0)
147 .into_iter()
148 .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
149 .join_all()
150 .await
151 .into_iter()
152 .flatten()
153 .collect();
154 }
155 pub fn len(&self) -> usize {
156 self.0.len()
157 }
158 pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
159 if !self.0.is_empty() {
160 self.send(f()).await
161 }
162 }
163}
164
165#[derive(
167 Debug,
168 Clone,
169 Copy,
170 Serialize,
171 Deserialize,
172 num_enum::IntoPrimitive,
173 num_enum::TryFromPrimitive,
174 strum::Display,
175)]
176#[repr(u8)]
177#[strum(serialize_all = "snake_case")]
178pub enum CapabilityKind {
179 Write = 1,
181 Read = 2,
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
187pub enum Capability {
188 Write(NamespaceSecret),
190 Read(NamespaceId),
192}
193
194impl Capability {
195 pub fn id(&self) -> NamespaceId {
197 match self {
198 Capability::Write(secret) => secret.id(),
199 Capability::Read(id) => *id,
200 }
201 }
202
203 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
206 match self {
207 Capability::Write(secret) => Ok(secret),
208 Capability::Read(_) => Err(ReadOnly),
209 }
210 }
211
212 pub fn kind(&self) -> CapabilityKind {
214 match self {
215 Capability::Write(_) => CapabilityKind::Write,
216 Capability::Read(_) => CapabilityKind::Read,
217 }
218 }
219
220 pub fn raw(&self) -> (u8, [u8; 32]) {
222 let capability_repr: u8 = self.kind().into();
223 let bytes = match self {
224 Capability::Write(secret) => secret.to_bytes(),
225 Capability::Read(id) => id.to_bytes(),
226 };
227 (capability_repr, bytes)
228 }
229
230 pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
232 let kind: CapabilityKind = kind.try_into()?;
233 let capability = match kind {
234 CapabilityKind::Write => {
235 let secret = NamespaceSecret::from_bytes(bytes);
236 Capability::Write(secret)
237 }
238 CapabilityKind::Read => {
239 let id = NamespaceId::from(bytes);
240 Capability::Read(id)
241 }
242 };
243 Ok(capability)
244 }
245
246 pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
252 if other.id() != self.id() {
253 return Err(CapabilityError::NamespaceMismatch);
254 }
255
256 if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
258 let _ = std::mem::replace(self, other);
259 Ok(true)
260 } else {
261 Ok(false)
262 }
263 }
264}
265
266#[derive(Debug, thiserror::Error)]
268pub enum CapabilityError {
269 #[error("Namespaces are not the same")]
271 NamespaceMismatch,
272}
273
274#[derive(derive_more::Debug)]
276pub struct ReplicaInfo {
277 pub(crate) capability: Capability,
278 subscribers: Subscribers,
279 #[debug("ContentStatusCallback")]
280 content_status_cb: Option<ContentStatusCallback>,
281 closed: bool,
282}
283
284impl ReplicaInfo {
285 pub fn new(capability: Capability) -> Self {
287 Self {
288 capability,
289 subscribers: Default::default(),
290 content_status_cb: None,
292 closed: false,
293 }
294 }
295
296 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
302 self.subscribers.subscribe(sender)
303 }
304
305 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
311 self.subscribers.unsubscribe(sender)
312 }
313
314 pub fn subscribers_count(&self) -> usize {
316 self.subscribers.len()
317 }
318
319 pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
324 if self.content_status_cb.is_some() {
325 false
326 } else {
327 self.content_status_cb = Some(cb);
328 true
329 }
330 }
331
332 fn ensure_open(&self) -> Result<(), InsertError> {
333 if self.closed() {
334 Err(InsertError::Closed)
335 } else {
336 Ok(())
337 }
338 }
339
340 pub fn closed(&self) -> bool {
346 self.closed
347 }
348
349 pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
356 self.capability.merge(capability)
357 }
358}
359
360#[derive(derive_more::Debug)]
362pub struct Replica<'a, I = Box<ReplicaInfo>> {
363 pub(crate) store: StoreInstance<'a>,
364 pub(crate) info: I,
365}
366
367impl<'a, I> Replica<'a, I>
368where
369 I: Deref<Target = ReplicaInfo> + DerefMut,
370{
371 pub fn new(store: StoreInstance<'a>, info: I) -> Self {
373 Replica { info, store }
374 }
375
376 pub async fn insert(
384 &mut self,
385 key: impl AsRef<[u8]>,
386 author: &Author,
387 hash: Hash,
388 len: u64,
389 ) -> Result<usize, InsertError> {
390 if len == 0 || hash == Hash::EMPTY {
391 return Err(InsertError::EntryIsEmpty);
392 }
393 self.info.ensure_open()?;
394 let id = RecordIdentifier::new(self.id(), author.id(), key);
395 let record = Record::new_current(hash, len);
396 let entry = Entry::new(id, record);
397 let secret = self.secret_key()?;
398 let signed_entry = entry.sign(secret, author);
399 self.insert_entry(signed_entry, InsertOrigin::Local).await
400 }
401
402 pub async fn delete_prefix(
409 &mut self,
410 prefix: impl AsRef<[u8]>,
411 author: &Author,
412 ) -> Result<usize, InsertError> {
413 self.info.ensure_open()?;
414 let id = RecordIdentifier::new(self.id(), author.id(), prefix);
415 let entry = Entry::new_empty(id);
416 let signed_entry = entry.sign(self.secret_key()?, author);
417 self.insert_entry(signed_entry, InsertOrigin::Local).await
418 }
419
420 pub async fn insert_remote_entry(
428 &mut self,
429 entry: SignedEntry,
430 received_from: PeerIdBytes,
431 content_status: ContentStatus,
432 ) -> Result<usize, InsertError> {
433 self.info.ensure_open()?;
434 entry.validate_empty()?;
435 let origin = InsertOrigin::Sync {
436 from: received_from,
437 remote_content_status: content_status,
438 };
439 self.insert_entry(entry, origin).await
440 }
441
442 async fn insert_entry(
446 &mut self,
447 entry: SignedEntry,
448 origin: InsertOrigin,
449 ) -> Result<usize, InsertError> {
450 let namespace = self.id();
451
452 let store = &self.store;
453 validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
454
455 let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
456 tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
457
458 let removed_count = match outcome {
459 InsertOutcome::Inserted { removed } => removed,
460 InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
461 };
462
463 let insert_event = match origin {
464 InsertOrigin::Local => Event::LocalInsert { namespace, entry },
465 InsertOrigin::Sync {
466 from,
467 remote_content_status,
468 } => {
469 let download_policy = self
470 .store
471 .get_download_policy(&self.id())
472 .unwrap_or_default();
473 let should_download = download_policy.matches(entry.entry());
474 Event::RemoteInsert {
475 namespace,
476 entry,
477 from,
478 should_download,
479 remote_content_status,
480 }
481 }
482 };
483
484 self.info.subscribers.send(insert_event).await;
485
486 Ok(removed_count)
487 }
488
489 pub async fn hash_and_insert(
494 &mut self,
495 key: impl AsRef<[u8]>,
496 author: &Author,
497 data: impl AsRef<[u8]>,
498 ) -> Result<Hash, InsertError> {
499 self.info.ensure_open()?;
500 let len = data.as_ref().len() as u64;
501 let hash = Hash::new(data);
502 self.insert(key, author, hash, len).await?;
503 Ok(hash)
504 }
505
506 pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
508 RecordIdentifier::new(self.info.capability.id(), author.id(), key)
509 }
510
511 pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
513 self.info.ensure_open().map_err(anyhow::Error::from)?;
514 self.store.initial_message()
515 }
516
517 pub async fn sync_process_message(
521 &mut self,
522 message: crate::ranger::Message<SignedEntry>,
523 from_peer: PeerIdBytes,
524 state: &mut SyncOutcome,
525 ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
526 self.info.ensure_open()?;
527 let my_namespace = self.id();
528 let now = system_time_now();
529
530 state.num_recv += message.value_count();
532 for (entry, _content_status) in message.values() {
533 state
534 .heads_received
535 .insert(entry.author(), entry.timestamp());
536 }
537
538 let cb = self.info.content_status_cb.clone();
541 let download_policy = self
542 .store
543 .get_download_policy(&my_namespace)
544 .unwrap_or_default();
545 let reply = self
546 .store
547 .process_message(
548 &Default::default(),
549 message,
550 |store, entry, content_status| {
552 let origin = InsertOrigin::Sync {
553 from: from_peer,
554 remote_content_status: content_status,
555 };
556 validate_entry(now, store, my_namespace, entry, &origin).is_ok()
557 },
558 async |_store, entry, content_status| {
560 self.info
562 .subscribers
563 .send_with(|| {
564 let should_download = download_policy.matches(entry.entry());
565 Event::RemoteInsert {
566 from: from_peer,
567 namespace: my_namespace,
568 entry: entry.clone(),
569 should_download,
570 remote_content_status: content_status,
571 }
572 })
573 .await
574 },
575 async move |entry| {
577 if let Some(cb) = cb.as_ref() {
578 cb(entry.content_hash()).await
579 } else {
580 ContentStatus::Missing
581 }
582 },
583 )
584 .await?;
585
586 if let Some(ref reply) = reply {
588 state.num_sent += reply.value_count();
589 }
590
591 Ok(reply)
592 }
593
594 pub fn id(&self) -> NamespaceId {
596 self.info.capability.id()
597 }
598
599 pub fn capability(&self) -> &Capability {
601 &self.info.capability
602 }
603
604 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
607 self.info.capability.secret_key()
608 }
609}
610
611#[derive(Debug, thiserror::Error)]
613#[error("Replica allows read access only.")]
614pub struct ReadOnly;
615
616fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
624 now: u64,
625 store: &S,
626 expected_namespace: NamespaceId,
627 entry: &SignedEntry,
628 origin: &InsertOrigin,
629) -> Result<(), ValidationFailure> {
630 if entry.namespace() != expected_namespace {
632 return Err(ValidationFailure::InvalidNamespace);
633 }
634
635 if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
637 return Err(ValidationFailure::BadSignature);
638 }
639
640 if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
642 return Err(ValidationFailure::TooFarInTheFuture);
643 }
644 Ok(())
645}
646
647#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
649pub enum InsertError {
650 #[error("storage error")]
652 Store(anyhow::Error),
653 #[error("validation failure")]
655 Validation(#[from] ValidationFailure),
656 #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
658 NewerEntryExists,
659 #[error("Attempted to insert an empty entry")]
661 EntryIsEmpty,
662 #[error("Attempted to insert to read only replica")]
664 #[from(ReadOnly)]
665 ReadOnly,
666 #[error("replica is closed")]
668 Closed,
669}
670
671#[derive(thiserror::Error, Debug)]
673pub enum ValidationFailure {
674 #[error("Entry namespace does not match the current replica")]
676 InvalidNamespace,
677 #[error("Entry signature is invalid")]
679 BadSignature,
680 #[error("Entry timestamp is too far in the future.")]
682 TooFarInTheFuture,
683 #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
685 InvalidEmptyEntry,
686}
687
688#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
690pub struct SignedEntry {
691 signature: EntrySignature,
692 entry: Entry,
693}
694
695impl From<SignedEntry> for Entry {
696 fn from(value: SignedEntry) -> Self {
697 value.entry
698 }
699}
700
701impl PartialOrd for SignedEntry {
702 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
703 Some(self.cmp(other))
704 }
705}
706
707impl Ord for SignedEntry {
708 fn cmp(&self, other: &Self) -> Ordering {
709 self.entry.cmp(&other.entry)
710 }
711}
712
713impl PartialOrd for Entry {
714 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
715 Some(self.cmp(other))
716 }
717}
718
719impl Ord for Entry {
720 fn cmp(&self, other: &Self) -> Ordering {
721 self.id
722 .cmp(&other.id)
723 .then_with(|| self.record.cmp(&other.record))
724 }
725}
726
727impl SignedEntry {
728 pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
729 SignedEntry { signature, entry }
730 }
731
732 pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
734 let signature = EntrySignature::from_entry(&entry, namespace, author);
735 SignedEntry { signature, entry }
736 }
737
738 pub fn from_parts(
740 namespace: &NamespaceSecret,
741 author: &Author,
742 key: impl AsRef<[u8]>,
743 record: Record,
744 ) -> Self {
745 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
746 let entry = Entry::new(id, record);
747 Self::from_entry(entry, namespace, author)
748 }
749
750 pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
752 self.signature.verify(
753 &self.entry,
754 &self.entry.namespace().public_key(store)?,
755 &self.entry.author().public_key(store)?,
756 )
757 }
758
759 pub fn signature(&self) -> &EntrySignature {
761 &self.signature
762 }
763
764 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
766 self.entry().validate_empty()
767 }
768
769 pub fn entry(&self) -> &Entry {
771 &self.entry
772 }
773
774 pub fn content_hash(&self) -> Hash {
776 self.entry().content_hash()
777 }
778
779 pub fn content_len(&self) -> u64 {
781 self.entry().content_len()
782 }
783
784 pub fn author_bytes(&self) -> AuthorId {
786 self.entry().id().author()
787 }
788
789 pub fn key(&self) -> &[u8] {
791 self.entry().id().key()
792 }
793
794 pub fn timestamp(&self) -> u64 {
796 self.entry().timestamp()
797 }
798}
799
800impl RangeEntry for SignedEntry {
801 type Key = RecordIdentifier;
802 type Value = Record;
803
804 fn key(&self) -> &Self::Key {
805 &self.entry.id
806 }
807
808 fn value(&self) -> &Self::Value {
809 &self.entry.record
810 }
811
812 fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
813 let mut hasher = blake3::Hasher::new();
814 hasher.update(self.namespace().as_ref());
815 hasher.update(self.author_bytes().as_ref());
816 hasher.update(self.key());
817 hasher.update(&self.timestamp().to_be_bytes());
818 hasher.update(self.content_hash().as_bytes());
819 Fingerprint(hasher.finalize().into())
820 }
821}
822
823#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
825pub struct EntrySignature {
826 author_signature: Signature,
827 namespace_signature: Signature,
828}
829
830impl Debug for EntrySignature {
831 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
832 f.debug_struct("EntrySignature")
833 .field(
834 "namespace_signature",
835 &hex::encode(self.namespace_signature.to_bytes()),
836 )
837 .field(
838 "author_signature",
839 &hex::encode(self.author_signature.to_bytes()),
840 )
841 .finish()
842 }
843}
844
845impl EntrySignature {
846 pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
848 let bytes = entry.to_vec();
851 let namespace_signature = Signature::from_bytes(&namespace.sign(&bytes).to_bytes());
852 let author_signature = Signature::from_bytes(&author.sign(&bytes).to_bytes());
853
854 EntrySignature {
855 author_signature,
856 namespace_signature,
857 }
858 }
859
860 pub fn verify(
863 &self,
864 entry: &Entry,
865 namespace: &NamespacePublicKey,
866 author: &AuthorPublicKey,
867 ) -> Result<(), SignatureError> {
868 let bytes = entry.to_vec();
869 let namespace_signature =
870 ed25519_dalek::Signature::from_bytes(&self.namespace_signature.to_bytes());
871 let author_signature =
872 ed25519_dalek::Signature::from_bytes(&self.author_signature.to_bytes());
873 namespace.verify(&bytes, &namespace_signature)?;
874 author.verify(&bytes, &author_signature)?;
875
876 Ok(())
877 }
878
879 pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
880 let namespace_signature = Signature::from_bytes(namespace_sig);
881 let author_signature = Signature::from_bytes(author_sig);
882
883 EntrySignature {
884 author_signature,
885 namespace_signature,
886 }
887 }
888
889 pub(crate) fn author(&self) -> &Signature {
890 &self.author_signature
891 }
892
893 pub(crate) fn namespace(&self) -> &Signature {
894 &self.namespace_signature
895 }
896}
897
898#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
904pub struct Entry {
905 id: RecordIdentifier,
906 record: Record,
907}
908
909impl Entry {
910 pub fn new(id: RecordIdentifier, record: Record) -> Self {
912 Entry { id, record }
913 }
914
915 pub fn new_empty(id: RecordIdentifier) -> Self {
917 Entry {
918 id,
919 record: Record::empty_current(),
920 }
921 }
922
923 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
925 match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
926 (true, true) => Ok(()),
927 (false, false) => Ok(()),
928 (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
929 (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
930 }
931 }
932
933 pub fn id(&self) -> &RecordIdentifier {
935 &self.id
936 }
937
938 pub fn namespace(&self) -> NamespaceId {
940 self.id.namespace()
941 }
942
943 pub fn author(&self) -> AuthorId {
945 self.id.author()
946 }
947
948 pub fn key(&self) -> &[u8] {
950 self.id.key()
951 }
952
953 pub fn record(&self) -> &Record {
955 &self.record
956 }
957
958 pub fn content_hash(&self) -> Hash {
960 self.record.hash
961 }
962
963 pub fn content_len(&self) -> u64 {
965 self.record.len
966 }
967
968 pub fn timestamp(&self) -> u64 {
970 self.record.timestamp
971 }
972
973 pub fn encode(&self, out: &mut Vec<u8>) {
975 self.id.encode(out);
976 self.record.encode(out);
977 }
978
979 pub fn to_vec(&self) -> Vec<u8> {
981 let mut out = Vec::new();
982 self.encode(&mut out);
983 out
984 }
985
986 pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
988 SignedEntry::from_entry(self, namespace, author)
989 }
990}
991
992const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
993const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
994const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
995
996#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
998pub struct RecordIdentifier(Bytes);
999
1000impl Default for RecordIdentifier {
1001 fn default() -> Self {
1002 Self::new(NamespaceId::default(), AuthorId::default(), b"")
1003 }
1004}
1005
1006impl Debug for RecordIdentifier {
1007 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1008 f.debug_struct("RecordIdentifier")
1009 .field("namespace", &self.namespace())
1010 .field("author", &self.author())
1011 .field("key", &std::string::String::from_utf8_lossy(self.key()))
1012 .finish()
1013 }
1014}
1015
1016impl RangeKey for RecordIdentifier {
1017 #[cfg(test)]
1018 fn is_prefix_of(&self, other: &Self) -> bool {
1019 other.as_ref().starts_with(self.as_ref())
1020 }
1021}
1022
1023fn system_time_now() -> u64 {
1024 SystemTime::now()
1025 .duration_since(SystemTime::UNIX_EPOCH)
1026 .expect("time drift")
1027 .as_micros() as u64
1028}
1029
1030impl RecordIdentifier {
1031 pub fn new(
1033 namespace: impl Into<NamespaceId>,
1034 author: impl Into<AuthorId>,
1035 key: impl AsRef<[u8]>,
1036 ) -> Self {
1037 let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1038 bytes.extend_from_slice(namespace.into().as_bytes());
1039 bytes.extend_from_slice(author.into().as_bytes());
1040 bytes.extend_from_slice(key.as_ref());
1041 Self(bytes.freeze())
1042 }
1043
1044 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1046 out.extend_from_slice(&self.0);
1047 }
1048
1049 pub fn as_bytes(&self) -> Bytes {
1051 self.0.clone()
1052 }
1053
1054 pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1056 (
1057 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1058 self.0[AUTHOR_BYTES].try_into().unwrap(),
1059 &self.0[KEY_BYTES],
1060 )
1061 }
1062
1063 pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1065 (
1066 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1067 self.0[AUTHOR_BYTES].try_into().unwrap(),
1068 self.0.slice(KEY_BYTES),
1069 )
1070 }
1071
1072 pub fn key(&self) -> &[u8] {
1074 &self.0[KEY_BYTES]
1075 }
1076
1077 pub fn key_bytes(&self) -> Bytes {
1079 self.0.slice(KEY_BYTES)
1080 }
1081
1082 pub fn namespace(&self) -> NamespaceId {
1084 let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1085 value.into()
1086 }
1087
1088 pub fn author(&self) -> AuthorId {
1090 let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1091 value.into()
1092 }
1093}
1094
1095impl AsRef<[u8]> for RecordIdentifier {
1096 fn as_ref(&self) -> &[u8] {
1097 &self.0
1098 }
1099}
1100
1101impl Deref for SignedEntry {
1102 type Target = Entry;
1103 fn deref(&self) -> &Self::Target {
1104 &self.entry
1105 }
1106}
1107
1108impl Deref for Entry {
1109 type Target = Record;
1110 fn deref(&self) -> &Self::Target {
1111 &self.record
1112 }
1113}
1114
1115#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1117pub struct Record {
1118 len: u64,
1120 hash: Hash,
1122 timestamp: u64,
1124}
1125
1126impl RangeValue for Record {}
1127
1128impl Ord for Record {
1132 fn cmp(&self, other: &Self) -> Ordering {
1133 self.timestamp
1134 .cmp(&other.timestamp)
1135 .then_with(|| self.hash.cmp(&other.hash))
1136 }
1137}
1138
1139impl PartialOrd for Record {
1140 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1141 Some(self.cmp(other))
1142 }
1143}
1144
1145impl Record {
1146 pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1148 debug_assert!(
1149 len != 0 || hash == Hash::EMPTY,
1150 "if `len` is 0 then `hash` must be the hash of the empty byte range"
1151 );
1152 Record {
1153 hash,
1154 len,
1155 timestamp,
1156 }
1157 }
1158
1159 pub fn empty(timestamp: u64) -> Self {
1161 Self::new(Hash::EMPTY, 0, timestamp)
1162 }
1163
1164 pub fn empty_current() -> Self {
1166 Self::new_current(Hash::EMPTY, 0)
1167 }
1168
1169 pub fn is_empty(&self) -> bool {
1171 self.hash == Hash::EMPTY
1172 }
1173
1174 pub fn new_current(hash: Hash, len: u64) -> Self {
1176 let timestamp = system_time_now();
1177 Self::new(hash, len, timestamp)
1178 }
1179
1180 pub fn content_len(&self) -> u64 {
1182 self.len
1183 }
1184
1185 pub fn content_hash(&self) -> Hash {
1187 self.hash
1188 }
1189
1190 pub fn timestamp(&self) -> u64 {
1192 self.timestamp
1193 }
1194
1195 #[cfg(test)]
1196 pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1197 let len = data.as_ref().len() as u64;
1198 let hash = Hash::new(data);
1199 Self::new_current(hash, len)
1200 }
1201
1202 #[cfg(test)]
1203 pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1204 let len = data.as_ref().len() as u64;
1205 let hash = Hash::new(data);
1206 Self::new(hash, len, timestamp)
1207 }
1208
1209 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1211 out.extend_from_slice(&self.len.to_be_bytes());
1212 out.extend_from_slice(self.hash.as_ref());
1213 out.extend_from_slice(&self.timestamp.to_be_bytes())
1214 }
1215}
1216
1217#[cfg(test)]
1218mod tests {
1219 use std::collections::HashSet;
1220
1221 use anyhow::Result;
1222 use rand::SeedableRng;
1223
1224 use super::*;
1225 use crate::{
1226 actor::SyncHandle,
1227 ranger::{Range, Store as _},
1228 store::{OpenError, Query, SortBy, SortDirection, Store},
1229 };
1230
1231 #[tokio::test]
1232 async fn test_basics_memory() -> Result<()> {
1233 let store = store::Store::memory();
1234 test_basics(store).await?;
1235
1236 Ok(())
1237 }
1238
1239 #[tokio::test]
1240 #[cfg(feature = "fs-store")]
1241 async fn test_basics_fs() -> Result<()> {
1242 let dbfile = tempfile::NamedTempFile::new()?;
1243 let store = store::fs::Store::persistent(dbfile.path())?;
1244 test_basics(store).await?;
1245 Ok(())
1246 }
1247
1248 async fn test_basics(mut store: Store) -> Result<()> {
1249 let mut rng = rand::rng();
1250 let alice = Author::new(&mut rng);
1251 let bob = Author::new(&mut rng);
1252 let myspace = NamespaceSecret::new(&mut rng);
1253
1254 let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1255 let record = Record::current_from_data(b"this is my cool data");
1256 let entry = Entry::new(record_id, record);
1257 let signed_entry = entry.sign(&myspace, &alice);
1258 signed_entry.verify(&()).expect("failed to verify");
1259
1260 let mut my_replica = store.new_replica(myspace.clone())?;
1261 for i in 0..10 {
1262 my_replica
1263 .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
1264 .await?;
1265 }
1266
1267 for i in 0..10 {
1268 let res = store
1269 .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1270 .unwrap();
1271 let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1272 assert_eq!(res.entry().record().content_len(), len);
1273 res.verify(&())?;
1274 }
1275
1276 let mut my_replica = store.new_replica(myspace.clone())?;
1278 my_replica
1279 .hash_and_insert("/cool/path", &alice, "round 1")
1280 .await?;
1281 let _entry = store
1282 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1283 .unwrap();
1284 let mut my_replica = store.new_replica(myspace.clone())?;
1286 my_replica
1287 .hash_and_insert("/cool/path", &alice, "round 2")
1288 .await?;
1289 let _entry = store
1290 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1291 .unwrap();
1292
1293 let entries: Vec<_> = store
1295 .get_many(myspace.id(), Query::author(alice.id()))?
1296 .collect::<Result<_>>()?;
1297 assert_eq!(entries.len(), 11);
1298
1299 let entries: Vec<_> = store
1301 .get_many(myspace.id(), Query::author(bob.id()))?
1302 .collect::<Result<_>>()?;
1303 assert_eq!(entries.len(), 0);
1304
1305 let entries: Vec<_> = store
1307 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1308 .collect::<Result<_>>()?;
1309 assert_eq!(entries.len(), 1);
1310
1311 let entries: Vec<_> = store
1313 .get_many(myspace.id(), Query::all())?
1314 .collect::<Result<_>>()?;
1315 assert_eq!(entries.len(), 11);
1316
1317 let mut my_replica = store.new_replica(myspace.clone())?;
1319 let _entry = my_replica
1320 .hash_and_insert("/cool/path", &bob, "bob round 1")
1321 .await?;
1322
1323 let entries: Vec<_> = store
1325 .get_many(myspace.id(), Query::author(alice.id()))?
1326 .collect::<Result<_>>()?;
1327 assert_eq!(entries.len(), 11);
1328
1329 let entries: Vec<_> = store
1330 .get_many(myspace.id(), Query::author(bob.id()))?
1331 .collect::<Result<_>>()?;
1332 assert_eq!(entries.len(), 1);
1333
1334 let entries: Vec<_> = store
1336 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1337 .collect::<Result<_>>()?;
1338 assert_eq!(entries.len(), 2);
1339
1340 let entries: Vec<_> = store
1342 .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1343 .collect::<Result<_>>()?;
1344 assert_eq!(entries.len(), 2);
1345
1346 let entries: Vec<_> = store
1348 .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1349 .collect::<Result<_>>()?;
1350 assert_eq!(entries.len(), 1);
1351
1352 let entries: Vec<_> = store
1353 .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1354 .collect::<Result<_>>()?;
1355 assert_eq!(entries.len(), 1);
1356
1357 let entries: Vec<_> = store
1359 .get_many(myspace.id(), Query::all())?
1360 .collect::<Result<_>>()?;
1361 assert_eq!(entries.len(), 12);
1362
1363 let mut my_replica = store.new_replica(myspace.clone())?;
1365 let entries_second: Vec<_> = my_replica
1366 .store
1367 .get_range(Range::new(
1368 RecordIdentifier::default(),
1369 RecordIdentifier::default(),
1370 ))?
1371 .collect::<Result<_, _>>()?;
1372
1373 assert_eq!(entries_second.len(), 12);
1374 assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1375
1376 test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1377 store.flush()?;
1378 Ok(())
1379 }
1380
1381 fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1384 #[track_caller]
1386 fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1387 assert_eq!(
1388 expected_peers,
1389 &store
1390 .get_sync_peers(&namespace)
1391 .unwrap()
1392 .unwrap()
1393 .collect::<Vec<_>>(),
1394 "sync peers differ"
1395 );
1396 }
1397
1398 let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1399 let mut expected_peers = Vec::with_capacity(count);
1401 for i in 0..count as u8 {
1402 let peer = [i; 32];
1403 expected_peers.insert(0, peer);
1404 store.register_useful_peer(namespace, peer)?;
1405 }
1406 verify_peers(store, namespace, &expected_peers);
1407
1408 expected_peers.pop();
1410 let newer_peer = [count as u8; 32];
1411 expected_peers.insert(0, newer_peer);
1412 store.register_useful_peer(namespace, newer_peer)?;
1413 verify_peers(store, namespace, &expected_peers);
1414
1415 let refreshed_peer = expected_peers.remove(2);
1417 expected_peers.insert(0, refreshed_peer);
1418 store.register_useful_peer(namespace, refreshed_peer)?;
1419 verify_peers(store, namespace, &expected_peers);
1420 Ok(())
1421 }
1422
1423 #[tokio::test]
1424 async fn test_content_hashes_iterator_memory() -> Result<()> {
1425 let store = store::Store::memory();
1426 test_content_hashes_iterator(store).await
1427 }
1428
1429 #[tokio::test]
1430 #[cfg(feature = "fs-store")]
1431 async fn test_content_hashes_iterator_fs() -> Result<()> {
1432 let dbfile = tempfile::NamedTempFile::new()?;
1433 let store = store::fs::Store::persistent(dbfile.path())?;
1434 test_content_hashes_iterator(store).await
1435 }
1436
1437 async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1438 let mut rng = rand::rng();
1439 let mut expected = HashSet::new();
1440 let n_replicas = 3;
1441 let n_entries = 4;
1442 for i in 0..n_replicas {
1443 let namespace = NamespaceSecret::new(&mut rng);
1444 let author = store.new_author(&mut rng)?;
1445 let mut replica = store.new_replica(namespace)?;
1446 for j in 0..n_entries {
1447 let key = format!("{j}");
1448 let data = format!("{i}:{j}");
1449 let hash = replica.hash_and_insert(key, &author, data).await?;
1450 expected.insert(hash);
1451 }
1452 }
1453 assert_eq!(expected.len(), n_replicas * n_entries);
1454 let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1455 assert_eq!(actual, expected);
1456 Ok(())
1457 }
1458
1459 #[test]
1460 fn test_multikey() {
1461 let mut rng = rand::rng();
1462
1463 let k = ["a", "c", "z"];
1464
1465 let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1466 n.sort_by_key(|n| n.id());
1467
1468 let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1469 a.sort_by_key(|a| a.id());
1470
1471 {
1473 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1474 let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1475 let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1476
1477 let range = Range::new(ri0.clone(), ri2.clone());
1478 assert!(range.contains(&ri0), "start");
1479 assert!(range.contains(&ri1), "inside");
1480 assert!(!range.contains(&ri2), "end");
1481
1482 assert!(ri0 < ri1);
1483 assert!(ri1 < ri2);
1484 }
1485
1486 {
1488 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1489 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1490 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1491
1492 let range = Range::new(ri0.clone(), ri2.clone());
1493 assert!(range.contains(&ri0), "start");
1494 assert!(range.contains(&ri1), "inside");
1495 assert!(!range.contains(&ri2), "end");
1496
1497 assert!(ri0 < ri1);
1498 assert!(ri1 < ri2);
1499 }
1500
1501 {
1503 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1504 let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1505 let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1506
1507 let range = Range::new(ri0.clone(), ri2.clone());
1508 assert!(range.contains(&ri0), "start");
1509 assert!(range.contains(&ri1), "inside");
1510 assert!(!range.contains(&ri2), "end");
1511
1512 assert!(ri0 < ri1);
1513 assert!(ri1 < ri2);
1514 }
1515
1516 {
1518 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1519 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1520 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1521
1522 let range = Range::new(ri0.clone(), ri2.clone());
1523 assert!(range.contains(&ri0), "start");
1524 assert!(range.contains(&ri1), "inside");
1525 assert!(!range.contains(&ri2), "end");
1526
1527 assert!(ri0 < ri1);
1528 assert!(ri1 < ri2);
1529 }
1530
1531 {
1533 let a0 = a[0].id();
1536 let a1 = a[1].id();
1537 let n0 = n[0].id();
1538 let n1 = n[1].id();
1539 let k0 = k[0];
1540 let k1 = k[1];
1541
1542 assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1543 assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1544 assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1545 assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1546 }
1547 }
1548
1549 #[tokio::test]
1550 async fn test_timestamps_memory() -> Result<()> {
1551 let store = store::Store::memory();
1552 test_timestamps(store).await?;
1553
1554 Ok(())
1555 }
1556
1557 #[tokio::test]
1558 #[cfg(feature = "fs-store")]
1559 async fn test_timestamps_fs() -> Result<()> {
1560 let dbfile = tempfile::NamedTempFile::new()?;
1561 let store = store::fs::Store::persistent(dbfile.path())?;
1562 test_timestamps(store).await?;
1563 Ok(())
1564 }
1565
1566 async fn test_timestamps(mut store: Store) -> Result<()> {
1567 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
1568 let namespace = NamespaceSecret::new(&mut rng);
1569 let _replica = store.new_replica(namespace.clone())?;
1570 let author = store.new_author(&mut rng)?;
1571 store.close_replica(namespace.id());
1572 let mut replica = store.open_replica(&namespace.id())?;
1573
1574 let key = b"hello";
1575 let value = b"world";
1576 let entry = {
1577 let timestamp = 2;
1578 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1579 let record = Record::from_data(value, timestamp);
1580 Entry::new(id, record).sign(&namespace, &author)
1581 };
1582
1583 replica
1584 .insert_entry(entry.clone(), InsertOrigin::Local)
1585 .await
1586 .unwrap();
1587 store.close_replica(namespace.id());
1588 let res = store
1589 .get_exact(namespace.id(), author.id(), key, false)?
1590 .unwrap();
1591 assert_eq!(res, entry);
1592
1593 let entry2 = {
1594 let timestamp = 1;
1595 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1596 let record = Record::from_data(value, timestamp);
1597 Entry::new(id, record).sign(&namespace, &author)
1598 };
1599
1600 let mut replica = store.open_replica(&namespace.id())?;
1601 let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1602 store.close_replica(namespace.id());
1603 assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1604 let res = store
1605 .get_exact(namespace.id(), author.id(), key, false)?
1606 .unwrap();
1607 assert_eq!(res, entry);
1608 store.flush()?;
1609 Ok(())
1610 }
1611
1612 #[tokio::test]
1613 async fn test_replica_sync_memory() -> Result<()> {
1614 let alice_store = store::Store::memory();
1615 let bob_store = store::Store::memory();
1616
1617 test_replica_sync(alice_store, bob_store).await?;
1618 Ok(())
1619 }
1620
1621 #[tokio::test]
1622 #[cfg(feature = "fs-store")]
1623 async fn test_replica_sync_fs() -> Result<()> {
1624 let alice_dbfile = tempfile::NamedTempFile::new()?;
1625 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1626 let bob_dbfile = tempfile::NamedTempFile::new()?;
1627 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1628 test_replica_sync(alice_store, bob_store).await?;
1629
1630 Ok(())
1631 }
1632
1633 async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1634 let alice_set = ["ape", "eel", "fox", "gnu"];
1635 let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1636
1637 let mut rng = rand::rng();
1638 let author = Author::new(&mut rng);
1639 let myspace = NamespaceSecret::new(&mut rng);
1640 let mut alice = alice_store.new_replica(myspace.clone())?;
1641 for el in &alice_set {
1642 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1643 }
1644
1645 let mut bob = bob_store.new_replica(myspace.clone())?;
1646 for el in &bob_set {
1647 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1648 }
1649
1650 let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1651
1652 assert_eq!(alice_out.num_sent, 2);
1653 assert_eq!(bob_out.num_recv, 2);
1654 assert_eq!(alice_out.num_recv, 6);
1655 assert_eq!(bob_out.num_sent, 6);
1656
1657 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1658 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1659 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1660 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1661 alice_store.flush()?;
1662 bob_store.flush()?;
1663 Ok(())
1664 }
1665
1666 #[tokio::test]
1667 async fn test_replica_timestamp_sync_memory() -> Result<()> {
1668 let alice_store = store::Store::memory();
1669 let bob_store = store::Store::memory();
1670
1671 test_replica_timestamp_sync(alice_store, bob_store).await?;
1672 Ok(())
1673 }
1674
1675 #[tokio::test]
1676 #[cfg(feature = "fs-store")]
1677 async fn test_replica_timestamp_sync_fs() -> Result<()> {
1678 let alice_dbfile = tempfile::NamedTempFile::new()?;
1679 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1680 let bob_dbfile = tempfile::NamedTempFile::new()?;
1681 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1682 test_replica_timestamp_sync(alice_store, bob_store).await?;
1683
1684 Ok(())
1685 }
1686
1687 async fn test_replica_timestamp_sync(
1688 mut alice_store: Store,
1689 mut bob_store: Store,
1690 ) -> Result<()> {
1691 let mut rng = rand::rng();
1692 let author = Author::new(&mut rng);
1693 let namespace = NamespaceSecret::new(&mut rng);
1694 let mut alice = alice_store.new_replica(namespace.clone())?;
1695 let mut bob = bob_store.new_replica(namespace.clone())?;
1696
1697 let key = b"key";
1698 let alice_value = b"alice";
1699 let bob_value = b"bob";
1700 let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1701 let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1703 sync(&mut alice, &mut bob).await?;
1704 assert_eq!(
1705 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1706 Some(bob_hash)
1707 );
1708 assert_eq!(
1709 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1710 Some(bob_hash)
1711 );
1712
1713 let mut alice = alice_store.new_replica(namespace.clone())?;
1714 let mut bob = bob_store.new_replica(namespace.clone())?;
1715
1716 let alice_value_2 = b"alice2";
1717 let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1719 let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1720 sync(&mut alice, &mut bob).await?;
1721 assert_eq!(
1722 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1723 Some(alice_hash_2)
1724 );
1725 assert_eq!(
1726 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1727 Some(alice_hash_2)
1728 );
1729 alice_store.flush()?;
1730 bob_store.flush()?;
1731 Ok(())
1732 }
1733
1734 #[tokio::test]
1735 async fn test_future_timestamp() -> Result<()> {
1736 let mut rng = rand::rng();
1737 let mut store = store::Store::memory();
1738 let author = Author::new(&mut rng);
1739 let namespace = NamespaceSecret::new(&mut rng);
1740
1741 let mut replica = store.new_replica(namespace.clone())?;
1742 let key = b"hi";
1743 let t = system_time_now();
1744 let record = Record::from_data(b"1", t);
1745 let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1746 replica
1747 .insert_entry(entry0.clone(), InsertOrigin::Local)
1748 .await?;
1749
1750 assert_eq!(
1751 get_entry(&mut store, namespace.id(), author.id(), key)?,
1752 entry0
1753 );
1754
1755 let mut replica = store.new_replica(namespace.clone())?;
1756 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1757 let record = Record::from_data(b"2", t);
1758 let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1759 replica
1760 .insert_entry(entry1.clone(), InsertOrigin::Local)
1761 .await?;
1762 assert_eq!(
1763 get_entry(&mut store, namespace.id(), author.id(), key)?,
1764 entry1
1765 );
1766
1767 let mut replica = store.new_replica(namespace.clone())?;
1768 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1769 let record = Record::from_data(b"2", t);
1770 let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1771 replica
1772 .insert_entry(entry2.clone(), InsertOrigin::Local)
1773 .await?;
1774 assert_eq!(
1775 get_entry(&mut store, namespace.id(), author.id(), key)?,
1776 entry2
1777 );
1778
1779 let mut replica = store.new_replica(namespace.clone())?;
1780 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1781 let record = Record::from_data(b"2", t);
1782 let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1783 let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1784 assert!(matches!(
1785 res,
1786 Err(InsertError::Validation(
1787 ValidationFailure::TooFarInTheFuture
1788 ))
1789 ));
1790 assert_eq!(
1791 get_entry(&mut store, namespace.id(), author.id(), key)?,
1792 entry2
1793 );
1794 store.flush()?;
1795 Ok(())
1796 }
1797
1798 #[tokio::test]
1802 async fn test_future_timestamp_accepts_one_second_skew() -> Result<()> {
1803 let mut rng = rand::rng();
1804 let mut store = store::Store::memory();
1805 let author = Author::new(&mut rng);
1806 let namespace = NamespaceSecret::new(&mut rng);
1807 let mut replica = store.new_replica(namespace.clone())?;
1808 let key = b"skew";
1809 let skew_micros: u64 = Duration::from_secs(1).as_micros() as u64;
1810 let now = system_time_now();
1811 let record = Record::from_data(b"ahead", now + skew_micros);
1812 let entry = SignedEntry::from_parts(&namespace, &author, key, record);
1813 replica
1814 .insert_entry(entry.clone(), InsertOrigin::Local)
1815 .await?;
1816 assert_eq!(
1817 get_entry(&mut store, namespace.id(), author.id(), key)?,
1818 entry
1819 );
1820 store.flush()?;
1821 Ok(())
1822 }
1823
1824 #[tokio::test]
1825 async fn test_insert_empty() -> Result<()> {
1826 let mut store = store::Store::memory();
1827 let mut rng = rand::rng();
1828 let alice = Author::new(&mut rng);
1829 let myspace = NamespaceSecret::new(&mut rng);
1830 let mut replica = store.new_replica(myspace.clone())?;
1831 let hash = Hash::new(b"");
1832 let res = replica.insert(b"foo", &alice, hash, 0).await;
1833 assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1834 store.flush()?;
1835 Ok(())
1836 }
1837
1838 #[tokio::test]
1839 async fn test_prefix_delete_memory() -> Result<()> {
1840 let store = store::Store::memory();
1841 test_prefix_delete(store).await?;
1842 Ok(())
1843 }
1844
1845 #[tokio::test]
1846 #[cfg(feature = "fs-store")]
1847 async fn test_prefix_delete_fs() -> Result<()> {
1848 let dbfile = tempfile::NamedTempFile::new()?;
1849 let store = store::fs::Store::persistent(dbfile.path())?;
1850 test_prefix_delete(store).await?;
1851 Ok(())
1852 }
1853
1854 async fn test_prefix_delete(mut store: Store) -> Result<()> {
1855 let mut rng = rand::rng();
1856 let alice = Author::new(&mut rng);
1857 let myspace = NamespaceSecret::new(&mut rng);
1858 let mut replica = store.new_replica(myspace.clone())?;
1859 let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1860 let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1861
1862 assert_eq!(
1864 get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1865 Some(hash1)
1866 );
1867 assert_eq!(
1868 get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1869 Some(hash2)
1870 );
1871
1872 let mut replica = store.new_replica(myspace.clone())?;
1874 let deleted = replica.delete_prefix(b"foo", &alice).await?;
1875 assert_eq!(deleted, 2);
1876 assert_eq!(
1877 store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1878 None
1879 );
1880 assert_eq!(
1881 store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1882 None
1883 );
1884 assert_eq!(
1885 store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1886 None
1887 );
1888 store.flush()?;
1889 Ok(())
1890 }
1891
1892 #[tokio::test]
1893 async fn test_replica_sync_delete_memory() -> Result<()> {
1894 let alice_store = store::Store::memory();
1895 let bob_store = store::Store::memory();
1896
1897 test_replica_sync_delete(alice_store, bob_store).await
1898 }
1899
1900 #[tokio::test]
1901 #[cfg(feature = "fs-store")]
1902 async fn test_replica_sync_delete_fs() -> Result<()> {
1903 let alice_dbfile = tempfile::NamedTempFile::new()?;
1904 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1905 let bob_dbfile = tempfile::NamedTempFile::new()?;
1906 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1907 test_replica_sync_delete(alice_store, bob_store).await
1908 }
1909
1910 async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1911 let alice_set = ["foot"];
1912 let bob_set = ["fool", "foo", "fog"];
1913
1914 let mut rng = rand::rng();
1915 let author = Author::new(&mut rng);
1916 let myspace = NamespaceSecret::new(&mut rng);
1917 let mut alice = alice_store.new_replica(myspace.clone())?;
1918 for el in &alice_set {
1919 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1920 }
1921
1922 let mut bob = bob_store.new_replica(myspace.clone())?;
1923 for el in &bob_set {
1924 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1925 }
1926
1927 sync(&mut alice, &mut bob).await?;
1928
1929 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1930 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1931 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1932 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1933
1934 let mut alice = alice_store.new_replica(myspace.clone())?;
1935 let mut bob = bob_store.new_replica(myspace.clone())?;
1936 alice.delete_prefix("foo", &author).await?;
1937 bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1938 .await?;
1939 sync(&mut alice, &mut bob).await?;
1940 check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1941 check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1942 alice_store.flush()?;
1943 bob_store.flush()?;
1944 Ok(())
1945 }
1946
1947 #[tokio::test]
1948 async fn test_replica_remove_memory() -> Result<()> {
1949 let alice_store = store::Store::memory();
1950 test_replica_remove(alice_store).await
1951 }
1952
1953 #[tokio::test]
1954 #[cfg(feature = "fs-store")]
1955 async fn test_replica_remove_fs() -> Result<()> {
1956 let alice_dbfile = tempfile::NamedTempFile::new()?;
1957 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1958 test_replica_remove(alice_store).await
1959 }
1960
1961 async fn test_replica_remove(mut store: Store) -> Result<()> {
1962 let mut rng = rand::rng();
1963 let namespace = NamespaceSecret::new(&mut rng);
1964 let author = Author::new(&mut rng);
1965 let mut replica = store.new_replica(namespace.clone())?;
1966
1967 let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1969 let res = store
1970 .get_many(namespace.id(), Query::all())?
1971 .collect::<Vec<_>>();
1972 assert_eq!(res.len(), 1);
1973
1974 let res = store.remove_replica(&namespace.id());
1976 assert!(res.is_err());
1978 store.close_replica(namespace.id());
1979 store.remove_replica(&namespace.id())?;
1980 let res = store
1981 .get_many(namespace.id(), Query::all())?
1982 .collect::<Vec<_>>();
1983 assert_eq!(res.len(), 0);
1984
1985 let res = store.load_replica_info(&namespace.id());
1987 assert!(matches!(res, Err(OpenError::NotFound)));
1988
1989 let mut replica = store.new_replica(namespace.clone())?;
1991 replica.insert(b"foo", &author, hash, 3).await?;
1992 let res = store
1993 .get_many(namespace.id(), Query::all())?
1994 .collect::<Vec<_>>();
1995 assert_eq!(res.len(), 1);
1996 store.flush()?;
1997 Ok(())
1998 }
1999
2000 #[tokio::test]
2001 async fn test_replica_delete_edge_cases_memory() -> Result<()> {
2002 let store = store::Store::memory();
2003 test_replica_delete_edge_cases(store).await
2004 }
2005
2006 #[tokio::test]
2007 #[cfg(feature = "fs-store")]
2008 async fn test_replica_delete_edge_cases_fs() -> Result<()> {
2009 let dbfile = tempfile::NamedTempFile::new()?;
2010 let store = store::fs::Store::persistent(dbfile.path())?;
2011 test_replica_delete_edge_cases(store).await
2012 }
2013
2014 async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
2015 let mut rng = rand::rng();
2016 let author = Author::new(&mut rng);
2017 let namespace = NamespaceSecret::new(&mut rng);
2018
2019 let edgecases = [0u8, 1u8, 255u8];
2020 let prefixes = [0u8, 255u8];
2021 let hash = Hash::new(b"foo");
2022 let len = 3;
2023 for prefix in prefixes {
2024 let mut expected = vec![];
2025 let mut replica = store.new_replica(namespace.clone())?;
2026 for suffix in edgecases {
2027 let key = [prefix, suffix].to_vec();
2028 expected.push(key.clone());
2029 replica.insert(&key, &author, hash, len).await?;
2030 }
2031 assert_keys(&mut store, namespace.id(), expected);
2032 let mut replica = store.new_replica(namespace.clone())?;
2033 replica.delete_prefix([prefix], &author).await?;
2034 assert_keys(&mut store, namespace.id(), vec![]);
2035 }
2036
2037 let mut replica = store.new_replica(namespace.clone())?;
2038 let key = vec![1u8, 0u8];
2039 replica.insert(key, &author, hash, len).await?;
2040 let key = vec![1u8, 1u8];
2041 replica.insert(key, &author, hash, len).await?;
2042 let key = vec![1u8, 2u8];
2043 replica.insert(key, &author, hash, len).await?;
2044 let prefix = vec![1u8, 1u8];
2045 replica.delete_prefix(prefix, &author).await?;
2046 assert_keys(
2047 &mut store,
2048 namespace.id(),
2049 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2050 );
2051
2052 let mut replica = store.new_replica(namespace.clone())?;
2053 let key = vec![0u8, 255u8];
2054 replica.insert(key, &author, hash, len).await?;
2055 let key = vec![0u8, 0u8];
2056 replica.insert(key, &author, hash, len).await?;
2057 let prefix = vec![0u8];
2058 replica.delete_prefix(prefix, &author).await?;
2059 assert_keys(
2060 &mut store,
2061 namespace.id(),
2062 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2063 );
2064 store.flush()?;
2065 Ok(())
2066 }
2067
2068 #[tokio::test]
2069 async fn test_latest_iter_memory() -> Result<()> {
2070 let store = store::Store::memory();
2071 test_latest_iter(store).await
2072 }
2073
2074 #[tokio::test]
2075 #[cfg(feature = "fs-store")]
2076 async fn test_latest_iter_fs() -> Result<()> {
2077 let dbfile = tempfile::NamedTempFile::new()?;
2078 let store = store::fs::Store::persistent(dbfile.path())?;
2079 test_latest_iter(store).await
2080 }
2081
2082 async fn test_latest_iter(mut store: Store) -> Result<()> {
2083 let mut rng = rand::rng();
2084 let author0 = Author::new(&mut rng);
2085 let author1 = Author::new(&mut rng);
2086 let namespace = NamespaceSecret::new(&mut rng);
2087 let mut replica = store.new_replica(namespace.clone())?;
2088
2089 replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2090 let latest = store
2091 .get_latest_for_each_author(namespace.id())?
2092 .collect::<Result<Vec<_>>>()?;
2093 assert_eq!(latest.len(), 1);
2094 assert_eq!(latest[0].2, b"a0.1".to_vec());
2095
2096 let mut replica = store.new_replica(namespace.clone())?;
2097 replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2098 replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2099 let latest = store
2100 .get_latest_for_each_author(namespace.id())?
2101 .collect::<Result<Vec<_>>>()?;
2102 let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2103 latest_keys.sort();
2104 assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2105 store.flush()?;
2106 Ok(())
2107 }
2108
2109 #[tokio::test]
2110 async fn test_replica_byte_keys_memory() -> Result<()> {
2111 let store = store::Store::memory();
2112
2113 test_replica_byte_keys(store).await?;
2114 Ok(())
2115 }
2116
2117 #[tokio::test]
2118 #[cfg(feature = "fs-store")]
2119 async fn test_replica_byte_keys_fs() -> Result<()> {
2120 let dbfile = tempfile::NamedTempFile::new()?;
2121 let store = store::fs::Store::persistent(dbfile.path())?;
2122 test_replica_byte_keys(store).await?;
2123
2124 Ok(())
2125 }
2126
2127 async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2128 let mut rng = rand::rng();
2129 let author = Author::new(&mut rng);
2130 let namespace = NamespaceSecret::new(&mut rng);
2131
2132 let hash = Hash::new(b"foo");
2133 let len = 3;
2134
2135 let key = vec![1u8, 0u8];
2136 let mut replica = store.new_replica(namespace.clone())?;
2137 replica.insert(key, &author, hash, len).await?;
2138 assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2139 let key = vec![1u8, 2u8];
2140 let mut replica = store.new_replica(namespace.clone())?;
2141 replica.insert(key, &author, hash, len).await?;
2142 assert_keys(
2143 &mut store,
2144 namespace.id(),
2145 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2146 );
2147
2148 let key = vec![0u8, 255u8];
2149 let mut replica = store.new_replica(namespace.clone())?;
2150 replica.insert(key, &author, hash, len).await?;
2151 assert_keys(
2152 &mut store,
2153 namespace.id(),
2154 vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2155 );
2156 store.flush()?;
2157 Ok(())
2158 }
2159
2160 #[tokio::test]
2161 async fn test_replica_capability_memory() -> Result<()> {
2162 let store = store::Store::memory();
2163 test_replica_capability(store).await
2164 }
2165
2166 #[tokio::test]
2167 #[cfg(feature = "fs-store")]
2168 async fn test_replica_capability_fs() -> Result<()> {
2169 let dbfile = tempfile::NamedTempFile::new()?;
2170 let store = store::fs::Store::persistent(dbfile.path())?;
2171 test_replica_capability(store).await
2172 }
2173
2174 #[allow(clippy::redundant_pattern_matching)]
2175 async fn test_replica_capability(mut store: Store) -> Result<()> {
2176 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2177 let author = store.new_author(&mut rng)?;
2178 let namespace = NamespaceSecret::new(&mut rng);
2179
2180 let capability = Capability::Read(namespace.id());
2182 store.import_namespace(capability)?;
2183 let mut replica = store.open_replica(&namespace.id())?;
2184 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2185 assert!(matches!(res, Err(InsertError::ReadOnly)));
2186
2187 let capability = Capability::Write(namespace.clone());
2189 store.import_namespace(capability)?;
2190 let mut replica = store.open_replica(&namespace.id())?;
2191 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2192 assert!(matches!(res, Ok(_)));
2193 store.close_replica(namespace.id());
2194 let mut replica = store.open_replica(&namespace.id())?;
2195 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2196 assert!(res.is_ok());
2197
2198 let capability = Capability::Read(namespace.id());
2200 store.import_namespace(capability)?;
2201 store.close_replica(namespace.id());
2202 let mut replica = store.open_replica(&namespace.id())?;
2203 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2204 assert!(res.is_ok());
2205 store.flush()?;
2206 Ok(())
2207 }
2208
2209 #[tokio::test]
2210 async fn test_actor_capability_memory() -> Result<()> {
2211 let store = store::Store::memory();
2212 test_actor_capability(store).await
2213 }
2214
2215 #[tokio::test]
2216 #[cfg(feature = "fs-store")]
2217 async fn test_actor_capability_fs() -> Result<()> {
2218 let dbfile = tempfile::NamedTempFile::new()?;
2219 let store = store::fs::Store::persistent(dbfile.path())?;
2220 test_actor_capability(store).await
2221 }
2222
2223 async fn test_actor_capability(store: Store) -> Result<()> {
2224 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2226 let author = Author::new(&mut rng);
2227 let handle = SyncHandle::spawn(store, None, "test".into());
2228 let author = handle.import_author(author).await?;
2229 let namespace = NamespaceSecret::new(&mut rng);
2230 let id = namespace.id();
2231
2232 let capability = Capability::Read(namespace.id());
2234 handle.import_namespace(capability).await?;
2235 handle.open(namespace.id(), Default::default()).await?;
2236 let res = handle
2237 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2238 .await;
2239 assert!(res.is_err());
2240
2241 let capability = Capability::Write(namespace.clone());
2243 handle.import_namespace(capability).await?;
2244 let res = handle
2245 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2246 .await;
2247 assert!(res.is_ok());
2248
2249 handle.close(namespace.id()).await?;
2251 let res = handle
2252 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2253 .await;
2254 assert!(res.is_err());
2255 handle.open(namespace.id(), Default::default()).await?;
2256 let res = handle
2257 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2258 .await;
2259 assert!(res.is_ok());
2260 Ok(())
2261 }
2262
2263 fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2264 let mut res = vec![];
2265 while let Ok(ev) = events.try_recv() {
2266 res.push(ev);
2267 }
2268 res
2269 }
2270
2271 #[tokio::test]
2274 async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2275 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2276 let mut store1 = store::Store::memory();
2277 let mut store2 = store::Store::memory();
2278 let peer1 = [1u8; 32];
2279 let peer2 = [2u8; 32];
2280 let mut state1 = SyncOutcome::default();
2281 let mut state2 = SyncOutcome::default();
2282
2283 let author = Author::new(&mut rng);
2284 let namespace = NamespaceSecret::new(&mut rng);
2285 let mut replica1 = store1.new_replica(namespace.clone())?;
2286 let mut replica2 = store2.new_replica(namespace.clone())?;
2287
2288 let (events1_sender, events1) = async_channel::bounded(32);
2289 let (events2_sender, events2) = async_channel::bounded(32);
2290
2291 replica1.info.subscribe(events1_sender);
2292 replica2.info.subscribe(events2_sender);
2293
2294 replica1.hash_and_insert(b"foo", &author, b"init").await?;
2295
2296 let from1 = replica1.sync_initial_message()?;
2297 let from2 = replica2
2298 .sync_process_message(from1, peer1, &mut state2)
2299 .await
2300 .unwrap()
2301 .unwrap();
2302 let from1 = replica1
2303 .sync_process_message(from2, peer2, &mut state1)
2304 .await
2305 .unwrap()
2306 .unwrap();
2307 replica2.hash_and_insert(b"foo", &author, b"update").await?;
2311 let from2 = replica2
2312 .sync_process_message(from1, peer1, &mut state2)
2313 .await
2314 .unwrap();
2315 assert!(from2.is_none());
2316 let events1 = drain(events1);
2317 let events2 = drain(events2);
2318 assert_eq!(events1.len(), 1);
2319 assert_eq!(events2.len(), 1);
2320 assert!(matches!(events1[0], Event::LocalInsert { .. }));
2321 assert!(matches!(events2[0], Event::LocalInsert { .. }));
2322 assert_eq!(state1.num_sent, 1);
2323 assert_eq!(state1.num_recv, 0);
2324 assert_eq!(state2.num_sent, 0);
2325 assert_eq!(state2.num_recv, 1);
2326 store1.flush()?;
2327 store2.flush()?;
2328 Ok(())
2329 }
2330
2331 #[tokio::test]
2332 async fn test_replica_queries_mem() -> Result<()> {
2333 let store = store::Store::memory();
2334
2335 test_replica_queries(store).await?;
2336 Ok(())
2337 }
2338
2339 #[tokio::test]
2340 #[cfg(feature = "fs-store")]
2341 async fn test_replica_queries_fs() -> Result<()> {
2342 let dbfile = tempfile::NamedTempFile::new()?;
2343 let store = store::fs::Store::persistent(dbfile.path())?;
2344 test_replica_queries(store).await?;
2345
2346 Ok(())
2347 }
2348
2349 async fn test_replica_queries(mut store: Store) -> Result<()> {
2350 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2351 let namespace = NamespaceSecret::new(&mut rng);
2352 let namespace_id = namespace.id();
2353
2354 let a1 = store.new_author(&mut rng)?;
2355 let a2 = store.new_author(&mut rng)?;
2356 let a3 = store.new_author(&mut rng)?;
2357 println!(
2358 "a1 {} a2 {} a3 {}",
2359 a1.id().fmt_short(),
2360 a2.id().fmt_short(),
2361 a3.id().fmt_short()
2362 );
2363
2364 let mut replica = store.new_replica(namespace.clone())?;
2365 replica.hash_and_insert("hi/world", &a2, "a2").await?;
2366 replica.hash_and_insert("hi/world", &a1, "a1").await?;
2367 replica.hash_and_insert("hi/moon", &a2, "a1").await?;
2368 replica.hash_and_insert("hi", &a3, "a3").await?;
2369
2370 struct QueryTester<'a> {
2371 store: &'a mut Store,
2372 namespace: NamespaceId,
2373 }
2374 impl QueryTester<'_> {
2375 fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2376 let query = query.into();
2377 let actual = self
2378 .store
2379 .get_many(self.namespace, query.clone())
2380 .unwrap()
2381 .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2382 .collect::<Result<Vec<_>>>()
2383 .unwrap();
2384 let expected = expected
2385 .into_iter()
2386 .map(|(key, author)| (key.to_string(), author.id()))
2387 .collect::<Vec<_>>();
2388 assert_eq!(actual, expected, "query: {query:#?}")
2389 }
2390 }
2391
2392 let mut qt = QueryTester {
2393 store: &mut store,
2394 namespace: namespace_id,
2395 };
2396
2397 qt.assert(
2398 Query::all(),
2399 vec![
2400 ("hi/world", &a1),
2401 ("hi/moon", &a2),
2402 ("hi/world", &a2),
2403 ("hi", &a3),
2404 ],
2405 );
2406
2407 qt.assert(
2408 Query::single_latest_per_key(),
2409 vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2410 );
2411
2412 qt.assert(
2413 Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2414 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2415 );
2416
2417 qt.assert(
2418 Query::single_latest_per_key().key_prefix("hi/"),
2419 vec![("hi/moon", &a2), ("hi/world", &a1)],
2420 );
2421
2422 qt.assert(
2423 Query::single_latest_per_key()
2424 .key_prefix("hi/")
2425 .sort_direction(SortDirection::Desc),
2426 vec![("hi/world", &a1), ("hi/moon", &a2)],
2427 );
2428
2429 qt.assert(
2430 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2431 vec![
2432 ("hi", &a3),
2433 ("hi/moon", &a2),
2434 ("hi/world", &a1),
2435 ("hi/world", &a2),
2436 ],
2437 );
2438
2439 qt.assert(
2440 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2441 vec![
2442 ("hi/world", &a2),
2443 ("hi/world", &a1),
2444 ("hi/moon", &a2),
2445 ("hi", &a3),
2446 ],
2447 );
2448
2449 qt.assert(
2450 Query::all().key_prefix("hi/"),
2451 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2452 );
2453
2454 qt.assert(
2455 Query::all().key_prefix("hi/").offset(1).limit(1),
2456 vec![("hi/moon", &a2)],
2457 );
2458
2459 qt.assert(
2460 Query::all()
2461 .key_prefix("hi/")
2462 .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2463 vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2464 );
2465
2466 qt.assert(
2467 Query::all()
2468 .key_prefix("hi/")
2469 .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2470 .offset(1)
2471 .limit(1),
2472 vec![("hi/world", &a1)],
2473 );
2474
2475 qt.assert(
2476 Query::all()
2477 .key_prefix("hi/")
2478 .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2479 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2480 );
2481
2482 qt.assert(
2483 Query::all()
2484 .key_prefix("hi/")
2485 .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2486 vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2487 );
2488
2489 qt.assert(
2490 Query::all()
2491 .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2492 .limit(2)
2493 .offset(1),
2494 vec![("hi/moon", &a2), ("hi/world", &a1)],
2495 );
2496
2497 let mut replica = store.new_replica(namespace)?;
2498 replica.delete_prefix("hi/world", &a2).await?;
2499 let mut qt = QueryTester {
2500 store: &mut store,
2501 namespace: namespace_id,
2502 };
2503
2504 qt.assert(
2505 Query::all(),
2506 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2507 );
2508
2509 qt.assert(
2510 Query::all().include_empty(),
2511 vec![
2512 ("hi/world", &a1),
2513 ("hi/moon", &a2),
2514 ("hi/world", &a2),
2515 ("hi", &a3),
2516 ],
2517 );
2518 store.flush()?;
2519 Ok(())
2520 }
2521
2522 #[test]
2523 fn test_dl_policies_mem() -> Result<()> {
2524 let mut store = store::Store::memory();
2525 test_dl_policies(&mut store)
2526 }
2527
2528 #[test]
2529 #[cfg(feature = "fs-store")]
2530 fn test_dl_policies_fs() -> Result<()> {
2531 let dbfile = tempfile::NamedTempFile::new()?;
2532 let mut store = store::fs::Store::persistent(dbfile.path())?;
2533 test_dl_policies(&mut store)
2534 }
2535
2536 fn test_dl_policies(store: &mut Store) -> Result<()> {
2537 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2538 let namespace = NamespaceSecret::new(&mut rng);
2539 let id = namespace.id();
2540
2541 let filter = store::FilterKind::Exact("foo".into());
2542 let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2543 store
2544 .set_download_policy(&id, policy.clone())
2545 .expect_err("document dos not exist");
2546
2547 store.new_replica(namespace)?;
2549
2550 store.set_download_policy(&id, policy.clone())?;
2551 let retrieved_policy = store.get_download_policy(&id)?;
2552 assert_eq!(retrieved_policy, policy);
2553 store.flush()?;
2554 Ok(())
2555 }
2556
2557 fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2558 expected.sort();
2559 assert_eq!(expected, get_keys_sorted(store, namespace));
2560 }
2561
2562 fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2563 let mut res = store
2564 .get_many(namespace, Query::all())
2565 .unwrap()
2566 .map(|e| e.map(|e| e.key().to_vec()))
2567 .collect::<Result<Vec<_>>>()
2568 .unwrap();
2569 res.sort();
2570 res
2571 }
2572
2573 fn get_entry(
2574 store: &mut Store,
2575 namespace: NamespaceId,
2576 author: AuthorId,
2577 key: &[u8],
2578 ) -> anyhow::Result<SignedEntry> {
2579 let entry = store
2580 .get_exact(namespace, author, key, true)?
2581 .ok_or_else(|| anyhow::anyhow!("not found"))?;
2582 Ok(entry)
2583 }
2584
2585 fn get_content_hash(
2586 store: &mut Store,
2587 namespace: NamespaceId,
2588 author: AuthorId,
2589 key: &[u8],
2590 ) -> anyhow::Result<Option<Hash>> {
2591 let hash = store
2592 .get_exact(namespace, author, key, false)?
2593 .map(|e| e.content_hash());
2594 Ok(hash)
2595 }
2596
2597 async fn sync<'a>(
2598 alice: &'a mut Replica<'a>,
2599 bob: &'a mut Replica<'a>,
2600 ) -> Result<(SyncOutcome, SyncOutcome)> {
2601 let alice_peer_id = [1u8; 32];
2602 let bob_peer_id = [2u8; 32];
2603 let mut alice_state = SyncOutcome::default();
2604 let mut bob_state = SyncOutcome::default();
2605 let mut next_to_bob = Some(alice.sync_initial_message()?);
2607 let mut rounds = 0;
2608 while let Some(msg) = next_to_bob.take() {
2609 assert!(rounds < 100, "too many rounds");
2610 rounds += 1;
2611 println!("round {rounds}");
2612 if let Some(msg) = bob
2613 .sync_process_message(msg, alice_peer_id, &mut bob_state)
2614 .await?
2615 {
2616 next_to_bob = alice
2617 .sync_process_message(msg, bob_peer_id, &mut alice_state)
2618 .await?
2619 }
2620 }
2621 assert_eq!(alice_state.num_sent, bob_state.num_recv);
2622 assert_eq!(alice_state.num_recv, bob_state.num_sent);
2623 Ok((alice_state, bob_state))
2624 }
2625
2626 fn check_entries(
2627 store: &mut Store,
2628 namespace: &NamespaceId,
2629 author: &Author,
2630 set: &[&str],
2631 ) -> Result<()> {
2632 for el in set {
2633 store.get_exact(*namespace, author.id(), el, false)?;
2634 }
2635 Ok(())
2636 }
2637
2638 #[test]
2640 fn test_signed_entry_postcard_snapshot() {
2641 let author = Author::from_bytes(&[0xa1; 32]);
2642 let namespace = NamespaceSecret::from_bytes(&[0xb2; 32]);
2643 let record = Record::new(Hash::EMPTY, 0, 1_700_000_000_000_000u64);
2644 let id = RecordIdentifier::new(namespace.id(), author.id(), b"wire-format-test");
2645 let signed = SignedEntry::from_entry(Entry::new(id, record), &namespace, &author);
2646
2647 let bytes = postcard::to_stdvec(&signed).unwrap();
2648 assert_eq!(
2649 hex::encode(&bytes),
2650 "4b523f1b6d9b00a4779fc9f8f105a9e36f062ceb7d511b632905782042ad30acb6dd07bfced4ecd5f3aa58321e8ace63f48f988ed8461bfdcd8b0e902187a10e228ddc6998329b7faa64875fe80da36406ea8d87e3e57bb048323e9cb66c0b343b60c4e709fb978b878e37d0c362edfc06c8cdc774c8b29d94e48eaa06cca60f5055154f42065ea5a1bea05463826be2684eb92df92c100027aabaae57ca554207bc7cbcb5636375fa1d82434d466724d92377f53b980695dd49d26d0ce12205a5776972652d666f726d61742d7465737400af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f32628080f9c0c1c48203",
2651 );
2652
2653 let decoded: SignedEntry = postcard::from_bytes(&bytes).unwrap();
2654 assert_eq!(decoded, signed);
2655 }
2656
2657 #[test]
2659 fn test_author_postcard_snapshot() {
2660 let author = Author::from_bytes(&[0xa1; 32]);
2661 let bytes = postcard::to_stdvec(&author).unwrap();
2662 assert_eq!(
2663 hex::encode(&bytes),
2664 "20a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1",
2665 );
2666 }
2667
2668 #[test]
2670 fn test_namespace_secret_postcard_snapshot() {
2671 let namespace = NamespaceSecret::from_bytes(&[0xb2; 32]);
2672 let bytes = postcard::to_stdvec(&namespace).unwrap();
2673 assert_eq!(
2674 hex::encode(&bytes),
2675 "20b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2",
2676 );
2677 }
2678}