1use std::{
10 cmp::Ordering,
11 fmt::Debug,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14};
15
16use bytes::{Bytes, BytesMut};
17use iroh::{KeyParsingError, Signature, SignatureError};
23use iroh_blobs::Hash;
24use n0_future::{
25 time::{Duration, SystemTime},
26 IterExt,
27};
28use serde::{Deserialize, Serialize};
29
30pub use crate::heads::AuthorHeads;
31use crate::{
32 keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
33 ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
34 store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
35};
36
/// The ranger [`Message`](crate::ranger::Message) type specialized to [`SignedEntry`] values.
///
/// This is the message type consumed and produced by `Replica::sync_process_message`.
pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
41
/// Raw 32-byte identifier of a peer, used to record where a synced entry came from.
pub type PeerIdBytes = [u8; 32];
45
/// Maximum amount, in microseconds, that an entry's timestamp may lie ahead of the local
/// clock before `validate_entry` rejects it. The value is 10 minutes.
pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_micros() as u64;
49
/// Shared, thread-safe callback that maps a content [`Hash`] to a future resolving to the
/// [`ContentStatus`] of that content.
pub type ContentStatusCallback =
    Arc<dyn Fn(Hash) -> n0_future::boxed::BoxFuture<ContentStatus> + Send + Sync + 'static>;
53
/// Event sent to replica subscribers after an entry was inserted.
#[derive(derive_more::Debug, Clone)]
pub enum Event {
    /// An entry was inserted with [`InsertOrigin::Local`].
    LocalInsert {
        /// Namespace of the replica the entry was inserted into.
        namespace: NamespaceId,
        /// The entry that was inserted.
        entry: SignedEntry,
    },
    /// An entry received from a peer was inserted.
    RemoteInsert {
        /// Namespace of the replica the entry was inserted into.
        namespace: NamespaceId,
        /// The entry that was inserted.
        entry: SignedEntry,
        /// Peer the entry was received from. `Debug` output prints only the first
        /// five bytes, hex encoded.
        #[debug("{}", hex::encode(&from[..5]))]
        from: PeerIdBytes,
        /// Whether the replica's download policy matched this entry.
        should_download: bool,
        /// Content status that accompanied the entry from the remote side.
        remote_content_status: ContentStatus,
    },
}
80
/// Describes where an entry that is being inserted came from.
#[derive(derive_more::Debug, Clone)]
pub enum InsertOrigin {
    /// The entry was created and signed locally (see `Replica::insert`).
    Local,
    /// The entry was received from a peer.
    Sync {
        /// Peer the entry was received from. `Debug` output prints only the first
        /// five bytes, hex encoded.
        #[debug("{}", hex::encode(&from[..5]))]
        from: PeerIdBytes,
        /// Content status that accompanied the entry from the remote side.
        remote_content_status: ContentStatus,
    },
}
96
/// Availability of the content an entry points to, as reported by a
/// [`ContentStatusCallback`] or by a remote peer.
///
/// NOTE(review): the exact meaning of each variant is defined by whoever provides the
/// callback; the descriptions below are inferred from the variant names only.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum ContentStatus {
    /// Presumably: the content is fully available.
    Complete,
    /// Presumably: the content is only partially available.
    Incomplete,
    /// Presumably: the content is not available. This is also the value used during sync
    /// when no content status callback is registered.
    Missing,
}
107
/// Running totals collected while processing sync messages.
#[derive(Debug, Clone, Default)]
pub struct SyncOutcome {
    /// Author/timestamp pairs of all entries seen in received messages.
    pub heads_received: AuthorHeads,
    /// Number of values contained in the messages received so far.
    pub num_recv: usize,
    /// Number of values contained in the replies produced so far.
    pub num_sent: usize,
}
118
/// Reinterprets the bits of `value` as a `usize`.
///
/// Returns `Some` only when `T` has exactly the size *and* alignment of `usize`
/// (e.g. a thin pointer or a pointer-sized handle); any other layout yields `None`.
fn get_as_ptr<T>(value: &T) -> Option<usize> {
    use std::mem::{align_of, size_of, transmute_copy};

    let same_layout =
        size_of::<T>() == size_of::<usize>() && align_of::<T>() == align_of::<usize>();
    if !same_layout {
        return None;
    }

    // SAFETY: `value` is a valid reference and `T` is exactly as large as `usize`, so
    // `transmute_copy` reads only bytes that belong to `*value`. Every bit pattern is a
    // valid `usize`. Callers should only pass types whose bytes are fully initialized
    // (pointer-like values), since padding bytes would be read as well.
    let bits: usize = unsafe { transmute_copy(value) };
    Some(bits)
}
130
131fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
132 get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
133}
134
135#[derive(Debug, Default)]
136struct Subscribers(Vec<async_channel::Sender<Event>>);
137impl Subscribers {
138 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
139 self.0.push(sender)
140 }
141 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
142 self.0.retain(|s| !same_channel(s, sender));
143 }
144 pub async fn send(&mut self, event: Event) {
145 self.0 = std::mem::take(&mut self.0)
146 .into_iter()
147 .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx))
148 .join_all()
149 .await
150 .into_iter()
151 .flatten()
152 .collect();
153 }
154 pub fn len(&self) -> usize {
155 self.0.len()
156 }
157 pub async fn send_with(&mut self, f: impl FnOnce() -> Event) {
158 if !self.0.is_empty() {
159 self.send(f()).await
160 }
161 }
162}
163
/// Discriminant of a [`Capability`], convertible to and from its `u8` representation.
///
/// Displayed in `snake_case` (`write` / `read`).
#[derive(
    Debug,
    Clone,
    Copy,
    Serialize,
    Deserialize,
    num_enum::IntoPrimitive,
    num_enum::TryFromPrimitive,
    strum::Display,
)]
#[repr(u8)]
#[strum(serialize_all = "snake_case")]
pub enum CapabilityKind {
    /// Corresponds to [`Capability::Write`]; encoded as `1`.
    Write = 1,
    /// Corresponds to [`Capability::Read`]; encoded as `2`.
    Read = 2,
}
183
/// What the holder may do with a namespace.
#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
pub enum Capability {
    /// Holds the namespace secret, which is required to sign new entries.
    Write(NamespaceSecret),
    /// Holds only the namespace id; `secret_key` fails with [`ReadOnly`].
    Read(NamespaceId),
}
192
193impl Capability {
194 pub fn id(&self) -> NamespaceId {
196 match self {
197 Capability::Write(secret) => secret.id(),
198 Capability::Read(id) => *id,
199 }
200 }
201
202 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
205 match self {
206 Capability::Write(secret) => Ok(secret),
207 Capability::Read(_) => Err(ReadOnly),
208 }
209 }
210
211 pub fn kind(&self) -> CapabilityKind {
213 match self {
214 Capability::Write(_) => CapabilityKind::Write,
215 Capability::Read(_) => CapabilityKind::Read,
216 }
217 }
218
219 pub fn raw(&self) -> (u8, [u8; 32]) {
221 let capability_repr: u8 = self.kind().into();
222 let bytes = match self {
223 Capability::Write(secret) => secret.to_bytes(),
224 Capability::Read(id) => id.to_bytes(),
225 };
226 (capability_repr, bytes)
227 }
228
229 pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
231 let kind: CapabilityKind = kind.try_into()?;
232 let capability = match kind {
233 CapabilityKind::Write => {
234 let secret = NamespaceSecret::from_bytes(bytes);
235 Capability::Write(secret)
236 }
237 CapabilityKind::Read => {
238 let id = NamespaceId::from(bytes);
239 Capability::Read(id)
240 }
241 };
242 Ok(capability)
243 }
244
245 pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
251 if other.id() != self.id() {
252 return Err(CapabilityError::NamespaceMismatch);
253 }
254
255 if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
257 let _ = std::mem::replace(self, other);
258 Ok(true)
259 } else {
260 Ok(false)
261 }
262 }
263}
264
/// Errors returned when merging two [`Capability`] values.
#[derive(Debug, thiserror::Error)]
pub enum CapabilityError {
    /// The two capabilities refer to different namespace ids.
    #[error("Namespaces are not the same")]
    NamespaceMismatch,
}
272
/// In-memory state of a replica that is independent of the entry store.
#[derive(derive_more::Debug)]
pub struct ReplicaInfo {
    /// Capability (and thereby namespace) of this replica.
    pub(crate) capability: Capability,
    /// Channels that receive insert events.
    subscribers: Subscribers,
    /// Optional callback used during sync to look up the status of an entry's content.
    /// Printed as a fixed placeholder in `Debug` output.
    #[debug("ContentStatusCallback")]
    content_status_cb: Option<ContentStatusCallback>,
    /// When `true`, `ensure_open` fails with `InsertError::Closed`.
    closed: bool,
}
282
283impl ReplicaInfo {
284 pub fn new(capability: Capability) -> Self {
286 Self {
287 capability,
288 subscribers: Default::default(),
289 content_status_cb: None,
291 closed: false,
292 }
293 }
294
295 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
301 self.subscribers.subscribe(sender)
302 }
303
304 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
310 self.subscribers.unsubscribe(sender)
311 }
312
313 pub fn subscribers_count(&self) -> usize {
315 self.subscribers.len()
316 }
317
318 pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
323 if self.content_status_cb.is_some() {
324 false
325 } else {
326 self.content_status_cb = Some(cb);
327 true
328 }
329 }
330
331 fn ensure_open(&self) -> Result<(), InsertError> {
332 if self.closed() {
333 Err(InsertError::Closed)
334 } else {
335 Ok(())
336 }
337 }
338
339 pub fn closed(&self) -> bool {
345 self.closed
346 }
347
348 pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
355 self.capability.merge(capability)
356 }
357}
358
/// A replica: a store instance for one namespace together with its [`ReplicaInfo`].
///
/// `I` is any handle that dereferences to [`ReplicaInfo`] (a `Box` by default).
#[derive(derive_more::Debug)]
pub struct Replica<'a, I = Box<ReplicaInfo>> {
    /// Store access used for reading and writing entries.
    pub(crate) store: StoreInstance<'a>,
    /// Capability, subscribers, and open/closed state.
    pub(crate) info: I,
}
365
366impl<'a, I> Replica<'a, I>
367where
368 I: Deref<Target = ReplicaInfo> + DerefMut,
369{
370 pub fn new(store: StoreInstance<'a>, info: I) -> Self {
372 Replica { info, store }
373 }
374
375 pub async fn insert(
383 &mut self,
384 key: impl AsRef<[u8]>,
385 author: &Author,
386 hash: Hash,
387 len: u64,
388 ) -> Result<usize, InsertError> {
389 if len == 0 || hash == Hash::EMPTY {
390 return Err(InsertError::EntryIsEmpty);
391 }
392 self.info.ensure_open()?;
393 let id = RecordIdentifier::new(self.id(), author.id(), key);
394 let record = Record::new_current(hash, len);
395 let entry = Entry::new(id, record);
396 let secret = self.secret_key()?;
397 let signed_entry = entry.sign(secret, author);
398 self.insert_entry(signed_entry, InsertOrigin::Local).await
399 }
400
401 pub async fn delete_prefix(
408 &mut self,
409 prefix: impl AsRef<[u8]>,
410 author: &Author,
411 ) -> Result<usize, InsertError> {
412 self.info.ensure_open()?;
413 let id = RecordIdentifier::new(self.id(), author.id(), prefix);
414 let entry = Entry::new_empty(id);
415 let signed_entry = entry.sign(self.secret_key()?, author);
416 self.insert_entry(signed_entry, InsertOrigin::Local).await
417 }
418
419 pub async fn insert_remote_entry(
427 &mut self,
428 entry: SignedEntry,
429 received_from: PeerIdBytes,
430 content_status: ContentStatus,
431 ) -> Result<usize, InsertError> {
432 self.info.ensure_open()?;
433 entry.validate_empty()?;
434 let origin = InsertOrigin::Sync {
435 from: received_from,
436 remote_content_status: content_status,
437 };
438 self.insert_entry(entry, origin).await
439 }
440
441 async fn insert_entry(
445 &mut self,
446 entry: SignedEntry,
447 origin: InsertOrigin,
448 ) -> Result<usize, InsertError> {
449 let namespace = self.id();
450
451 let store = &self.store;
452 validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
453
454 let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
455 tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
456
457 let removed_count = match outcome {
458 InsertOutcome::Inserted { removed } => removed,
459 InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
460 };
461
462 let insert_event = match origin {
463 InsertOrigin::Local => Event::LocalInsert { namespace, entry },
464 InsertOrigin::Sync {
465 from,
466 remote_content_status,
467 } => {
468 let download_policy = self
469 .store
470 .get_download_policy(&self.id())
471 .unwrap_or_default();
472 let should_download = download_policy.matches(entry.entry());
473 Event::RemoteInsert {
474 namespace,
475 entry,
476 from,
477 should_download,
478 remote_content_status,
479 }
480 }
481 };
482
483 self.info.subscribers.send(insert_event).await;
484
485 Ok(removed_count)
486 }
487
488 pub async fn hash_and_insert(
493 &mut self,
494 key: impl AsRef<[u8]>,
495 author: &Author,
496 data: impl AsRef<[u8]>,
497 ) -> Result<Hash, InsertError> {
498 self.info.ensure_open()?;
499 let len = data.as_ref().len() as u64;
500 let hash = Hash::new(data);
501 self.insert(key, author, hash, len).await?;
502 Ok(hash)
503 }
504
505 pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
507 RecordIdentifier::new(self.info.capability.id(), author.id(), key)
508 }
509
510 pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
512 self.info.ensure_open().map_err(anyhow::Error::from)?;
513 self.store.initial_message()
514 }
515
516 pub async fn sync_process_message(
520 &mut self,
521 message: crate::ranger::Message<SignedEntry>,
522 from_peer: PeerIdBytes,
523 state: &mut SyncOutcome,
524 ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
525 self.info.ensure_open()?;
526 let my_namespace = self.id();
527 let now = system_time_now();
528
529 state.num_recv += message.value_count();
531 for (entry, _content_status) in message.values() {
532 state
533 .heads_received
534 .insert(entry.author(), entry.timestamp());
535 }
536
537 let cb = self.info.content_status_cb.clone();
540 let download_policy = self
541 .store
542 .get_download_policy(&my_namespace)
543 .unwrap_or_default();
544 let reply = self
545 .store
546 .process_message(
547 &Default::default(),
548 message,
549 |store, entry, content_status| {
551 let origin = InsertOrigin::Sync {
552 from: from_peer,
553 remote_content_status: content_status,
554 };
555 validate_entry(now, store, my_namespace, entry, &origin).is_ok()
556 },
557 async |_store, entry, content_status| {
559 self.info
561 .subscribers
562 .send_with(|| {
563 let should_download = download_policy.matches(entry.entry());
564 Event::RemoteInsert {
565 from: from_peer,
566 namespace: my_namespace,
567 entry: entry.clone(),
568 should_download,
569 remote_content_status: content_status,
570 }
571 })
572 .await
573 },
574 async move |entry| {
576 if let Some(cb) = cb.as_ref() {
577 cb(entry.content_hash()).await
578 } else {
579 ContentStatus::Missing
580 }
581 },
582 )
583 .await?;
584
585 if let Some(ref reply) = reply {
587 state.num_sent += reply.value_count();
588 }
589
590 Ok(reply)
591 }
592
593 pub fn id(&self) -> NamespaceId {
595 self.info.capability.id()
596 }
597
598 pub fn capability(&self) -> &Capability {
600 &self.info.capability
601 }
602
603 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
606 self.info.capability.secret_key()
607 }
608}
609
/// Error returned when a namespace secret is requested from a read-only capability.
#[derive(Debug, thiserror::Error)]
#[error("Replica allows read access only.")]
pub struct ReadOnly;
614
615fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
623 now: u64,
624 store: &S,
625 expected_namespace: NamespaceId,
626 entry: &SignedEntry,
627 origin: &InsertOrigin,
628) -> Result<(), ValidationFailure> {
629 if entry.namespace() != expected_namespace {
631 return Err(ValidationFailure::InvalidNamespace);
632 }
633
634 if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
636 return Err(ValidationFailure::BadSignature);
637 }
638
639 if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
641 return Err(ValidationFailure::TooFarInTheFuture);
642 }
643 Ok(())
644}
645
/// Errors that can occur when inserting an entry into a replica.
#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
pub enum InsertError {
    /// The underlying store returned an error.
    #[error("storage error")]
    Store(anyhow::Error),
    /// The entry failed validation.
    #[error("validation failure")]
    Validation(#[from] ValidationFailure),
    /// The store did not insert the entry because a newer one exists.
    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
    NewerEntryExists,
    /// `Replica::insert` was called with length 0 or the empty hash.
    #[error("Attempted to insert an empty entry")]
    EntryIsEmpty,
    /// The replica has no write capability. Converted from [`struct@ReadOnly`].
    #[error("Attempted to insert to read only replica")]
    #[from(ReadOnly)]
    ReadOnly,
    /// The replica is closed.
    #[error("replica is closed")]
    Closed,
}
669
/// Errors returned by [`SignedEntry::verify`].
#[derive(thiserror::Error, Debug)]
pub enum SignedEntryVerifyError {
    /// Resolving the namespace or author public key failed.
    #[error(transparent)]
    KeyParsing(#[from] KeyParsingError),
    /// A signature did not verify.
    #[error(transparent)]
    Signature(#[from] SignatureError),
}
680
/// Reasons why an entry is rejected by validation.
#[derive(thiserror::Error, Debug)]
pub enum ValidationFailure {
    /// The entry belongs to a different namespace than the replica.
    #[error("Entry namespace does not match the current replica")]
    InvalidNamespace,
    /// Signature verification of the entry failed.
    #[error("Entry signature is invalid")]
    BadSignature,
    /// The entry's timestamp is more than `MAX_TIMESTAMP_FUTURE_SHIFT` ahead of the
    /// reference time.
    #[error("Entry timestamp is too far in the future.")]
    TooFarInTheFuture,
    /// Exactly one of "hash is the empty hash" and "length is 0" holds.
    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
    InvalidEmptyEntry,
}
697
/// An [`Entry`] together with the namespace and author signatures over it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedEntry {
    /// Signatures over the encoded entry.
    signature: EntrySignature,
    /// The signed entry.
    entry: Entry,
}
704
705impl From<SignedEntry> for Entry {
706 fn from(value: SignedEntry) -> Self {
707 value.entry
708 }
709}
710
711impl PartialOrd for SignedEntry {
712 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
713 Some(self.cmp(other))
714 }
715}
716
717impl Ord for SignedEntry {
718 fn cmp(&self, other: &Self) -> Ordering {
719 self.entry.cmp(&other.entry)
720 }
721}
722
723impl PartialOrd for Entry {
724 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
725 Some(self.cmp(other))
726 }
727}
728
729impl Ord for Entry {
730 fn cmp(&self, other: &Self) -> Ordering {
731 self.id
732 .cmp(&other.id)
733 .then_with(|| self.record.cmp(&other.record))
734 }
735}
736
737impl SignedEntry {
738 pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
739 SignedEntry { signature, entry }
740 }
741
742 pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
744 let signature = EntrySignature::from_entry(&entry, namespace, author);
745 SignedEntry { signature, entry }
746 }
747
748 pub fn from_parts(
750 namespace: &NamespaceSecret,
751 author: &Author,
752 key: impl AsRef<[u8]>,
753 record: Record,
754 ) -> Self {
755 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
756 let entry = Entry::new(id, record);
757 Self::from_entry(entry, namespace, author)
758 }
759
760 pub fn verify<S: store::PublicKeyStore>(
762 &self,
763 store: &S,
764 ) -> Result<(), SignedEntryVerifyError> {
765 self.signature.verify(
766 &self.entry,
767 &self.entry.namespace().public_key(store)?,
768 &self.entry.author().public_key(store)?,
769 )?;
770 Ok(())
771 }
772
773 pub fn signature(&self) -> &EntrySignature {
775 &self.signature
776 }
777
778 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
780 self.entry().validate_empty()
781 }
782
783 pub fn entry(&self) -> &Entry {
785 &self.entry
786 }
787
788 pub fn content_hash(&self) -> Hash {
790 self.entry().content_hash()
791 }
792
793 pub fn content_len(&self) -> u64 {
795 self.entry().content_len()
796 }
797
798 pub fn author_bytes(&self) -> AuthorId {
800 self.entry().id().author()
801 }
802
803 pub fn key(&self) -> &[u8] {
805 self.entry().id().key()
806 }
807
808 pub fn timestamp(&self) -> u64 {
810 self.entry().timestamp()
811 }
812}
813
814impl RangeEntry for SignedEntry {
815 type Key = RecordIdentifier;
816 type Value = Record;
817
818 fn key(&self) -> &Self::Key {
819 &self.entry.id
820 }
821
822 fn value(&self) -> &Self::Value {
823 &self.entry.record
824 }
825
826 fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
827 let mut hasher = blake3::Hasher::new();
828 hasher.update(self.namespace().as_ref());
829 hasher.update(self.author_bytes().as_ref());
830 hasher.update(self.key());
831 hasher.update(&self.timestamp().to_be_bytes());
832 hasher.update(self.content_hash().as_bytes());
833 Fingerprint(hasher.finalize().into())
834 }
835}
836
/// The two signatures over an [`Entry`]: one by the author, one by the namespace.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntrySignature {
    /// Signature created with the author key.
    author_signature: Signature,
    /// Signature created with the namespace secret.
    namespace_signature: Signature,
}
843
844impl Debug for EntrySignature {
845 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
846 f.debug_struct("EntrySignature")
847 .field(
848 "namespace_signature",
849 &hex::encode(self.namespace_signature.to_bytes()),
850 )
851 .field(
852 "author_signature",
853 &hex::encode(self.author_signature.to_bytes()),
854 )
855 .finish()
856 }
857}
858
859impl EntrySignature {
860 pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
862 let bytes = entry.to_vec();
865 let namespace_signature = Signature::from_bytes(&namespace.sign(&bytes).to_bytes());
866 let author_signature = Signature::from_bytes(&author.sign(&bytes).to_bytes());
867
868 EntrySignature {
869 author_signature,
870 namespace_signature,
871 }
872 }
873
874 pub fn verify(
877 &self,
878 entry: &Entry,
879 namespace: &NamespacePublicKey,
880 author: &AuthorPublicKey,
881 ) -> Result<(), SignatureError> {
882 let bytes = entry.to_vec();
883 namespace.verify(&bytes, &self.namespace_signature)?;
884 author.verify(&bytes, &self.author_signature)?;
885
886 Ok(())
887 }
888
889 pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
890 let namespace_signature = Signature::from_bytes(namespace_sig);
891 let author_signature = Signature::from_bytes(author_sig);
892
893 EntrySignature {
894 author_signature,
895 namespace_signature,
896 }
897 }
898
899 pub(crate) fn author(&self) -> &Signature {
900 &self.author_signature
901 }
902
903 pub(crate) fn namespace(&self) -> &Signature {
904 &self.namespace_signature
905 }
906}
907
/// A single unsigned entry: an identifier (namespace, author, key) plus the record
/// stored under it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Entry {
    /// Namespace, author, and key of the entry.
    id: RecordIdentifier,
    /// Content hash, content length, and timestamp.
    record: Record,
}
918
919impl Entry {
920 pub fn new(id: RecordIdentifier, record: Record) -> Self {
922 Entry { id, record }
923 }
924
925 pub fn new_empty(id: RecordIdentifier) -> Self {
927 Entry {
928 id,
929 record: Record::empty_current(),
930 }
931 }
932
933 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
935 match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
936 (true, true) => Ok(()),
937 (false, false) => Ok(()),
938 (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
939 (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
940 }
941 }
942
943 pub fn id(&self) -> &RecordIdentifier {
945 &self.id
946 }
947
948 pub fn namespace(&self) -> NamespaceId {
950 self.id.namespace()
951 }
952
953 pub fn author(&self) -> AuthorId {
955 self.id.author()
956 }
957
958 pub fn key(&self) -> &[u8] {
960 self.id.key()
961 }
962
963 pub fn record(&self) -> &Record {
965 &self.record
966 }
967
968 pub fn content_hash(&self) -> Hash {
970 self.record.hash
971 }
972
973 pub fn content_len(&self) -> u64 {
975 self.record.len
976 }
977
978 pub fn timestamp(&self) -> u64 {
980 self.record.timestamp
981 }
982
983 pub fn encode(&self, out: &mut Vec<u8>) {
985 self.id.encode(out);
986 self.record.encode(out);
987 }
988
989 pub fn to_vec(&self) -> Vec<u8> {
991 let mut out = Vec::new();
992 self.encode(&mut out);
993 out
994 }
995
996 pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
998 SignedEntry::from_entry(self, namespace, author)
999 }
1000}
1001
// Byte layout of a `RecordIdentifier`: 32 bytes namespace, 32 bytes author, then the key.

/// Byte range of the namespace id inside a [`RecordIdentifier`].
const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
/// Byte range of the author id inside a [`RecordIdentifier`].
const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
/// Byte range of the key inside a [`RecordIdentifier`] (everything after the ids).
const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
1005
/// Identifier of a record: namespace id, author id, and key, concatenated in that
/// order into one byte buffer.
///
/// Equality and ordering are the byte-wise equality and ordering of that buffer.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct RecordIdentifier(Bytes);
1009
1010impl Default for RecordIdentifier {
1011 fn default() -> Self {
1012 Self::new(NamespaceId::default(), AuthorId::default(), b"")
1013 }
1014}
1015
1016impl Debug for RecordIdentifier {
1017 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1018 f.debug_struct("RecordIdentifier")
1019 .field("namespace", &self.namespace())
1020 .field("author", &self.author())
1021 .field("key", &std::string::String::from_utf8_lossy(self.key()))
1022 .finish()
1023 }
1024}
1025
impl RangeKey for RecordIdentifier {
    /// Test-only: `self` is a prefix of `other` if `other`'s bytes start with `self`'s
    /// bytes (namespace, author, and key are compared as one byte string).
    #[cfg(test)]
    fn is_prefix_of(&self, other: &Self) -> bool {
        other.as_ref().starts_with(self.as_ref())
    }
}
1032
/// Returns the current system time as microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn system_time_now() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("time drift");
    since_epoch.as_micros() as u64
}
1039
1040impl RecordIdentifier {
1041 pub fn new(
1043 namespace: impl Into<NamespaceId>,
1044 author: impl Into<AuthorId>,
1045 key: impl AsRef<[u8]>,
1046 ) -> Self {
1047 let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1048 bytes.extend_from_slice(namespace.into().as_bytes());
1049 bytes.extend_from_slice(author.into().as_bytes());
1050 bytes.extend_from_slice(key.as_ref());
1051 Self(bytes.freeze())
1052 }
1053
1054 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1056 out.extend_from_slice(&self.0);
1057 }
1058
1059 pub fn as_bytes(&self) -> Bytes {
1061 self.0.clone()
1062 }
1063
1064 pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1066 (
1067 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1068 self.0[AUTHOR_BYTES].try_into().unwrap(),
1069 &self.0[KEY_BYTES],
1070 )
1071 }
1072
1073 pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1075 (
1076 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1077 self.0[AUTHOR_BYTES].try_into().unwrap(),
1078 self.0.slice(KEY_BYTES),
1079 )
1080 }
1081
1082 pub fn key(&self) -> &[u8] {
1084 &self.0[KEY_BYTES]
1085 }
1086
1087 pub fn key_bytes(&self) -> Bytes {
1089 self.0.slice(KEY_BYTES)
1090 }
1091
1092 pub fn namespace(&self) -> NamespaceId {
1094 let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1095 value.into()
1096 }
1097
1098 pub fn author(&self) -> AuthorId {
1100 let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1101 value.into()
1102 }
1103}
1104
1105impl AsRef<[u8]> for RecordIdentifier {
1106 fn as_ref(&self) -> &[u8] {
1107 &self.0
1108 }
1109}
1110
/// Lets the methods of [`Entry`] be called directly on a [`SignedEntry`].
impl Deref for SignedEntry {
    type Target = Entry;
    fn deref(&self) -> &Self::Target {
        &self.entry
    }
}

/// Lets the methods of [`Record`] be called directly on an [`Entry`].
impl Deref for Entry {
    type Target = Record;
    fn deref(&self) -> &Self::Target {
        &self.record
    }
}
1124
/// The data stored under a [`RecordIdentifier`]: a reference to content plus a timestamp.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Record {
    /// Length of the content in bytes.
    len: u64,
    /// Hash of the content.
    hash: Hash,
    /// Timestamp of the record; the `*_current` constructors use microseconds since
    /// the Unix epoch.
    timestamp: u64,
}

impl RangeValue for Record {}
1137
1138impl Ord for Record {
1142 fn cmp(&self, other: &Self) -> Ordering {
1143 self.timestamp
1144 .cmp(&other.timestamp)
1145 .then_with(|| self.hash.cmp(&other.hash))
1146 }
1147}
1148
1149impl PartialOrd for Record {
1150 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1151 Some(self.cmp(other))
1152 }
1153}
1154
1155impl Record {
1156 pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1158 debug_assert!(
1159 len != 0 || hash == Hash::EMPTY,
1160 "if `len` is 0 then `hash` must be the hash of the empty byte range"
1161 );
1162 Record {
1163 hash,
1164 len,
1165 timestamp,
1166 }
1167 }
1168
1169 pub fn empty(timestamp: u64) -> Self {
1171 Self::new(Hash::EMPTY, 0, timestamp)
1172 }
1173
1174 pub fn empty_current() -> Self {
1176 Self::new_current(Hash::EMPTY, 0)
1177 }
1178
1179 pub fn is_empty(&self) -> bool {
1181 self.hash == Hash::EMPTY
1182 }
1183
1184 pub fn new_current(hash: Hash, len: u64) -> Self {
1186 let timestamp = system_time_now();
1187 Self::new(hash, len, timestamp)
1188 }
1189
1190 pub fn content_len(&self) -> u64 {
1192 self.len
1193 }
1194
1195 pub fn content_hash(&self) -> Hash {
1197 self.hash
1198 }
1199
1200 pub fn timestamp(&self) -> u64 {
1202 self.timestamp
1203 }
1204
1205 #[cfg(test)]
1206 pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1207 let len = data.as_ref().len() as u64;
1208 let hash = Hash::new(data);
1209 Self::new_current(hash, len)
1210 }
1211
1212 #[cfg(test)]
1213 pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1214 let len = data.as_ref().len() as u64;
1215 let hash = Hash::new(data);
1216 Self::new(hash, len, timestamp)
1217 }
1218
1219 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1221 out.extend_from_slice(&self.len.to_be_bytes());
1222 out.extend_from_slice(self.hash.as_ref());
1223 out.extend_from_slice(&self.timestamp.to_be_bytes())
1224 }
1225}
1226
1227#[cfg(test)]
1228mod tests {
1229 use std::collections::HashSet;
1230
1231 use anyhow::Result;
1232 use rand::SeedableRng;
1233
1234 use super::*;
1235 use crate::{
1236 actor::SyncHandle,
1237 ranger::{Range, Store as _},
1238 store::{OpenError, Query, SortBy, SortDirection, Store},
1239 };
1240
1241 #[tokio::test]
1242 async fn test_basics_memory() -> Result<()> {
1243 let store = store::Store::memory();
1244 test_basics(store).await?;
1245
1246 Ok(())
1247 }
1248
1249 #[tokio::test]
1250 #[cfg(feature = "fs-store")]
1251 async fn test_basics_fs() -> Result<()> {
1252 let dbfile = tempfile::NamedTempFile::new()?;
1253 let store = store::fs::Store::persistent(dbfile.path())?;
1254 test_basics(store).await?;
1255 Ok(())
1256 }
1257
    /// Exercises signing, inserting, overwriting, and querying entries on `store`,
    /// then runs the sync-peer cache test and flushes the store.
    async fn test_basics(mut store: Store) -> Result<()> {
        let mut rng = rand::rng();
        let alice = Author::new(&mut rng);
        let bob = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);

        // A freshly signed entry must verify without any key store (`&()`).
        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
        let record = Record::current_from_data(b"this is my cool data");
        let entry = Entry::new(record_id, record);
        let signed_entry = entry.sign(&myspace, &alice);
        signed_entry.verify(&()).expect("failed to verify");

        // Insert ten entries by alice under keys "/0" .. "/9".
        let mut my_replica = store.new_replica(myspace.clone())?;
        for i in 0..10 {
            my_replica
                .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice"))
                .await?;
        }

        // Each of them can be read back with the right length and a valid signature.
        for i in 0..10 {
            let res = store
                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
                .unwrap();
            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
            assert_eq!(res.entry().record().content_len(), len);
            res.verify(&())?;
        }

        // Write the same key twice; the second write replaces the first.
        let mut my_replica = store.new_replica(myspace.clone())?;
        my_replica
            .hash_and_insert("/cool/path", &alice, "round 1")
            .await?;
        let _entry = store
            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
            .unwrap();
        let mut my_replica = store.new_replica(myspace.clone())?;
        my_replica
            .hash_and_insert("/cool/path", &alice, "round 2")
            .await?;
        let _entry = store
            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
            .unwrap();

        // alice: ten numbered keys plus "/cool/path".
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        // bob has not written anything yet.
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 0);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::all())?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        // bob writes to the same key as alice; both entries coexist.
        let mut my_replica = store.new_replica(myspace.clone())?;
        let _entry = my_replica
            .hash_and_insert("/cool/path", &bob, "bob round 1")
            .await?;

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 2);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 2);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);

        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::all())?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 12);

        // A range with identical start and end bounds must yield the same entries, in
        // the same order, as `Query::all()`.
        let mut my_replica = store.new_replica(myspace.clone())?;
        let entries_second: Vec<_> = my_replica
            .store
            .get_range(Range::new(
                RecordIdentifier::default(),
                RecordIdentifier::default(),
            ))?
            .collect::<Result<_, _>>()?;

        assert_eq!(entries_second.len(), 12);
        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());

        test_lru_cache_like_behaviour(&mut store, myspace.id())?;
        store.flush()?;
        Ok(())
    }
1390
1391 fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1394 #[track_caller]
1396 fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1397 assert_eq!(
1398 expected_peers,
1399 &store
1400 .get_sync_peers(&namespace)
1401 .unwrap()
1402 .unwrap()
1403 .collect::<Vec<_>>(),
1404 "sync peers differ"
1405 );
1406 }
1407
1408 let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1409 let mut expected_peers = Vec::with_capacity(count);
1411 for i in 0..count as u8 {
1412 let peer = [i; 32];
1413 expected_peers.insert(0, peer);
1414 store.register_useful_peer(namespace, peer)?;
1415 }
1416 verify_peers(store, namespace, &expected_peers);
1417
1418 expected_peers.pop();
1420 let newer_peer = [count as u8; 32];
1421 expected_peers.insert(0, newer_peer);
1422 store.register_useful_peer(namespace, newer_peer)?;
1423 verify_peers(store, namespace, &expected_peers);
1424
1425 let refreshed_peer = expected_peers.remove(2);
1427 expected_peers.insert(0, refreshed_peer);
1428 store.register_useful_peer(namespace, refreshed_peer)?;
1429 verify_peers(store, namespace, &expected_peers);
1430 Ok(())
1431 }
1432
1433 #[tokio::test]
1434 async fn test_content_hashes_iterator_memory() -> Result<()> {
1435 let store = store::Store::memory();
1436 test_content_hashes_iterator(store).await
1437 }
1438
1439 #[tokio::test]
1440 #[cfg(feature = "fs-store")]
1441 async fn test_content_hashes_iterator_fs() -> Result<()> {
1442 let dbfile = tempfile::NamedTempFile::new()?;
1443 let store = store::fs::Store::persistent(dbfile.path())?;
1444 test_content_hashes_iterator(store).await
1445 }
1446
1447 async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1448 let mut rng = rand::rng();
1449 let mut expected = HashSet::new();
1450 let n_replicas = 3;
1451 let n_entries = 4;
1452 for i in 0..n_replicas {
1453 let namespace = NamespaceSecret::new(&mut rng);
1454 let author = store.new_author(&mut rng)?;
1455 let mut replica = store.new_replica(namespace)?;
1456 for j in 0..n_entries {
1457 let key = format!("{j}");
1458 let data = format!("{i}:{j}");
1459 let hash = replica.hash_and_insert(key, &author, data).await?;
1460 expected.insert(hash);
1461 }
1462 }
1463 assert_eq!(expected.len(), n_replicas * n_entries);
1464 let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1465 assert_eq!(actual, expected);
1466 Ok(())
1467 }
1468
1469 #[test]
1470 fn test_multikey() {
1471 let mut rng = rand::rng();
1472
1473 let k = ["a", "c", "z"];
1474
1475 let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1476 n.sort_by_key(|n| n.id());
1477
1478 let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1479 a.sort_by_key(|a| a.id());
1480
1481 {
1483 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1484 let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1485 let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1486
1487 let range = Range::new(ri0.clone(), ri2.clone());
1488 assert!(range.contains(&ri0), "start");
1489 assert!(range.contains(&ri1), "inside");
1490 assert!(!range.contains(&ri2), "end");
1491
1492 assert!(ri0 < ri1);
1493 assert!(ri1 < ri2);
1494 }
1495
1496 {
1498 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1499 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1500 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1501
1502 let range = Range::new(ri0.clone(), ri2.clone());
1503 assert!(range.contains(&ri0), "start");
1504 assert!(range.contains(&ri1), "inside");
1505 assert!(!range.contains(&ri2), "end");
1506
1507 assert!(ri0 < ri1);
1508 assert!(ri1 < ri2);
1509 }
1510
1511 {
1513 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1514 let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1515 let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1516
1517 let range = Range::new(ri0.clone(), ri2.clone());
1518 assert!(range.contains(&ri0), "start");
1519 assert!(range.contains(&ri1), "inside");
1520 assert!(!range.contains(&ri2), "end");
1521
1522 assert!(ri0 < ri1);
1523 assert!(ri1 < ri2);
1524 }
1525
1526 {
1528 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1529 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1530 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1531
1532 let range = Range::new(ri0.clone(), ri2.clone());
1533 assert!(range.contains(&ri0), "start");
1534 assert!(range.contains(&ri1), "inside");
1535 assert!(!range.contains(&ri2), "end");
1536
1537 assert!(ri0 < ri1);
1538 assert!(ri1 < ri2);
1539 }
1540
1541 {
1543 let a0 = a[0].id();
1546 let a1 = a[1].id();
1547 let n0 = n[0].id();
1548 let n1 = n[1].id();
1549 let k0 = k[0];
1550 let k1 = k[1];
1551
1552 assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1553 assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1554 assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1555 assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1556 }
1557 }
1558
1559 #[tokio::test]
1560 async fn test_timestamps_memory() -> Result<()> {
1561 let store = store::Store::memory();
1562 test_timestamps(store).await?;
1563
1564 Ok(())
1565 }
1566
1567 #[tokio::test]
1568 #[cfg(feature = "fs-store")]
1569 async fn test_timestamps_fs() -> Result<()> {
1570 let dbfile = tempfile::NamedTempFile::new()?;
1571 let store = store::fs::Store::persistent(dbfile.path())?;
1572 test_timestamps(store).await?;
1573 Ok(())
1574 }
1575
1576 async fn test_timestamps(mut store: Store) -> Result<()> {
1577 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
1578 let namespace = NamespaceSecret::new(&mut rng);
1579 let _replica = store.new_replica(namespace.clone())?;
1580 let author = store.new_author(&mut rng)?;
1581 store.close_replica(namespace.id());
1582 let mut replica = store.open_replica(&namespace.id())?;
1583
1584 let key = b"hello";
1585 let value = b"world";
1586 let entry = {
1587 let timestamp = 2;
1588 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1589 let record = Record::from_data(value, timestamp);
1590 Entry::new(id, record).sign(&namespace, &author)
1591 };
1592
1593 replica
1594 .insert_entry(entry.clone(), InsertOrigin::Local)
1595 .await
1596 .unwrap();
1597 store.close_replica(namespace.id());
1598 let res = store
1599 .get_exact(namespace.id(), author.id(), key, false)?
1600 .unwrap();
1601 assert_eq!(res, entry);
1602
1603 let entry2 = {
1604 let timestamp = 1;
1605 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1606 let record = Record::from_data(value, timestamp);
1607 Entry::new(id, record).sign(&namespace, &author)
1608 };
1609
1610 let mut replica = store.open_replica(&namespace.id())?;
1611 let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
1612 store.close_replica(namespace.id());
1613 assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1614 let res = store
1615 .get_exact(namespace.id(), author.id(), key, false)?
1616 .unwrap();
1617 assert_eq!(res, entry);
1618 store.flush()?;
1619 Ok(())
1620 }
1621
1622 #[tokio::test]
1623 async fn test_replica_sync_memory() -> Result<()> {
1624 let alice_store = store::Store::memory();
1625 let bob_store = store::Store::memory();
1626
1627 test_replica_sync(alice_store, bob_store).await?;
1628 Ok(())
1629 }
1630
1631 #[tokio::test]
1632 #[cfg(feature = "fs-store")]
1633 async fn test_replica_sync_fs() -> Result<()> {
1634 let alice_dbfile = tempfile::NamedTempFile::new()?;
1635 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1636 let bob_dbfile = tempfile::NamedTempFile::new()?;
1637 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1638 test_replica_sync(alice_store, bob_store).await?;
1639
1640 Ok(())
1641 }
1642
1643 async fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1644 let alice_set = ["ape", "eel", "fox", "gnu"];
1645 let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1646
1647 let mut rng = rand::rng();
1648 let author = Author::new(&mut rng);
1649 let myspace = NamespaceSecret::new(&mut rng);
1650 let mut alice = alice_store.new_replica(myspace.clone())?;
1651 for el in &alice_set {
1652 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1653 }
1654
1655 let mut bob = bob_store.new_replica(myspace.clone())?;
1656 for el in &bob_set {
1657 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1658 }
1659
1660 let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
1661
1662 assert_eq!(alice_out.num_sent, 2);
1663 assert_eq!(bob_out.num_recv, 2);
1664 assert_eq!(alice_out.num_recv, 6);
1665 assert_eq!(bob_out.num_sent, 6);
1666
1667 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1668 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1669 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1670 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1671 alice_store.flush()?;
1672 bob_store.flush()?;
1673 Ok(())
1674 }
1675
1676 #[tokio::test]
1677 async fn test_replica_timestamp_sync_memory() -> Result<()> {
1678 let alice_store = store::Store::memory();
1679 let bob_store = store::Store::memory();
1680
1681 test_replica_timestamp_sync(alice_store, bob_store).await?;
1682 Ok(())
1683 }
1684
1685 #[tokio::test]
1686 #[cfg(feature = "fs-store")]
1687 async fn test_replica_timestamp_sync_fs() -> Result<()> {
1688 let alice_dbfile = tempfile::NamedTempFile::new()?;
1689 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1690 let bob_dbfile = tempfile::NamedTempFile::new()?;
1691 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1692 test_replica_timestamp_sync(alice_store, bob_store).await?;
1693
1694 Ok(())
1695 }
1696
1697 async fn test_replica_timestamp_sync(
1698 mut alice_store: Store,
1699 mut bob_store: Store,
1700 ) -> Result<()> {
1701 let mut rng = rand::rng();
1702 let author = Author::new(&mut rng);
1703 let namespace = NamespaceSecret::new(&mut rng);
1704 let mut alice = alice_store.new_replica(namespace.clone())?;
1705 let mut bob = bob_store.new_replica(namespace.clone())?;
1706
1707 let key = b"key";
1708 let alice_value = b"alice";
1709 let bob_value = b"bob";
1710 let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
1711 let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
1713 sync(&mut alice, &mut bob).await?;
1714 assert_eq!(
1715 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1716 Some(bob_hash)
1717 );
1718 assert_eq!(
1719 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1720 Some(bob_hash)
1721 );
1722
1723 let mut alice = alice_store.new_replica(namespace.clone())?;
1724 let mut bob = bob_store.new_replica(namespace.clone())?;
1725
1726 let alice_value_2 = b"alice2";
1727 let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
1729 let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
1730 sync(&mut alice, &mut bob).await?;
1731 assert_eq!(
1732 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1733 Some(alice_hash_2)
1734 );
1735 assert_eq!(
1736 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1737 Some(alice_hash_2)
1738 );
1739 alice_store.flush()?;
1740 bob_store.flush()?;
1741 Ok(())
1742 }
1743
1744 #[tokio::test]
1745 async fn test_future_timestamp() -> Result<()> {
1746 let mut rng = rand::rng();
1747 let mut store = store::Store::memory();
1748 let author = Author::new(&mut rng);
1749 let namespace = NamespaceSecret::new(&mut rng);
1750
1751 let mut replica = store.new_replica(namespace.clone())?;
1752 let key = b"hi";
1753 let t = system_time_now();
1754 let record = Record::from_data(b"1", t);
1755 let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1756 replica
1757 .insert_entry(entry0.clone(), InsertOrigin::Local)
1758 .await?;
1759
1760 assert_eq!(
1761 get_entry(&mut store, namespace.id(), author.id(), key)?,
1762 entry0
1763 );
1764
1765 let mut replica = store.new_replica(namespace.clone())?;
1766 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1767 let record = Record::from_data(b"2", t);
1768 let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1769 replica
1770 .insert_entry(entry1.clone(), InsertOrigin::Local)
1771 .await?;
1772 assert_eq!(
1773 get_entry(&mut store, namespace.id(), author.id(), key)?,
1774 entry1
1775 );
1776
1777 let mut replica = store.new_replica(namespace.clone())?;
1778 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1779 let record = Record::from_data(b"2", t);
1780 let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1781 replica
1782 .insert_entry(entry2.clone(), InsertOrigin::Local)
1783 .await?;
1784 assert_eq!(
1785 get_entry(&mut store, namespace.id(), author.id(), key)?,
1786 entry2
1787 );
1788
1789 let mut replica = store.new_replica(namespace.clone())?;
1790 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1791 let record = Record::from_data(b"2", t);
1792 let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1793 let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
1794 assert!(matches!(
1795 res,
1796 Err(InsertError::Validation(
1797 ValidationFailure::TooFarInTheFuture
1798 ))
1799 ));
1800 assert_eq!(
1801 get_entry(&mut store, namespace.id(), author.id(), key)?,
1802 entry2
1803 );
1804 store.flush()?;
1805 Ok(())
1806 }
1807
1808 #[tokio::test]
1812 async fn test_future_timestamp_accepts_one_second_skew() -> Result<()> {
1813 let mut rng = rand::rng();
1814 let mut store = store::Store::memory();
1815 let author = Author::new(&mut rng);
1816 let namespace = NamespaceSecret::new(&mut rng);
1817 let mut replica = store.new_replica(namespace.clone())?;
1818 let key = b"skew";
1819 let skew_micros: u64 = Duration::from_secs(1).as_micros() as u64;
1820 let now = system_time_now();
1821 let record = Record::from_data(b"ahead", now + skew_micros);
1822 let entry = SignedEntry::from_parts(&namespace, &author, key, record);
1823 replica
1824 .insert_entry(entry.clone(), InsertOrigin::Local)
1825 .await?;
1826 assert_eq!(
1827 get_entry(&mut store, namespace.id(), author.id(), key)?,
1828 entry
1829 );
1830 store.flush()?;
1831 Ok(())
1832 }
1833
1834 #[tokio::test]
1835 async fn test_insert_empty() -> Result<()> {
1836 let mut store = store::Store::memory();
1837 let mut rng = rand::rng();
1838 let alice = Author::new(&mut rng);
1839 let myspace = NamespaceSecret::new(&mut rng);
1840 let mut replica = store.new_replica(myspace.clone())?;
1841 let hash = Hash::new(b"");
1842 let res = replica.insert(b"foo", &alice, hash, 0).await;
1843 assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1844 store.flush()?;
1845 Ok(())
1846 }
1847
1848 #[tokio::test]
1849 async fn test_prefix_delete_memory() -> Result<()> {
1850 let store = store::Store::memory();
1851 test_prefix_delete(store).await?;
1852 Ok(())
1853 }
1854
1855 #[tokio::test]
1856 #[cfg(feature = "fs-store")]
1857 async fn test_prefix_delete_fs() -> Result<()> {
1858 let dbfile = tempfile::NamedTempFile::new()?;
1859 let store = store::fs::Store::persistent(dbfile.path())?;
1860 test_prefix_delete(store).await?;
1861 Ok(())
1862 }
1863
1864 async fn test_prefix_delete(mut store: Store) -> Result<()> {
1865 let mut rng = rand::rng();
1866 let alice = Author::new(&mut rng);
1867 let myspace = NamespaceSecret::new(&mut rng);
1868 let mut replica = store.new_replica(myspace.clone())?;
1869 let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
1870 let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;
1871
1872 assert_eq!(
1874 get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1875 Some(hash1)
1876 );
1877 assert_eq!(
1878 get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1879 Some(hash2)
1880 );
1881
1882 let mut replica = store.new_replica(myspace.clone())?;
1884 let deleted = replica.delete_prefix(b"foo", &alice).await?;
1885 assert_eq!(deleted, 2);
1886 assert_eq!(
1887 store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1888 None
1889 );
1890 assert_eq!(
1891 store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1892 None
1893 );
1894 assert_eq!(
1895 store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1896 None
1897 );
1898 store.flush()?;
1899 Ok(())
1900 }
1901
1902 #[tokio::test]
1903 async fn test_replica_sync_delete_memory() -> Result<()> {
1904 let alice_store = store::Store::memory();
1905 let bob_store = store::Store::memory();
1906
1907 test_replica_sync_delete(alice_store, bob_store).await
1908 }
1909
1910 #[tokio::test]
1911 #[cfg(feature = "fs-store")]
1912 async fn test_replica_sync_delete_fs() -> Result<()> {
1913 let alice_dbfile = tempfile::NamedTempFile::new()?;
1914 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1915 let bob_dbfile = tempfile::NamedTempFile::new()?;
1916 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1917 test_replica_sync_delete(alice_store, bob_store).await
1918 }
1919
1920 async fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1921 let alice_set = ["foot"];
1922 let bob_set = ["fool", "foo", "fog"];
1923
1924 let mut rng = rand::rng();
1925 let author = Author::new(&mut rng);
1926 let myspace = NamespaceSecret::new(&mut rng);
1927 let mut alice = alice_store.new_replica(myspace.clone())?;
1928 for el in &alice_set {
1929 alice.hash_and_insert(el, &author, el.as_bytes()).await?;
1930 }
1931
1932 let mut bob = bob_store.new_replica(myspace.clone())?;
1933 for el in &bob_set {
1934 bob.hash_and_insert(el, &author, el.as_bytes()).await?;
1935 }
1936
1937 sync(&mut alice, &mut bob).await?;
1938
1939 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1940 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1941 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1942 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1943
1944 let mut alice = alice_store.new_replica(myspace.clone())?;
1945 let mut bob = bob_store.new_replica(myspace.clone())?;
1946 alice.delete_prefix("foo", &author).await?;
1947 bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
1948 .await?;
1949 sync(&mut alice, &mut bob).await?;
1950 check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1951 check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1952 alice_store.flush()?;
1953 bob_store.flush()?;
1954 Ok(())
1955 }
1956
1957 #[tokio::test]
1958 async fn test_replica_remove_memory() -> Result<()> {
1959 let alice_store = store::Store::memory();
1960 test_replica_remove(alice_store).await
1961 }
1962
1963 #[tokio::test]
1964 #[cfg(feature = "fs-store")]
1965 async fn test_replica_remove_fs() -> Result<()> {
1966 let alice_dbfile = tempfile::NamedTempFile::new()?;
1967 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1968 test_replica_remove(alice_store).await
1969 }
1970
1971 async fn test_replica_remove(mut store: Store) -> Result<()> {
1972 let mut rng = rand::rng();
1973 let namespace = NamespaceSecret::new(&mut rng);
1974 let author = Author::new(&mut rng);
1975 let mut replica = store.new_replica(namespace.clone())?;
1976
1977 let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
1979 let res = store
1980 .get_many(namespace.id(), Query::all())?
1981 .collect::<Vec<_>>();
1982 assert_eq!(res.len(), 1);
1983
1984 let res = store.remove_replica(&namespace.id());
1986 assert!(res.is_err());
1988 store.close_replica(namespace.id());
1989 store.remove_replica(&namespace.id())?;
1990 let res = store
1991 .get_many(namespace.id(), Query::all())?
1992 .collect::<Vec<_>>();
1993 assert_eq!(res.len(), 0);
1994
1995 let res = store.load_replica_info(&namespace.id());
1997 assert!(matches!(res, Err(OpenError::NotFound)));
1998
1999 let mut replica = store.new_replica(namespace.clone())?;
2001 replica.insert(b"foo", &author, hash, 3).await?;
2002 let res = store
2003 .get_many(namespace.id(), Query::all())?
2004 .collect::<Vec<_>>();
2005 assert_eq!(res.len(), 1);
2006 store.flush()?;
2007 Ok(())
2008 }
2009
2010 #[tokio::test]
2011 async fn test_replica_delete_edge_cases_memory() -> Result<()> {
2012 let store = store::Store::memory();
2013 test_replica_delete_edge_cases(store).await
2014 }
2015
2016 #[tokio::test]
2017 #[cfg(feature = "fs-store")]
2018 async fn test_replica_delete_edge_cases_fs() -> Result<()> {
2019 let dbfile = tempfile::NamedTempFile::new()?;
2020 let store = store::fs::Store::persistent(dbfile.path())?;
2021 test_replica_delete_edge_cases(store).await
2022 }
2023
2024 async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
2025 let mut rng = rand::rng();
2026 let author = Author::new(&mut rng);
2027 let namespace = NamespaceSecret::new(&mut rng);
2028
2029 let edgecases = [0u8, 1u8, 255u8];
2030 let prefixes = [0u8, 255u8];
2031 let hash = Hash::new(b"foo");
2032 let len = 3;
2033 for prefix in prefixes {
2034 let mut expected = vec![];
2035 let mut replica = store.new_replica(namespace.clone())?;
2036 for suffix in edgecases {
2037 let key = [prefix, suffix].to_vec();
2038 expected.push(key.clone());
2039 replica.insert(&key, &author, hash, len).await?;
2040 }
2041 assert_keys(&mut store, namespace.id(), expected);
2042 let mut replica = store.new_replica(namespace.clone())?;
2043 replica.delete_prefix([prefix], &author).await?;
2044 assert_keys(&mut store, namespace.id(), vec![]);
2045 }
2046
2047 let mut replica = store.new_replica(namespace.clone())?;
2048 let key = vec![1u8, 0u8];
2049 replica.insert(key, &author, hash, len).await?;
2050 let key = vec![1u8, 1u8];
2051 replica.insert(key, &author, hash, len).await?;
2052 let key = vec![1u8, 2u8];
2053 replica.insert(key, &author, hash, len).await?;
2054 let prefix = vec![1u8, 1u8];
2055 replica.delete_prefix(prefix, &author).await?;
2056 assert_keys(
2057 &mut store,
2058 namespace.id(),
2059 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2060 );
2061
2062 let mut replica = store.new_replica(namespace.clone())?;
2063 let key = vec![0u8, 255u8];
2064 replica.insert(key, &author, hash, len).await?;
2065 let key = vec![0u8, 0u8];
2066 replica.insert(key, &author, hash, len).await?;
2067 let prefix = vec![0u8];
2068 replica.delete_prefix(prefix, &author).await?;
2069 assert_keys(
2070 &mut store,
2071 namespace.id(),
2072 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2073 );
2074 store.flush()?;
2075 Ok(())
2076 }
2077
2078 #[tokio::test]
2079 async fn test_latest_iter_memory() -> Result<()> {
2080 let store = store::Store::memory();
2081 test_latest_iter(store).await
2082 }
2083
2084 #[tokio::test]
2085 #[cfg(feature = "fs-store")]
2086 async fn test_latest_iter_fs() -> Result<()> {
2087 let dbfile = tempfile::NamedTempFile::new()?;
2088 let store = store::fs::Store::persistent(dbfile.path())?;
2089 test_latest_iter(store).await
2090 }
2091
2092 async fn test_latest_iter(mut store: Store) -> Result<()> {
2093 let mut rng = rand::rng();
2094 let author0 = Author::new(&mut rng);
2095 let author1 = Author::new(&mut rng);
2096 let namespace = NamespaceSecret::new(&mut rng);
2097 let mut replica = store.new_replica(namespace.clone())?;
2098
2099 replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
2100 let latest = store
2101 .get_latest_for_each_author(namespace.id())?
2102 .collect::<Result<Vec<_>>>()?;
2103 assert_eq!(latest.len(), 1);
2104 assert_eq!(latest[0].2, b"a0.1".to_vec());
2105
2106 let mut replica = store.new_replica(namespace.clone())?;
2107 replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
2108 replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
2109 let latest = store
2110 .get_latest_for_each_author(namespace.id())?
2111 .collect::<Result<Vec<_>>>()?;
2112 let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2113 latest_keys.sort();
2114 assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2115 store.flush()?;
2116 Ok(())
2117 }
2118
2119 #[tokio::test]
2120 async fn test_replica_byte_keys_memory() -> Result<()> {
2121 let store = store::Store::memory();
2122
2123 test_replica_byte_keys(store).await?;
2124 Ok(())
2125 }
2126
2127 #[tokio::test]
2128 #[cfg(feature = "fs-store")]
2129 async fn test_replica_byte_keys_fs() -> Result<()> {
2130 let dbfile = tempfile::NamedTempFile::new()?;
2131 let store = store::fs::Store::persistent(dbfile.path())?;
2132 test_replica_byte_keys(store).await?;
2133
2134 Ok(())
2135 }
2136
2137 async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2138 let mut rng = rand::rng();
2139 let author = Author::new(&mut rng);
2140 let namespace = NamespaceSecret::new(&mut rng);
2141
2142 let hash = Hash::new(b"foo");
2143 let len = 3;
2144
2145 let key = vec![1u8, 0u8];
2146 let mut replica = store.new_replica(namespace.clone())?;
2147 replica.insert(key, &author, hash, len).await?;
2148 assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2149 let key = vec![1u8, 2u8];
2150 let mut replica = store.new_replica(namespace.clone())?;
2151 replica.insert(key, &author, hash, len).await?;
2152 assert_keys(
2153 &mut store,
2154 namespace.id(),
2155 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2156 );
2157
2158 let key = vec![0u8, 255u8];
2159 let mut replica = store.new_replica(namespace.clone())?;
2160 replica.insert(key, &author, hash, len).await?;
2161 assert_keys(
2162 &mut store,
2163 namespace.id(),
2164 vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2165 );
2166 store.flush()?;
2167 Ok(())
2168 }
2169
2170 #[tokio::test]
2171 async fn test_replica_capability_memory() -> Result<()> {
2172 let store = store::Store::memory();
2173 test_replica_capability(store).await
2174 }
2175
2176 #[tokio::test]
2177 #[cfg(feature = "fs-store")]
2178 async fn test_replica_capability_fs() -> Result<()> {
2179 let dbfile = tempfile::NamedTempFile::new()?;
2180 let store = store::fs::Store::persistent(dbfile.path())?;
2181 test_replica_capability(store).await
2182 }
2183
2184 #[allow(clippy::redundant_pattern_matching)]
2185 async fn test_replica_capability(mut store: Store) -> Result<()> {
2186 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2187 let author = store.new_author(&mut rng)?;
2188 let namespace = NamespaceSecret::new(&mut rng);
2189
2190 let capability = Capability::Read(namespace.id());
2192 store.import_namespace(capability)?;
2193 let mut replica = store.open_replica(&namespace.id())?;
2194 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2195 assert!(matches!(res, Err(InsertError::ReadOnly)));
2196
2197 let capability = Capability::Write(namespace.clone());
2199 store.import_namespace(capability)?;
2200 let mut replica = store.open_replica(&namespace.id())?;
2201 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2202 assert!(matches!(res, Ok(_)));
2203 store.close_replica(namespace.id());
2204 let mut replica = store.open_replica(&namespace.id())?;
2205 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2206 assert!(res.is_ok());
2207
2208 let capability = Capability::Read(namespace.id());
2210 store.import_namespace(capability)?;
2211 store.close_replica(namespace.id());
2212 let mut replica = store.open_replica(&namespace.id())?;
2213 let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
2214 assert!(res.is_ok());
2215 store.flush()?;
2216 Ok(())
2217 }
2218
2219 #[tokio::test]
2220 async fn test_actor_capability_memory() -> Result<()> {
2221 let store = store::Store::memory();
2222 test_actor_capability(store).await
2223 }
2224
2225 #[tokio::test]
2226 #[cfg(feature = "fs-store")]
2227 async fn test_actor_capability_fs() -> Result<()> {
2228 let dbfile = tempfile::NamedTempFile::new()?;
2229 let store = store::fs::Store::persistent(dbfile.path())?;
2230 test_actor_capability(store).await
2231 }
2232
2233 async fn test_actor_capability(store: Store) -> Result<()> {
2234 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2236 let author = Author::new(&mut rng);
2237 let handle = SyncHandle::spawn(store, None, "test".into());
2238 let author = handle.import_author(author).await?;
2239 let namespace = NamespaceSecret::new(&mut rng);
2240 let id = namespace.id();
2241
2242 let capability = Capability::Read(namespace.id());
2244 handle.import_namespace(capability).await?;
2245 handle.open(namespace.id(), Default::default()).await?;
2246 let res = handle
2247 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2248 .await;
2249 assert!(res.is_err());
2250
2251 let capability = Capability::Write(namespace.clone());
2253 handle.import_namespace(capability).await?;
2254 let res = handle
2255 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2256 .await;
2257 assert!(res.is_ok());
2258
2259 handle.close(namespace.id()).await?;
2261 let res = handle
2262 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2263 .await;
2264 assert!(res.is_err());
2265 handle.open(namespace.id(), Default::default()).await?;
2266 let res = handle
2267 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2268 .await;
2269 assert!(res.is_ok());
2270 Ok(())
2271 }
2272
2273 fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2274 let mut res = vec![];
2275 while let Ok(ev) = events.try_recv() {
2276 res.push(ev);
2277 }
2278 res
2279 }
2280
2281 #[tokio::test]
2284 async fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2285 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2286 let mut store1 = store::Store::memory();
2287 let mut store2 = store::Store::memory();
2288 let peer1 = [1u8; 32];
2289 let peer2 = [2u8; 32];
2290 let mut state1 = SyncOutcome::default();
2291 let mut state2 = SyncOutcome::default();
2292
2293 let author = Author::new(&mut rng);
2294 let namespace = NamespaceSecret::new(&mut rng);
2295 let mut replica1 = store1.new_replica(namespace.clone())?;
2296 let mut replica2 = store2.new_replica(namespace.clone())?;
2297
2298 let (events1_sender, events1) = async_channel::bounded(32);
2299 let (events2_sender, events2) = async_channel::bounded(32);
2300
2301 replica1.info.subscribe(events1_sender);
2302 replica2.info.subscribe(events2_sender);
2303
2304 replica1.hash_and_insert(b"foo", &author, b"init").await?;
2305
2306 let from1 = replica1.sync_initial_message()?;
2307 let from2 = replica2
2308 .sync_process_message(from1, peer1, &mut state2)
2309 .await
2310 .unwrap()
2311 .unwrap();
2312 let from1 = replica1
2313 .sync_process_message(from2, peer2, &mut state1)
2314 .await
2315 .unwrap()
2316 .unwrap();
2317 replica2.hash_and_insert(b"foo", &author, b"update").await?;
2321 let from2 = replica2
2322 .sync_process_message(from1, peer1, &mut state2)
2323 .await
2324 .unwrap();
2325 assert!(from2.is_none());
2326 let events1 = drain(events1);
2327 let events2 = drain(events2);
2328 assert_eq!(events1.len(), 1);
2329 assert_eq!(events2.len(), 1);
2330 assert!(matches!(events1[0], Event::LocalInsert { .. }));
2331 assert!(matches!(events2[0], Event::LocalInsert { .. }));
2332 assert_eq!(state1.num_sent, 1);
2333 assert_eq!(state1.num_recv, 0);
2334 assert_eq!(state2.num_sent, 0);
2335 assert_eq!(state2.num_recv, 1);
2336 store1.flush()?;
2337 store2.flush()?;
2338 Ok(())
2339 }
2340
2341 #[tokio::test]
2342 async fn test_replica_queries_mem() -> Result<()> {
2343 let store = store::Store::memory();
2344
2345 test_replica_queries(store).await?;
2346 Ok(())
2347 }
2348
2349 #[tokio::test]
2350 #[cfg(feature = "fs-store")]
2351 async fn test_replica_queries_fs() -> Result<()> {
2352 let dbfile = tempfile::NamedTempFile::new()?;
2353 let store = store::fs::Store::persistent(dbfile.path())?;
2354 test_replica_queries(store).await?;
2355
2356 Ok(())
2357 }
2358
/// Shared body of the replica query tests, run against whichever store the caller supplies.
///
/// Inserts four entries from three authors into one namespace, then checks the exact
/// result order of `Store::get_many` for a series of `Query` configurations (sorting,
/// key prefix, offset/limit, latest-per-key). Finally deletes one entry via
/// `delete_prefix` and checks the results with and without `include_empty()`.
async fn test_replica_queries(mut store: Store) -> Result<()> {
    // Fixed seed so the generated namespace and authors are the same on every run.
    let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
    let namespace = NamespaceSecret::new(&mut rng);
    let namespace_id = namespace.id();

    let a1 = store.new_author(&mut rng)?;
    let a2 = store.new_author(&mut rng)?;
    let a3 = store.new_author(&mut rng)?;
    // Printed to help interpret failures of the author-ordered expectations below.
    println!(
        "a1 {} a2 {} a3 {}",
        a1.id().fmt_short(),
        a2.id().fmt_short(),
        a3.id().fmt_short()
    );

    // "hi/world" is written by a2 first and by a1 afterwards; the latest-per-key
    // expectations below list a1 for that key.
    let mut replica = store.new_replica(namespace.clone())?;
    replica.hash_and_insert("hi/world", &a2, "a2").await?;
    replica.hash_and_insert("hi/world", &a1, "a1").await?;
    replica.hash_and_insert("hi/moon", &a2, "a1").await?;
    replica.hash_and_insert("hi", &a3, "a3").await?;

    // Small helper that runs a query and compares the resulting (key, author id)
    // pairs, in order, against the expectation.
    struct QueryTester<'a> {
        store: &'a mut Store,
        namespace: NamespaceId,
    }
    impl QueryTester<'_> {
        /// Runs `query` on the namespace and asserts that the returned entries match
        /// `expected` exactly, including order. Panics on any store error.
        fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
            let query = query.into();
            let actual = self
                .store
                // The query is cloned so it can still be printed in the failure message.
                .get_many(self.namespace, query.clone())
                .unwrap()
                .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
                .collect::<Result<Vec<_>>>()
                .unwrap();
            let expected = expected
                .into_iter()
                .map(|(key, author)| (key.to_string(), author.id()))
                .collect::<Vec<_>>();
            assert_eq!(actual, expected, "query: {query:#?}")
        }
    }

    let mut qt = QueryTester {
        store: &mut store,
        namespace: namespace_id,
    };

    // Default query: all four entries.
    qt.assert(
        Query::all(),
        vec![
            ("hi/world", &a1),
            ("hi/moon", &a2),
            ("hi/world", &a2),
            ("hi", &a3),
        ],
    );

    // One entry per key, in both sort directions.
    qt.assert(
        Query::single_latest_per_key(),
        vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
    );

    qt.assert(
        Query::single_latest_per_key().sort_direction(SortDirection::Desc),
        vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
    );

    // One entry per key, restricted to keys starting with "hi/".
    qt.assert(
        Query::single_latest_per_key().key_prefix("hi/"),
        vec![("hi/moon", &a2), ("hi/world", &a1)],
    );

    qt.assert(
        Query::single_latest_per_key()
            .key_prefix("hi/")
            .sort_direction(SortDirection::Desc),
        vec![("hi/world", &a1), ("hi/moon", &a2)],
    );

    // All entries sorted by key, then author.
    qt.assert(
        Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
        vec![
            ("hi", &a3),
            ("hi/moon", &a2),
            ("hi/world", &a1),
            ("hi/world", &a2),
        ],
    );

    qt.assert(
        Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
        vec![
            ("hi/world", &a2),
            ("hi/world", &a1),
            ("hi/moon", &a2),
            ("hi", &a3),
        ],
    );

    // Prefix filter on the default ordering, then combined with offset/limit.
    qt.assert(
        Query::all().key_prefix("hi/"),
        vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
    );

    qt.assert(
        Query::all().key_prefix("hi/").offset(1).limit(1),
        vec![("hi/moon", &a2)],
    );

    // Prefix filter combined with explicit key/author sorting and paging.
    qt.assert(
        Query::all()
            .key_prefix("hi/")
            .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
        vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
    );

    qt.assert(
        Query::all()
            .key_prefix("hi/")
            .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
            .offset(1)
            .limit(1),
        vec![("hi/world", &a1)],
    );

    // Prefix filter combined with explicit author/key sorting.
    qt.assert(
        Query::all()
            .key_prefix("hi/")
            .sort_by(SortBy::AuthorKey, SortDirection::Asc),
        vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
    );

    qt.assert(
        Query::all()
            .key_prefix("hi/")
            .sort_by(SortBy::AuthorKey, SortDirection::Desc),
        vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
    );

    // `limit` is applied before `offset` in the builder chain here; the expectation
    // is the second and third entries of the key/author ordering.
    qt.assert(
        Query::all()
            .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
            .limit(2)
            .offset(1),
        vec![("hi/moon", &a2), ("hi/world", &a1)],
    );

    // Delete a2's "hi/world" entry, then rebuild the tester because the replica
    // needed its own access to the store.
    let mut replica = store.new_replica(namespace)?;
    replica.delete_prefix("hi/world", &a2).await?;
    let mut qt = QueryTester {
        store: &mut store,
        namespace: namespace_id,
    };

    // The deleted entry is no longer returned by the default query...
    qt.assert(
        Query::all(),
        vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
    );

    // ...but is listed again when empty entries are explicitly included.
    qt.assert(
        Query::all().include_empty(),
        vec![
            ("hi/world", &a1),
            ("hi/moon", &a2),
            ("hi/world", &a2),
            ("hi", &a3),
        ],
    );
    store.flush()?;
    Ok(())
}
2531
/// Runs the shared download-policy checks against an in-memory store.
#[test]
fn test_dl_policies_mem() -> Result<()> {
    let mut mem_store = store::Store::memory();
    let outcome = test_dl_policies(&mut mem_store);
    outcome
}
2537
/// Runs the shared download-policy checks against a persistent store backed by a temp file.
#[test]
#[cfg(feature = "fs-store")]
fn test_dl_policies_fs() -> Result<()> {
    // Keep the temp file handle bound until the checks have finished.
    let db_file = tempfile::NamedTempFile::new()?;
    let mut fs_store = store::fs::Store::persistent(db_file.path())?;
    let outcome = test_dl_policies(&mut fs_store);
    outcome
}
2545
2546 fn test_dl_policies(store: &mut Store) -> Result<()> {
2547 let mut rng = rand::rngs::ChaCha12Rng::seed_from_u64(1);
2548 let namespace = NamespaceSecret::new(&mut rng);
2549 let id = namespace.id();
2550
2551 let filter = store::FilterKind::Exact("foo".into());
2552 let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2553 store
2554 .set_download_policy(&id, policy.clone())
2555 .expect_err("document dos not exist");
2556
2557 store.new_replica(namespace)?;
2559
2560 store.set_download_policy(&id, policy.clone())?;
2561 let retrieved_policy = store.get_download_policy(&id)?;
2562 assert_eq!(retrieved_policy, policy);
2563 store.flush()?;
2564 Ok(())
2565 }
2566
2567 fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2568 expected.sort();
2569 assert_eq!(expected, get_keys_sorted(store, namespace));
2570 }
2571
2572 fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2573 let mut res = store
2574 .get_many(namespace, Query::all())
2575 .unwrap()
2576 .map(|e| e.map(|e| e.key().to_vec()))
2577 .collect::<Result<Vec<_>>>()
2578 .unwrap();
2579 res.sort();
2580 res
2581 }
2582
2583 fn get_entry(
2584 store: &mut Store,
2585 namespace: NamespaceId,
2586 author: AuthorId,
2587 key: &[u8],
2588 ) -> anyhow::Result<SignedEntry> {
2589 let entry = store
2590 .get_exact(namespace, author, key, true)?
2591 .ok_or_else(|| anyhow::anyhow!("not found"))?;
2592 Ok(entry)
2593 }
2594
2595 fn get_content_hash(
2596 store: &mut Store,
2597 namespace: NamespaceId,
2598 author: AuthorId,
2599 key: &[u8],
2600 ) -> anyhow::Result<Option<Hash>> {
2601 let hash = store
2602 .get_exact(namespace, author, key, false)?
2603 .map(|e| e.content_hash());
2604 Ok(hash)
2605 }
2606
2607 async fn sync<'a>(
2608 alice: &'a mut Replica<'a>,
2609 bob: &'a mut Replica<'a>,
2610 ) -> Result<(SyncOutcome, SyncOutcome)> {
2611 let alice_peer_id = [1u8; 32];
2612 let bob_peer_id = [2u8; 32];
2613 let mut alice_state = SyncOutcome::default();
2614 let mut bob_state = SyncOutcome::default();
2615 let mut next_to_bob = Some(alice.sync_initial_message()?);
2617 let mut rounds = 0;
2618 while let Some(msg) = next_to_bob.take() {
2619 assert!(rounds < 100, "too many rounds");
2620 rounds += 1;
2621 println!("round {rounds}");
2622 if let Some(msg) = bob
2623 .sync_process_message(msg, alice_peer_id, &mut bob_state)
2624 .await?
2625 {
2626 next_to_bob = alice
2627 .sync_process_message(msg, bob_peer_id, &mut alice_state)
2628 .await?
2629 }
2630 }
2631 assert_eq!(alice_state.num_sent, bob_state.num_recv);
2632 assert_eq!(alice_state.num_recv, bob_state.num_sent);
2633 Ok((alice_state, bob_state))
2634 }
2635
2636 fn check_entries(
2637 store: &mut Store,
2638 namespace: &NamespaceId,
2639 author: &Author,
2640 set: &[&str],
2641 ) -> Result<()> {
2642 for el in set {
2643 store.get_exact(*namespace, author.id(), el, false)?;
2644 }
2645 Ok(())
2646 }
2647
/// Pins the postcard encoding of a `SignedEntry` built from fixed key bytes to a known
/// hex string, then checks that decoding those bytes gives back an equal value.
#[test]
fn test_signed_entry_postcard_snapshot() {
    const EXPECTED_HEX: &str = "4b523f1b6d9b00a4779fc9f8f105a9e36f062ceb7d511b632905782042ad30acb6dd07bfced4ecd5f3aa58321e8ace63f48f988ed8461bfdcd8b0e902187a10e228ddc6998329b7faa64875fe80da36406ea8d87e3e57bb048323e9cb66c0b343b60c4e709fb978b878e37d0c362edfc06c8cdc774c8b29d94e48eaa06cca60f5055154f42065ea5a1bea05463826be2684eb92df92c100027aabaae57ca554207bc7cbcb5636375fa1d82434d466724d92377f53b980695dd49d26d0ce12205a5776972652d666f726d61742d7465737400af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f32628080f9c0c1c48203";

    // Fixed inputs throughout, so the encoded bytes are reproducible.
    let author = Author::from_bytes(&[0xa1; 32]);
    let namespace = NamespaceSecret::from_bytes(&[0xb2; 32]);
    let record = Record::new(Hash::EMPTY, 0, 1_700_000_000_000_000u64);
    let id = RecordIdentifier::new(namespace.id(), author.id(), b"wire-format-test");
    let entry = Entry::new(id, record);
    let signed = SignedEntry::from_entry(entry, &namespace, &author);

    let encoded = postcard::to_stdvec(&signed).unwrap();
    assert_eq!(hex::encode(&encoded), EXPECTED_HEX);

    let roundtripped: SignedEntry = postcard::from_bytes(&encoded).unwrap();
    assert_eq!(roundtripped, signed);
}
2666
/// Pins the postcard encoding of an `Author` built from fixed bytes to a known hex string.
#[test]
fn test_author_postcard_snapshot() {
    const EXPECTED_HEX: &str =
        "20a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1";

    let encoded = postcard::to_stdvec(&Author::from_bytes(&[0xa1; 32])).unwrap();
    assert_eq!(hex::encode(&encoded), EXPECTED_HEX);
}
2677
/// Pins the postcard encoding of a `NamespaceSecret` built from fixed bytes to a known
/// hex string.
#[test]
fn test_namespace_secret_postcard_snapshot() {
    const EXPECTED_HEX: &str =
        "20b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2";

    let encoded = postcard::to_stdvec(&NamespaceSecret::from_bytes(&[0xb2; 32])).unwrap();
    assert_eq!(hex::encode(&encoded), EXPECTED_HEX);
}
2688}