1use std::{
10 cmp::Ordering,
11 fmt::Debug,
12 sync::Arc,
13 time::{Duration, SystemTime},
14};
15
16use bytes::{Bytes, BytesMut};
17#[cfg(feature = "metrics")]
18use iroh_metrics::{inc, inc_by};
19use std::ops::{Deref, DerefMut};
20
21use ed25519_dalek::{Signature, SignatureError};
22use iroh_base::{base32, hash::Hash};
23use serde::{Deserialize, Serialize};
24
25pub use crate::heads::AuthorHeads;
26#[cfg(feature = "metrics")]
27use crate::metrics::Metrics;
28use crate::{
29 keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
30 ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
31 store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
32};
33
34pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
38
39pub type PeerIdBytes = [u8; 32];
42
43pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_millis() as u64;
46
47pub type ContentStatusCallback = Arc<dyn Fn(Hash) -> ContentStatus + Send + Sync + 'static>;
49
50#[derive(Debug, Clone)]
52pub enum Event {
53 LocalInsert {
55 namespace: NamespaceId,
57 entry: SignedEntry,
59 },
60 RemoteInsert {
62 namespace: NamespaceId,
64 entry: SignedEntry,
66 from: PeerIdBytes,
68 should_download: bool,
70 remote_content_status: ContentStatus,
72 },
73}
74
75#[derive(Debug, Clone)]
77pub enum InsertOrigin {
78 Local,
80 Sync {
82 from: PeerIdBytes,
84 remote_content_status: ContentStatus,
86 },
87}
88
89#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
91pub enum ContentStatus {
92 Complete,
94 Incomplete,
96 Missing,
98}
99
100#[derive(Debug, Clone, Default)]
102pub struct SyncOutcome {
103 pub heads_received: AuthorHeads,
105 pub num_recv: usize,
107 pub num_sent: usize,
109}
110
111#[derive(Debug, Default)]
112struct Subscribers(Vec<flume::Sender<Event>>);
113impl Subscribers {
114 pub fn subscribe(&mut self, sender: flume::Sender<Event>) {
115 self.0.push(sender)
116 }
117 pub fn unsubscribe(&mut self, sender: &flume::Sender<Event>) {
118 self.0.retain(|s| !s.same_channel(sender));
119 }
120 pub fn send(&mut self, event: Event) {
121 self.0.retain(|sender| sender.send(event.clone()).is_ok())
122 }
123 pub fn len(&self) -> usize {
124 self.0.len()
125 }
126 pub fn send_with(&mut self, f: impl FnOnce() -> Event) {
127 if !self.0.is_empty() {
128 self.send(f())
129 }
130 }
131}
132
133#[derive(
135 Debug,
136 Clone,
137 Copy,
138 Serialize,
139 Deserialize,
140 num_enum::IntoPrimitive,
141 num_enum::TryFromPrimitive,
142 strum::Display,
143)]
144#[repr(u8)]
145#[strum(serialize_all = "snake_case")]
146pub enum CapabilityKind {
147 Write = 1,
149 Read = 2,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
155pub enum Capability {
156 Write(NamespaceSecret),
158 Read(NamespaceId),
160}
161
162impl Capability {
163 pub fn id(&self) -> NamespaceId {
165 match self {
166 Capability::Write(secret) => secret.id(),
167 Capability::Read(id) => *id,
168 }
169 }
170
171 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
174 match self {
175 Capability::Write(secret) => Ok(secret),
176 Capability::Read(_) => Err(ReadOnly),
177 }
178 }
179
180 pub fn kind(&self) -> CapabilityKind {
182 match self {
183 Capability::Write(_) => CapabilityKind::Write,
184 Capability::Read(_) => CapabilityKind::Read,
185 }
186 }
187
188 pub fn raw(&self) -> (u8, [u8; 32]) {
190 let capability_repr: u8 = self.kind().into();
191 let bytes = match self {
192 Capability::Write(secret) => secret.to_bytes(),
193 Capability::Read(id) => id.to_bytes(),
194 };
195 (capability_repr, bytes)
196 }
197
198 pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
200 let kind: CapabilityKind = kind.try_into()?;
201 let capability = match kind {
202 CapabilityKind::Write => {
203 let secret = NamespaceSecret::from_bytes(bytes);
204 Capability::Write(secret)
205 }
206 CapabilityKind::Read => {
207 let id = NamespaceId::from(bytes);
208 Capability::Read(id)
209 }
210 };
211 Ok(capability)
212 }
213
214 pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
220 if other.id() != self.id() {
221 return Err(CapabilityError::NamespaceMismatch);
222 }
223
224 if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
226 let _ = std::mem::replace(self, other);
227 Ok(true)
228 } else {
229 Ok(false)
230 }
231 }
232}
233
234#[derive(Debug, thiserror::Error)]
236pub enum CapabilityError {
237 #[error("Namespaces are not the same")]
239 NamespaceMismatch,
240}
241
242#[derive(derive_more::Debug)]
244pub struct ReplicaInfo {
245 pub(crate) capability: Capability,
246 subscribers: Subscribers,
247 #[debug("ContentStatusCallback")]
248 content_status_cb: Option<ContentStatusCallback>,
249 closed: bool,
250}
251
252impl ReplicaInfo {
253 pub fn new(capability: Capability) -> Self {
255 Self {
256 capability,
257 subscribers: Default::default(),
258 content_status_cb: None,
260 closed: false,
261 }
262 }
263
264 pub fn subscribe(&mut self, sender: flume::Sender<Event>) {
270 self.subscribers.subscribe(sender)
271 }
272
273 pub fn unsubscribe(&mut self, sender: &flume::Sender<Event>) {
279 self.subscribers.unsubscribe(sender)
280 }
281
282 pub fn subscribers_count(&self) -> usize {
284 self.subscribers.len()
285 }
286
287 pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
292 if self.content_status_cb.is_some() {
293 false
294 } else {
295 self.content_status_cb = Some(cb);
296 true
297 }
298 }
299
300 fn ensure_open(&self) -> Result<(), InsertError> {
301 if self.closed() {
302 Err(InsertError::Closed)
303 } else {
304 Ok(())
305 }
306 }
307
308 pub fn closed(&self) -> bool {
314 self.closed
315 }
316
317 pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
324 self.capability.merge(capability)
325 }
326}
327
328#[derive(derive_more::Debug)]
330pub struct Replica<'a, I = Box<ReplicaInfo>> {
331 pub(crate) store: StoreInstance<'a>,
332 pub(crate) info: I,
333}
334
335impl<'a, I> Replica<'a, I>
336where
337 I: Deref<Target = ReplicaInfo> + DerefMut,
338{
339 pub fn new(store: StoreInstance<'a>, info: I) -> Self {
341 Replica { info, store }
342 }
343
344 pub fn insert(
352 &mut self,
353 key: impl AsRef<[u8]>,
354 author: &Author,
355 hash: Hash,
356 len: u64,
357 ) -> Result<usize, InsertError> {
358 if len == 0 || hash == Hash::EMPTY {
359 return Err(InsertError::EntryIsEmpty);
360 }
361 self.info.ensure_open()?;
362 let id = RecordIdentifier::new(self.id(), author.id(), key);
363 let record = Record::new_current(hash, len);
364 let entry = Entry::new(id, record);
365 let secret = self.secret_key()?;
366 let signed_entry = entry.sign(secret, author);
367 self.insert_entry(signed_entry, InsertOrigin::Local)
368 }
369
370 pub fn delete_prefix(
377 &mut self,
378 prefix: impl AsRef<[u8]>,
379 author: &Author,
380 ) -> Result<usize, InsertError> {
381 self.info.ensure_open()?;
382 let id = RecordIdentifier::new(self.id(), author.id(), prefix);
383 let entry = Entry::new_empty(id);
384 let signed_entry = entry.sign(self.secret_key()?, author);
385 self.insert_entry(signed_entry, InsertOrigin::Local)
386 }
387
388 pub fn insert_remote_entry(
396 &mut self,
397 entry: SignedEntry,
398 received_from: PeerIdBytes,
399 content_status: ContentStatus,
400 ) -> Result<usize, InsertError> {
401 self.info.ensure_open()?;
402 entry.validate_empty()?;
403 let origin = InsertOrigin::Sync {
404 from: received_from,
405 remote_content_status: content_status,
406 };
407 self.insert_entry(entry, origin)
408 }
409
410 fn insert_entry(
414 &mut self,
415 entry: SignedEntry,
416 origin: InsertOrigin,
417 ) -> Result<usize, InsertError> {
418 let namespace = self.id();
419
420 #[cfg(feature = "metrics")]
421 let len = entry.content_len();
422
423 let store = &self.store;
424 validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
425
426 let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
427
428 let removed_count = match outcome {
429 InsertOutcome::Inserted { removed } => removed,
430 InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
431 };
432
433 let insert_event = match origin {
434 InsertOrigin::Local => {
435 #[cfg(feature = "metrics")]
436 {
437 inc!(Metrics, new_entries_local);
438 inc_by!(Metrics, new_entries_local_size, len);
439 }
440 Event::LocalInsert { namespace, entry }
441 }
442 InsertOrigin::Sync {
443 from,
444 remote_content_status,
445 } => {
446 #[cfg(feature = "metrics")]
447 {
448 inc!(Metrics, new_entries_remote);
449 inc_by!(Metrics, new_entries_remote_size, len);
450 }
451
452 let download_policy = self
453 .store
454 .get_download_policy(&self.capability().id())
455 .unwrap_or_default();
456 let should_download = download_policy.matches(entry.entry());
457 Event::RemoteInsert {
458 namespace,
459 entry,
460 from,
461 should_download,
462 remote_content_status,
463 }
464 }
465 };
466
467 self.info.subscribers.send(insert_event);
468
469 Ok(removed_count)
470 }
471
472 pub fn hash_and_insert(
477 &mut self,
478 key: impl AsRef<[u8]>,
479 author: &Author,
480 data: impl AsRef<[u8]>,
481 ) -> Result<Hash, InsertError> {
482 self.info.ensure_open()?;
483 let len = data.as_ref().len() as u64;
484 let hash = Hash::new(data);
485 self.insert(key, author, hash, len)?;
486 Ok(hash)
487 }
488
489 pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
491 RecordIdentifier::new(self.info.capability.id(), author.id(), key)
492 }
493
494 pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
496 self.info.ensure_open().map_err(anyhow::Error::from)?;
497 self.store.initial_message().map_err(Into::into)
498 }
499
500 pub fn sync_process_message(
504 &mut self,
505 message: crate::ranger::Message<SignedEntry>,
506 from_peer: PeerIdBytes,
507 state: &mut SyncOutcome,
508 ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
509 self.info.ensure_open()?;
510 let my_namespace = self.id();
511 let now = system_time_now();
512
513 state.num_recv += message.value_count();
515 for (entry, _content_status) in message.values() {
516 state
517 .heads_received
518 .insert(entry.author(), entry.timestamp());
519 }
520
521 let cb = self.info.content_status_cb.clone();
524 let download_policy = self
525 .store
526 .get_download_policy(&my_namespace)
527 .unwrap_or_default();
528 let reply = self.store.process_message(
529 &Default::default(),
530 message,
531 |store, entry, content_status| {
533 let origin = InsertOrigin::Sync {
534 from: from_peer,
535 remote_content_status: content_status,
536 };
537 validate_entry(now, store, my_namespace, entry, &origin).is_ok()
538 },
539 |_store, entry, content_status| {
541 self.info.subscribers.send_with(|| {
543 let should_download = download_policy.matches(entry.entry());
544 Event::RemoteInsert {
545 from: from_peer,
546 namespace: my_namespace,
547 entry: entry.clone(),
548 should_download,
549 remote_content_status: content_status,
550 }
551 })
552 },
553 |_store, entry| {
555 if let Some(cb) = cb.as_ref() {
556 cb(entry.content_hash())
557 } else {
558 ContentStatus::Missing
559 }
560 },
561 )?;
562
563 if let Some(ref reply) = reply {
565 state.num_sent += reply.value_count();
566 }
567
568 Ok(reply)
569 }
570
571 pub fn id(&self) -> NamespaceId {
573 self.info.capability.id()
574 }
575
576 pub fn capability(&self) -> &Capability {
578 &self.info.capability
579 }
580
581 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
584 self.info.capability.secret_key()
585 }
586}
587
588#[derive(Debug, thiserror::Error)]
590#[error("Replica allows read access only.")]
591pub struct ReadOnly;
592
593fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
601 now: u64,
602 store: &S,
603 expected_namespace: NamespaceId,
604 entry: &SignedEntry,
605 origin: &InsertOrigin,
606) -> Result<(), ValidationFailure> {
607 if entry.namespace() != expected_namespace {
609 return Err(ValidationFailure::InvalidNamespace);
610 }
611
612 if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
614 return Err(ValidationFailure::BadSignature);
615 }
616
617 if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
619 return Err(ValidationFailure::TooFarInTheFuture);
620 }
621 Ok(())
622}
623
624#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
626pub enum InsertError {
627 #[error("storage error")]
629 Store(anyhow::Error),
630 #[error("validation failure")]
632 Validation(#[from] ValidationFailure),
633 #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
635 NewerEntryExists,
636 #[error("Attempted to insert an empty entry")]
638 EntryIsEmpty,
639 #[error("Attempted to insert to read only replica")]
641 #[from(ReadOnly)]
642 ReadOnly,
643 #[error("replica is closed")]
645 Closed,
646}
647
648#[derive(thiserror::Error, Debug)]
650pub enum ValidationFailure {
651 #[error("Entry namespace does not match the current replica")]
653 InvalidNamespace,
654 #[error("Entry signature is invalid")]
656 BadSignature,
657 #[error("Entry timestamp is too far in the future.")]
659 TooFarInTheFuture,
660 #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
662 InvalidEmptyEntry,
663}
664
665#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
667pub struct SignedEntry {
668 signature: EntrySignature,
669 entry: Entry,
670}
671
672impl From<SignedEntry> for Entry {
673 fn from(value: SignedEntry) -> Self {
674 value.entry
675 }
676}
677
678impl PartialOrd for SignedEntry {
679 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
680 Some(self.cmp(other))
681 }
682}
683
684impl Ord for SignedEntry {
685 fn cmp(&self, other: &Self) -> Ordering {
686 self.entry.cmp(&other.entry)
687 }
688}
689
690impl PartialOrd for Entry {
691 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
692 Some(self.cmp(other))
693 }
694}
695
696impl Ord for Entry {
697 fn cmp(&self, other: &Self) -> Ordering {
698 self.id
699 .cmp(&other.id)
700 .then_with(|| self.record.cmp(&other.record))
701 }
702}
703
704impl SignedEntry {
705 pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
706 SignedEntry { signature, entry }
707 }
708
709 pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
711 let signature = EntrySignature::from_entry(&entry, namespace, author);
712 SignedEntry { signature, entry }
713 }
714
715 pub fn from_parts(
717 namespace: &NamespaceSecret,
718 author: &Author,
719 key: impl AsRef<[u8]>,
720 record: Record,
721 ) -> Self {
722 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
723 let entry = Entry::new(id, record);
724 Self::from_entry(entry, namespace, author)
725 }
726
727 pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
729 self.signature.verify(
730 &self.entry,
731 &self.entry.namespace().public_key(store)?,
732 &self.entry.author().public_key(store)?,
733 )
734 }
735
736 pub fn signature(&self) -> &EntrySignature {
738 &self.signature
739 }
740
741 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
743 self.entry().validate_empty()
744 }
745
746 pub fn entry(&self) -> &Entry {
748 &self.entry
749 }
750
751 pub fn content_hash(&self) -> Hash {
753 self.entry().content_hash()
754 }
755
756 pub fn content_len(&self) -> u64 {
758 self.entry().content_len()
759 }
760
761 pub fn author_bytes(&self) -> AuthorId {
763 self.entry().id().author()
764 }
765
766 pub fn key(&self) -> &[u8] {
768 self.entry().id().key()
769 }
770
771 pub fn timestamp(&self) -> u64 {
773 self.entry().timestamp()
774 }
775}
776
777impl RangeEntry for SignedEntry {
778 type Key = RecordIdentifier;
779 type Value = Record;
780
781 fn key(&self) -> &Self::Key {
782 &self.entry.id
783 }
784
785 fn value(&self) -> &Self::Value {
786 &self.entry.record
787 }
788
789 fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
790 let mut hasher = blake3::Hasher::new();
791 hasher.update(self.namespace().as_ref());
792 hasher.update(self.author_bytes().as_ref());
793 hasher.update(self.key());
794 hasher.update(&self.timestamp().to_be_bytes());
795 hasher.update(self.content_hash().as_bytes());
796 Fingerprint(hasher.finalize().into())
797 }
798}
799
800#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
802pub struct EntrySignature {
803 author_signature: Signature,
804 namespace_signature: Signature,
805}
806
807impl Debug for EntrySignature {
808 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
809 f.debug_struct("EntrySignature")
810 .field(
811 "namespace_signature",
812 &base32::fmt(self.namespace_signature.to_bytes()),
813 )
814 .field(
815 "author_signature",
816 &base32::fmt(self.author_signature.to_bytes()),
817 )
818 .finish()
819 }
820}
821
822impl EntrySignature {
823 pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
825 let bytes = entry.to_vec();
828 let namespace_signature = namespace.sign(&bytes);
829 let author_signature = author.sign(&bytes);
830
831 EntrySignature {
832 author_signature,
833 namespace_signature,
834 }
835 }
836
837 pub fn verify(
840 &self,
841 entry: &Entry,
842 namespace: &NamespacePublicKey,
843 author: &AuthorPublicKey,
844 ) -> Result<(), SignatureError> {
845 let bytes = entry.to_vec();
846 namespace.verify(&bytes, &self.namespace_signature)?;
847 author.verify(&bytes, &self.author_signature)?;
848
849 Ok(())
850 }
851
852 pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
853 let namespace_signature = Signature::from_bytes(namespace_sig);
854 let author_signature = Signature::from_bytes(author_sig);
855
856 EntrySignature {
857 author_signature,
858 namespace_signature,
859 }
860 }
861
862 pub(crate) fn author(&self) -> &Signature {
863 &self.author_signature
864 }
865
866 pub(crate) fn namespace(&self) -> &Signature {
867 &self.namespace_signature
868 }
869}
870
871#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
877pub struct Entry {
878 id: RecordIdentifier,
879 record: Record,
880}
881
882impl Entry {
883 pub fn new(id: RecordIdentifier, record: Record) -> Self {
885 Entry { id, record }
886 }
887
888 pub fn new_empty(id: RecordIdentifier) -> Self {
890 Entry {
891 id,
892 record: Record::empty_current(),
893 }
894 }
895
896 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
898 match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
899 (true, true) => Ok(()),
900 (false, false) => Ok(()),
901 (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
902 (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
903 }
904 }
905
906 pub fn id(&self) -> &RecordIdentifier {
908 &self.id
909 }
910
911 pub fn namespace(&self) -> NamespaceId {
913 self.id.namespace()
914 }
915
916 pub fn author(&self) -> AuthorId {
918 self.id.author()
919 }
920
921 pub fn key(&self) -> &[u8] {
923 self.id.key()
924 }
925
926 pub fn record(&self) -> &Record {
928 &self.record
929 }
930
931 pub fn encode(&self, out: &mut Vec<u8>) {
933 self.id.encode(out);
934 self.record.encode(out);
935 }
936
937 pub fn to_vec(&self) -> Vec<u8> {
939 let mut out = Vec::new();
940 self.encode(&mut out);
941 out
942 }
943
944 pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
946 SignedEntry::from_entry(self, namespace, author)
947 }
948}
949
950const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
951const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
952const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
953
954#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
956pub struct RecordIdentifier(Bytes);
957
958impl Default for RecordIdentifier {
959 fn default() -> Self {
960 Self::new(NamespaceId::default(), AuthorId::default(), b"")
961 }
962}
963
964impl Debug for RecordIdentifier {
965 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
966 f.debug_struct("RecordIdentifier")
967 .field("namespace", &self.namespace())
968 .field("author", &self.author())
969 .field("key", &std::string::String::from_utf8_lossy(self.key()))
970 .finish()
971 }
972}
973
974impl RangeKey for RecordIdentifier {
975 #[cfg(test)]
976 fn is_prefix_of(&self, other: &Self) -> bool {
977 other.as_ref().starts_with(self.as_ref())
978 }
979}
980
981fn system_time_now() -> u64 {
982 SystemTime::now()
983 .duration_since(SystemTime::UNIX_EPOCH)
984 .expect("time drift")
985 .as_micros() as u64
986}
987
988impl RecordIdentifier {
989 pub fn new(
991 namespace: impl Into<NamespaceId>,
992 author: impl Into<AuthorId>,
993 key: impl AsRef<[u8]>,
994 ) -> Self {
995 let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
996 bytes.extend_from_slice(namespace.into().as_bytes());
997 bytes.extend_from_slice(author.into().as_bytes());
998 bytes.extend_from_slice(key.as_ref());
999 Self(bytes.freeze())
1000 }
1001
1002 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1004 out.extend_from_slice(&self.0);
1005 }
1006
1007 pub fn as_bytes(&self) -> Bytes {
1009 self.0.clone()
1010 }
1011
1012 pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1014 (
1015 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1016 self.0[AUTHOR_BYTES].try_into().unwrap(),
1017 &self.0[KEY_BYTES],
1018 )
1019 }
1020
1021 pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1023 (
1024 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1025 self.0[AUTHOR_BYTES].try_into().unwrap(),
1026 self.0.slice(KEY_BYTES),
1027 )
1028 }
1029
1030 pub fn key(&self) -> &[u8] {
1032 &self.0[KEY_BYTES]
1033 }
1034
1035 pub fn key_bytes(&self) -> Bytes {
1037 self.0.slice(KEY_BYTES)
1038 }
1039
1040 pub fn namespace(&self) -> NamespaceId {
1042 let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1043 value.into()
1044 }
1045
1046 pub fn author(&self) -> AuthorId {
1048 let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1049 value.into()
1050 }
1051}
1052
1053impl AsRef<[u8]> for RecordIdentifier {
1054 fn as_ref(&self) -> &[u8] {
1055 &self.0
1056 }
1057}
1058
1059impl Deref for SignedEntry {
1060 type Target = Entry;
1061 fn deref(&self) -> &Self::Target {
1062 &self.entry
1063 }
1064}
1065
1066impl Deref for Entry {
1067 type Target = Record;
1068 fn deref(&self) -> &Self::Target {
1069 &self.record
1070 }
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1075pub struct Record {
1076 len: u64,
1078 hash: Hash,
1080 timestamp: u64,
1082}
1083
1084impl RangeValue for Record {}
1085
1086impl Ord for Record {
1090 fn cmp(&self, other: &Self) -> Ordering {
1091 self.timestamp
1092 .cmp(&other.timestamp)
1093 .then_with(|| self.hash.cmp(&other.hash))
1094 }
1095}
1096
1097impl PartialOrd for Record {
1098 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1099 Some(self.cmp(other))
1100 }
1101}
1102
1103impl Record {
1104 pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1106 debug_assert!(
1107 len != 0 || hash == Hash::EMPTY,
1108 "if `len` is 0 then `hash` must be the hash of the empty byte range"
1109 );
1110 Record {
1111 hash,
1112 len,
1113 timestamp,
1114 }
1115 }
1116
1117 pub fn empty(timestamp: u64) -> Self {
1119 Self::new(Hash::EMPTY, 0, timestamp)
1120 }
1121
1122 pub fn empty_current() -> Self {
1124 Self::new_current(Hash::EMPTY, 0)
1125 }
1126
1127 pub fn is_empty(&self) -> bool {
1129 self.hash == Hash::EMPTY
1130 }
1131
1132 pub fn new_current(hash: Hash, len: u64) -> Self {
1134 let timestamp = system_time_now();
1135 Self::new(hash, len, timestamp)
1136 }
1137
1138 pub fn content_len(&self) -> u64 {
1140 self.len
1141 }
1142
1143 pub fn content_hash(&self) -> Hash {
1145 self.hash
1146 }
1147
1148 pub fn timestamp(&self) -> u64 {
1150 self.timestamp
1151 }
1152
1153 #[cfg(test)]
1154 pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1155 let len = data.as_ref().len() as u64;
1156 let hash = Hash::new(data);
1157 Self::new_current(hash, len)
1158 }
1159
1160 #[cfg(test)]
1161 pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1162 let len = data.as_ref().len() as u64;
1163 let hash = Hash::new(data);
1164 Self::new(hash, len, timestamp)
1165 }
1166
1167 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1169 out.extend_from_slice(&self.len.to_be_bytes());
1170 out.extend_from_slice(self.hash.as_ref());
1171 out.extend_from_slice(&self.timestamp.to_be_bytes())
1172 }
1173}
1174
1175#[cfg(test)]
1176mod tests {
1177 use std::collections::HashSet;
1178
1179 use anyhow::Result;
1180 use rand_core::SeedableRng;
1181
1182 use crate::{
1183 actor::SyncHandle,
1184 ranger::{Range, Store as _},
1185 store::{OpenError, Query, SortBy, SortDirection, Store},
1186 };
1187
1188 use super::*;
1189
1190 #[test]
1191 fn test_basics_memory() -> Result<()> {
1192 let store = store::Store::memory();
1193 test_basics(store)?;
1194
1195 Ok(())
1196 }
1197
1198 #[test]
1199 fn test_basics_fs() -> Result<()> {
1200 let dbfile = tempfile::NamedTempFile::new()?;
1201 let store = store::fs::Store::persistent(dbfile.path())?;
1202 test_basics(store)?;
1203 Ok(())
1204 }
1205
1206 fn test_basics(mut store: Store) -> Result<()> {
1207 let mut rng = rand::thread_rng();
1208 let alice = Author::new(&mut rng);
1209 let bob = Author::new(&mut rng);
1210 let myspace = NamespaceSecret::new(&mut rng);
1211
1212 let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1213 let record = Record::current_from_data(b"this is my cool data");
1214 let entry = Entry::new(record_id, record);
1215 let signed_entry = entry.sign(&myspace, &alice);
1216 signed_entry.verify(&()).expect("failed to verify");
1217
1218 let mut my_replica = store.new_replica(myspace.clone())?;
1219 for i in 0..10 {
1220 my_replica.hash_and_insert(
1221 format!("/{i}"),
1222 &alice,
1223 format!("{i}: hello from alice"),
1224 )?;
1225 }
1226
1227 for i in 0..10 {
1228 let res = store
1229 .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1230 .unwrap();
1231 let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1232 assert_eq!(res.entry().record().content_len(), len);
1233 res.verify(&())?;
1234 }
1235
1236 let mut my_replica = store.new_replica(myspace.clone())?;
1238 my_replica.hash_and_insert("/cool/path", &alice, "round 1")?;
1239 let _entry = store
1240 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1241 .unwrap();
1242 let mut my_replica = store.new_replica(myspace.clone())?;
1244 my_replica.hash_and_insert("/cool/path", &alice, "round 2")?;
1245 let _entry = store
1246 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1247 .unwrap();
1248
1249 let entries: Vec<_> = store
1251 .get_many(myspace.id(), Query::author(alice.id()))?
1252 .collect::<Result<_>>()?;
1253 assert_eq!(entries.len(), 11);
1254
1255 let entries: Vec<_> = store
1257 .get_many(myspace.id(), Query::author(bob.id()))?
1258 .collect::<Result<_>>()?;
1259 assert_eq!(entries.len(), 0);
1260
1261 let entries: Vec<_> = store
1263 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1264 .collect::<Result<_>>()?;
1265 assert_eq!(entries.len(), 1);
1266
1267 let entries: Vec<_> = store
1269 .get_many(myspace.id(), Query::all())?
1270 .collect::<Result<_>>()?;
1271 assert_eq!(entries.len(), 11);
1272
1273 let mut my_replica = store.new_replica(myspace.clone())?;
1275 let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;
1276
1277 let entries: Vec<_> = store
1279 .get_many(myspace.id(), Query::author(alice.id()))?
1280 .collect::<Result<_>>()?;
1281 assert_eq!(entries.len(), 11);
1282
1283 let entries: Vec<_> = store
1284 .get_many(myspace.id(), Query::author(bob.id()))?
1285 .collect::<Result<_>>()?;
1286 assert_eq!(entries.len(), 1);
1287
1288 let entries: Vec<_> = store
1290 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1291 .collect::<Result<_>>()?;
1292 assert_eq!(entries.len(), 2);
1293
1294 let entries: Vec<_> = store
1296 .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1297 .collect::<Result<_>>()?;
1298 assert_eq!(entries.len(), 2);
1299
1300 let entries: Vec<_> = store
1302 .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1303 .collect::<Result<_>>()?;
1304 assert_eq!(entries.len(), 1);
1305
1306 let entries: Vec<_> = store
1307 .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1308 .collect::<Result<_>>()?;
1309 assert_eq!(entries.len(), 1);
1310
1311 let entries: Vec<_> = store
1313 .get_many(myspace.id(), Query::all())?
1314 .collect::<Result<_>>()?;
1315 assert_eq!(entries.len(), 12);
1316
1317 let mut my_replica = store.new_replica(myspace.clone())?;
1319 let entries_second: Vec<_> = my_replica
1320 .store
1321 .get_range(Range::new(
1322 RecordIdentifier::default(),
1323 RecordIdentifier::default(),
1324 ))?
1325 .collect::<Result<_, _>>()?;
1326
1327 assert_eq!(entries_second.len(), 12);
1328 assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1329
1330 test_lru_cache_like_behaviour(&mut store, myspace.id())
1331 }
1332
1333 fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1336 #[track_caller]
1338 fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1339 assert_eq!(
1340 expected_peers,
1341 &store
1342 .get_sync_peers(&namespace)
1343 .unwrap()
1344 .unwrap()
1345 .collect::<Vec<_>>(),
1346 "sync peers differ"
1347 );
1348 }
1349
1350 let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1351 let mut expected_peers = Vec::with_capacity(count);
1353 for i in 0..count as u8 {
1354 let peer = [i; 32];
1355 expected_peers.insert(0, peer);
1356 store.register_useful_peer(namespace, peer)?;
1357 }
1358 verify_peers(store, namespace, &expected_peers);
1359
1360 expected_peers.pop();
1362 let newer_peer = [count as u8; 32];
1363 expected_peers.insert(0, newer_peer);
1364 store.register_useful_peer(namespace, newer_peer)?;
1365 verify_peers(store, namespace, &expected_peers);
1366
1367 let refreshed_peer = expected_peers.remove(2);
1369 expected_peers.insert(0, refreshed_peer);
1370 store.register_useful_peer(namespace, refreshed_peer)?;
1371 verify_peers(store, namespace, &expected_peers);
1372 Ok(())
1373 }
1374
1375 #[test]
1376 fn test_content_hashes_iterator_memory() -> Result<()> {
1377 let store = store::Store::memory();
1378 test_content_hashes_iterator(store)
1379 }
1380
1381 #[test]
1382 fn test_content_hashes_iterator_fs() -> Result<()> {
1383 let dbfile = tempfile::NamedTempFile::new()?;
1384 let store = store::fs::Store::persistent(dbfile.path())?;
1385 test_content_hashes_iterator(store)
1386 }
1387
1388 fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1389 let mut rng = rand::thread_rng();
1390 let mut expected = HashSet::new();
1391 let n_replicas = 3;
1392 let n_entries = 4;
1393 for i in 0..n_replicas {
1394 let namespace = NamespaceSecret::new(&mut rng);
1395 let author = store.new_author(&mut rng)?;
1396 let mut replica = store.new_replica(namespace)?;
1397 for j in 0..n_entries {
1398 let key = format!("{j}");
1399 let data = format!("{i}:{j}");
1400 let hash = replica.hash_and_insert(key, &author, data)?;
1401 expected.insert(hash);
1402 }
1403 }
1404 assert_eq!(expected.len(), n_replicas * n_entries);
1405 let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1406 assert_eq!(actual, expected);
1407 Ok(())
1408 }
1409
1410 #[test]
1411 fn test_multikey() {
1412 let mut rng = rand::thread_rng();
1413
1414 let k = ["a", "c", "z"];
1415
1416 let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1417 n.sort_by_key(|n| n.id());
1418
1419 let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1420 a.sort_by_key(|a| a.id());
1421
1422 {
1424 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1425 let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1426 let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1427
1428 let range = Range::new(ri0.clone(), ri2.clone());
1429 assert!(range.contains(&ri0), "start");
1430 assert!(range.contains(&ri1), "inside");
1431 assert!(!range.contains(&ri2), "end");
1432
1433 assert!(ri0 < ri1);
1434 assert!(ri1 < ri2);
1435 }
1436
1437 {
1439 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1440 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1441 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1442
1443 let range = Range::new(ri0.clone(), ri2.clone());
1444 assert!(range.contains(&ri0), "start");
1445 assert!(range.contains(&ri1), "inside");
1446 assert!(!range.contains(&ri2), "end");
1447
1448 assert!(ri0 < ri1);
1449 assert!(ri1 < ri2);
1450 }
1451
1452 {
1454 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1455 let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1456 let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1457
1458 let range = Range::new(ri0.clone(), ri2.clone());
1459 assert!(range.contains(&ri0), "start");
1460 assert!(range.contains(&ri1), "inside");
1461 assert!(!range.contains(&ri2), "end");
1462
1463 assert!(ri0 < ri1);
1464 assert!(ri1 < ri2);
1465 }
1466
1467 {
1469 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1470 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1471 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1472
1473 let range = Range::new(ri0.clone(), ri2.clone());
1474 assert!(range.contains(&ri0), "start");
1475 assert!(range.contains(&ri1), "inside");
1476 assert!(!range.contains(&ri2), "end");
1477
1478 assert!(ri0 < ri1);
1479 assert!(ri1 < ri2);
1480 }
1481
1482 {
1484 let a0 = a[0].id();
1487 let a1 = a[1].id();
1488 let n0 = n[0].id();
1489 let n1 = n[1].id();
1490 let k0 = k[0];
1491 let k1 = k[1];
1492
1493 assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1494 assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1495 assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1496 assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1497 }
1498 }
1499
1500 #[test]
1501 fn test_timestamps_memory() -> Result<()> {
1502 let store = store::Store::memory();
1503 test_timestamps(store)?;
1504
1505 Ok(())
1506 }
1507
1508 #[test]
1509 fn test_timestamps_fs() -> Result<()> {
1510 let dbfile = tempfile::NamedTempFile::new()?;
1511 let store = store::fs::Store::persistent(dbfile.path())?;
1512 test_timestamps(store)?;
1513 Ok(())
1514 }
1515
1516 fn test_timestamps(mut store: Store) -> Result<()> {
1517 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1518 let namespace = NamespaceSecret::new(&mut rng);
1519 let _replica = store.new_replica(namespace.clone())?;
1520 let author = store.new_author(&mut rng)?;
1521 store.close_replica(namespace.id());
1522 let mut replica = store.open_replica(&namespace.id())?;
1523
1524 let key = b"hello";
1525 let value = b"world";
1526 let entry = {
1527 let timestamp = 2;
1528 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1529 let record = Record::from_data(value, timestamp);
1530 Entry::new(id, record).sign(&namespace, &author)
1531 };
1532
1533 replica
1534 .insert_entry(entry.clone(), InsertOrigin::Local)
1535 .unwrap();
1536 store.close_replica(namespace.id());
1537 let res = store
1538 .get_exact(namespace.id(), author.id(), key, false)?
1539 .unwrap();
1540 assert_eq!(res, entry);
1541
1542 let entry2 = {
1543 let timestamp = 1;
1544 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1545 let record = Record::from_data(value, timestamp);
1546 Entry::new(id, record).sign(&namespace, &author)
1547 };
1548
1549 let mut replica = store.open_replica(&namespace.id())?;
1550 let res = replica.insert_entry(entry2, InsertOrigin::Local);
1551 store.close_replica(namespace.id());
1552 assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1553 let res = store
1554 .get_exact(namespace.id(), author.id(), key, false)?
1555 .unwrap();
1556 assert_eq!(res, entry);
1557
1558 Ok(())
1559 }
1560
1561 #[test]
1562 fn test_replica_sync_memory() -> Result<()> {
1563 let alice_store = store::Store::memory();
1564 let bob_store = store::Store::memory();
1565
1566 test_replica_sync(alice_store, bob_store)?;
1567 Ok(())
1568 }
1569
1570 #[test]
1571 fn test_replica_sync_fs() -> Result<()> {
1572 let alice_dbfile = tempfile::NamedTempFile::new()?;
1573 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1574 let bob_dbfile = tempfile::NamedTempFile::new()?;
1575 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1576 test_replica_sync(alice_store, bob_store)?;
1577
1578 Ok(())
1579 }
1580
1581 fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1582 let alice_set = ["ape", "eel", "fox", "gnu"];
1583 let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1584
1585 let mut rng = rand::thread_rng();
1586 let author = Author::new(&mut rng);
1587 let myspace = NamespaceSecret::new(&mut rng);
1588 let mut alice = alice_store.new_replica(myspace.clone())?;
1589 for el in &alice_set {
1590 alice.hash_and_insert(el, &author, el.as_bytes())?;
1591 }
1592
1593 let mut bob = bob_store.new_replica(myspace.clone())?;
1594 for el in &bob_set {
1595 bob.hash_and_insert(el, &author, el.as_bytes())?;
1596 }
1597
1598 let (alice_out, bob_out) = sync(&mut alice, &mut bob)?;
1599
1600 assert_eq!(alice_out.num_sent, 2);
1601 assert_eq!(bob_out.num_recv, 2);
1602 assert_eq!(alice_out.num_recv, 6);
1603 assert_eq!(bob_out.num_sent, 6);
1604
1605 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1606 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1607 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1608 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1609
1610 Ok(())
1611 }
1612
1613 #[test]
1614 fn test_replica_timestamp_sync_memory() -> Result<()> {
1615 let alice_store = store::Store::memory();
1616 let bob_store = store::Store::memory();
1617
1618 test_replica_timestamp_sync(alice_store, bob_store)?;
1619 Ok(())
1620 }
1621
1622 #[test]
1623 fn test_replica_timestamp_sync_fs() -> Result<()> {
1624 let alice_dbfile = tempfile::NamedTempFile::new()?;
1625 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1626 let bob_dbfile = tempfile::NamedTempFile::new()?;
1627 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1628 test_replica_timestamp_sync(alice_store, bob_store)?;
1629
1630 Ok(())
1631 }
1632
1633 fn test_replica_timestamp_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1634 let mut rng = rand::thread_rng();
1635 let author = Author::new(&mut rng);
1636 let namespace = NamespaceSecret::new(&mut rng);
1637 let mut alice = alice_store.new_replica(namespace.clone())?;
1638 let mut bob = bob_store.new_replica(namespace.clone())?;
1639
1640 let key = b"key";
1641 let alice_value = b"alice";
1642 let bob_value = b"bob";
1643 let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
1644 let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
1646 sync(&mut alice, &mut bob)?;
1647 assert_eq!(
1648 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1649 Some(bob_hash)
1650 );
1651 assert_eq!(
1652 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1653 Some(bob_hash)
1654 );
1655
1656 let mut alice = alice_store.new_replica(namespace.clone())?;
1657 let mut bob = bob_store.new_replica(namespace.clone())?;
1658
1659 let alice_value_2 = b"alice2";
1660 let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
1662 let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
1663 sync(&mut alice, &mut bob)?;
1664 assert_eq!(
1665 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1666 Some(alice_hash_2)
1667 );
1668 assert_eq!(
1669 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1670 Some(alice_hash_2)
1671 );
1672
1673 Ok(())
1674 }
1675
1676 #[test]
1677 fn test_future_timestamp() -> Result<()> {
1678 let mut rng = rand::thread_rng();
1679 let mut store = store::Store::memory();
1680 let author = Author::new(&mut rng);
1681 let namespace = NamespaceSecret::new(&mut rng);
1682
1683 let mut replica = store.new_replica(namespace.clone())?;
1684 let key = b"hi";
1685 let t = system_time_now();
1686 let record = Record::from_data(b"1", t);
1687 let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1688 replica.insert_entry(entry0.clone(), InsertOrigin::Local)?;
1689
1690 assert_eq!(
1691 get_entry(&mut store, namespace.id(), author.id(), key)?,
1692 entry0
1693 );
1694
1695 let mut replica = store.new_replica(namespace.clone())?;
1696 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1697 let record = Record::from_data(b"2", t);
1698 let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1699 replica.insert_entry(entry1.clone(), InsertOrigin::Local)?;
1700 assert_eq!(
1701 get_entry(&mut store, namespace.id(), author.id(), key)?,
1702 entry1
1703 );
1704
1705 let mut replica = store.new_replica(namespace.clone())?;
1706 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1707 let record = Record::from_data(b"2", t);
1708 let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1709 replica.insert_entry(entry2.clone(), InsertOrigin::Local)?;
1710 assert_eq!(
1711 get_entry(&mut store, namespace.id(), author.id(), key)?,
1712 entry2
1713 );
1714
1715 let mut replica = store.new_replica(namespace.clone())?;
1716 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1717 let record = Record::from_data(b"2", t);
1718 let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1719 let res = replica.insert_entry(entry3, InsertOrigin::Local);
1720 assert!(matches!(
1721 res,
1722 Err(InsertError::Validation(
1723 ValidationFailure::TooFarInTheFuture
1724 ))
1725 ));
1726 assert_eq!(
1727 get_entry(&mut store, namespace.id(), author.id(), key)?,
1728 entry2
1729 );
1730
1731 Ok(())
1732 }
1733
1734 #[test]
1735 fn test_insert_empty() -> Result<()> {
1736 let mut store = store::Store::memory();
1737 let mut rng = rand::thread_rng();
1738 let alice = Author::new(&mut rng);
1739 let myspace = NamespaceSecret::new(&mut rng);
1740 let mut replica = store.new_replica(myspace.clone())?;
1741 let hash = Hash::new(b"");
1742 let res = replica.insert(b"foo", &alice, hash, 0);
1743 assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1744 Ok(())
1745 }
1746
1747 #[test]
1748 fn test_prefix_delete_memory() -> Result<()> {
1749 let store = store::Store::memory();
1750 test_prefix_delete(store)?;
1751 Ok(())
1752 }
1753
1754 #[test]
1755 fn test_prefix_delete_fs() -> Result<()> {
1756 let dbfile = tempfile::NamedTempFile::new()?;
1757 let store = store::fs::Store::persistent(dbfile.path())?;
1758 test_prefix_delete(store)?;
1759 Ok(())
1760 }
1761
1762 fn test_prefix_delete(mut store: Store) -> Result<()> {
1763 let mut rng = rand::thread_rng();
1764 let alice = Author::new(&mut rng);
1765 let myspace = NamespaceSecret::new(&mut rng);
1766 let mut replica = store.new_replica(myspace.clone())?;
1767 let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
1768 let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
1769
1770 assert_eq!(
1772 get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1773 Some(hash1)
1774 );
1775 assert_eq!(
1776 get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1777 Some(hash2)
1778 );
1779
1780 let mut replica = store.new_replica(myspace.clone())?;
1782 let deleted = replica.delete_prefix(b"foo", &alice)?;
1783 assert_eq!(deleted, 2);
1784 assert_eq!(
1785 store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1786 None
1787 );
1788 assert_eq!(
1789 store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1790 None
1791 );
1792 assert_eq!(
1793 store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1794 None
1795 );
1796
1797 Ok(())
1798 }
1799
1800 #[test]
1801 fn test_replica_sync_delete_memory() -> Result<()> {
1802 let alice_store = store::Store::memory();
1803 let bob_store = store::Store::memory();
1804
1805 test_replica_sync_delete(alice_store, bob_store)
1806 }
1807
1808 #[test]
1809 fn test_replica_sync_delete_fs() -> Result<()> {
1810 let alice_dbfile = tempfile::NamedTempFile::new()?;
1811 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1812 let bob_dbfile = tempfile::NamedTempFile::new()?;
1813 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1814 test_replica_sync_delete(alice_store, bob_store)
1815 }
1816
1817 fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1818 let alice_set = ["foot"];
1819 let bob_set = ["fool", "foo", "fog"];
1820
1821 let mut rng = rand::thread_rng();
1822 let author = Author::new(&mut rng);
1823 let myspace = NamespaceSecret::new(&mut rng);
1824 let mut alice = alice_store.new_replica(myspace.clone())?;
1825 for el in &alice_set {
1826 alice.hash_and_insert(el, &author, el.as_bytes())?;
1827 }
1828
1829 let mut bob = bob_store.new_replica(myspace.clone())?;
1830 for el in &bob_set {
1831 bob.hash_and_insert(el, &author, el.as_bytes())?;
1832 }
1833
1834 sync(&mut alice, &mut bob)?;
1835
1836 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1837 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1838 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1839 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1840
1841 let mut alice = alice_store.new_replica(myspace.clone())?;
1842 let mut bob = bob_store.new_replica(myspace.clone())?;
1843 alice.delete_prefix("foo", &author)?;
1844 bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
1845 sync(&mut alice, &mut bob)?;
1846 check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1847 check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1848
1849 Ok(())
1850 }
1851
1852 #[test]
1853 fn test_replica_remove_memory() -> Result<()> {
1854 let alice_store = store::Store::memory();
1855 test_replica_remove(alice_store)
1856 }
1857
1858 #[test]
1859 fn test_replica_remove_fs() -> Result<()> {
1860 let alice_dbfile = tempfile::NamedTempFile::new()?;
1861 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1862 test_replica_remove(alice_store)
1863 }
1864
1865 fn test_replica_remove(mut store: Store) -> Result<()> {
1866 let mut rng = rand::thread_rng();
1867 let namespace = NamespaceSecret::new(&mut rng);
1868 let author = Author::new(&mut rng);
1869 let mut replica = store.new_replica(namespace.clone())?;
1870
1871 let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
1873 let res = store
1874 .get_many(namespace.id(), Query::all())?
1875 .collect::<Vec<_>>();
1876 assert_eq!(res.len(), 1);
1877
1878 let res = store.remove_replica(&namespace.id());
1880 assert!(res.is_err());
1882 store.close_replica(namespace.id());
1883 store.remove_replica(&namespace.id())?;
1884 let res = store
1885 .get_many(namespace.id(), Query::all())?
1886 .collect::<Vec<_>>();
1887 assert_eq!(res.len(), 0);
1888
1889 let res = store.load_replica_info(&namespace.id());
1891 assert!(matches!(res, Err(OpenError::NotFound)));
1892
1893 let mut replica = store.new_replica(namespace.clone())?;
1895 replica.insert(b"foo", &author, hash, 3)?;
1896 let res = store
1897 .get_many(namespace.id(), Query::all())?
1898 .collect::<Vec<_>>();
1899 assert_eq!(res.len(), 1);
1900 Ok(())
1901 }
1902
1903 #[test]
1904 fn test_replica_delete_edge_cases_memory() -> Result<()> {
1905 let store = store::Store::memory();
1906 test_replica_delete_edge_cases(store)
1907 }
1908
1909 #[test]
1910 fn test_replica_delete_edge_cases_fs() -> Result<()> {
1911 let dbfile = tempfile::NamedTempFile::new()?;
1912 let store = store::fs::Store::persistent(dbfile.path())?;
1913 test_replica_delete_edge_cases(store)
1914 }
1915
1916 fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1917 let mut rng = rand::thread_rng();
1918 let author = Author::new(&mut rng);
1919 let namespace = NamespaceSecret::new(&mut rng);
1920
1921 let edgecases = [0u8, 1u8, 255u8];
1922 let prefixes = [0u8, 255u8];
1923 let hash = Hash::new(b"foo");
1924 let len = 3;
1925 for prefix in prefixes {
1926 let mut expected = vec![];
1927 let mut replica = store.new_replica(namespace.clone())?;
1928 for suffix in edgecases {
1929 let key = [prefix, suffix].to_vec();
1930 expected.push(key.clone());
1931 replica.insert(&key, &author, hash, len)?;
1932 }
1933 assert_keys(&mut store, namespace.id(), expected);
1934 let mut replica = store.new_replica(namespace.clone())?;
1935 replica.delete_prefix([prefix], &author)?;
1936 assert_keys(&mut store, namespace.id(), vec![]);
1937 }
1938
1939 let mut replica = store.new_replica(namespace.clone())?;
1940 let key = vec![1u8, 0u8];
1941 replica.insert(key, &author, hash, len)?;
1942 let key = vec![1u8, 1u8];
1943 replica.insert(key, &author, hash, len)?;
1944 let key = vec![1u8, 2u8];
1945 replica.insert(key, &author, hash, len)?;
1946 let prefix = vec![1u8, 1u8];
1947 replica.delete_prefix(prefix, &author)?;
1948 assert_keys(
1949 &mut store,
1950 namespace.id(),
1951 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1952 );
1953
1954 let mut replica = store.new_replica(namespace.clone())?;
1955 let key = vec![0u8, 255u8];
1956 replica.insert(key, &author, hash, len)?;
1957 let key = vec![0u8, 0u8];
1958 replica.insert(key, &author, hash, len)?;
1959 let prefix = vec![0u8];
1960 replica.delete_prefix(prefix, &author)?;
1961 assert_keys(
1962 &mut store,
1963 namespace.id(),
1964 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1965 );
1966 Ok(())
1967 }
1968
1969 #[test]
1970 fn test_latest_iter_memory() -> Result<()> {
1971 let store = store::Store::memory();
1972 test_latest_iter(store)
1973 }
1974
1975 #[test]
1976 fn test_latest_iter_fs() -> Result<()> {
1977 let dbfile = tempfile::NamedTempFile::new()?;
1978 let store = store::fs::Store::persistent(dbfile.path())?;
1979 test_latest_iter(store)
1980 }
1981
1982 fn test_latest_iter(mut store: Store) -> Result<()> {
1983 let mut rng = rand::thread_rng();
1984 let author0 = Author::new(&mut rng);
1985 let author1 = Author::new(&mut rng);
1986 let namespace = NamespaceSecret::new(&mut rng);
1987 let mut replica = store.new_replica(namespace.clone())?;
1988
1989 replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
1990 let latest = store
1991 .get_latest_for_each_author(namespace.id())?
1992 .collect::<Result<Vec<_>>>()?;
1993 assert_eq!(latest.len(), 1);
1994 assert_eq!(latest[0].2, b"a0.1".to_vec());
1995
1996 let mut replica = store.new_replica(namespace.clone())?;
1997 replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
1998 replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
1999 let latest = store
2000 .get_latest_for_each_author(namespace.id())?
2001 .collect::<Result<Vec<_>>>()?;
2002 let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2003 latest_keys.sort();
2004 assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2005
2006 Ok(())
2007 }
2008
2009 #[test]
2010 fn test_replica_byte_keys_memory() -> Result<()> {
2011 let store = store::Store::memory();
2012
2013 test_replica_byte_keys(store)?;
2014 Ok(())
2015 }
2016
2017 #[test]
2018 fn test_replica_byte_keys_fs() -> Result<()> {
2019 let dbfile = tempfile::NamedTempFile::new()?;
2020 let store = store::fs::Store::persistent(dbfile.path())?;
2021 test_replica_byte_keys(store)?;
2022
2023 Ok(())
2024 }
2025
2026 fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2027 let mut rng = rand::thread_rng();
2028 let author = Author::new(&mut rng);
2029 let namespace = NamespaceSecret::new(&mut rng);
2030
2031 let hash = Hash::new(b"foo");
2032 let len = 3;
2033
2034 let key = vec![1u8, 0u8];
2035 let mut replica = store.new_replica(namespace.clone())?;
2036 replica.insert(key, &author, hash, len)?;
2037 assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2038 let key = vec![1u8, 2u8];
2039 let mut replica = store.new_replica(namespace.clone())?;
2040 replica.insert(key, &author, hash, len)?;
2041 assert_keys(
2042 &mut store,
2043 namespace.id(),
2044 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2045 );
2046
2047 let key = vec![0u8, 255u8];
2048 let mut replica = store.new_replica(namespace.clone())?;
2049 replica.insert(key, &author, hash, len)?;
2050 assert_keys(
2051 &mut store,
2052 namespace.id(),
2053 vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2054 );
2055 Ok(())
2056 }
2057
2058 #[test]
2059 fn test_replica_capability_memory() -> Result<()> {
2060 let store = store::Store::memory();
2061 test_replica_capability(store)
2062 }
2063
2064 #[test]
2065 fn test_replica_capability_fs() -> Result<()> {
2066 let dbfile = tempfile::NamedTempFile::new()?;
2067 let store = store::fs::Store::persistent(dbfile.path())?;
2068 test_replica_capability(store)
2069 }
2070
2071 #[allow(clippy::redundant_pattern_matching)]
2072 fn test_replica_capability(mut store: Store) -> Result<()> {
2073 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2074 let author = store.new_author(&mut rng)?;
2075 let namespace = NamespaceSecret::new(&mut rng);
2076
2077 let capability = Capability::Read(namespace.id());
2079 store.import_namespace(capability)?;
2080 let mut replica = store.open_replica(&namespace.id())?;
2081 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2082 assert!(matches!(res, Err(InsertError::ReadOnly)));
2083
2084 let capability = Capability::Write(namespace.clone());
2086 store.import_namespace(capability)?;
2087 let mut replica = store.open_replica(&namespace.id())?;
2088 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2089 assert!(matches!(res, Ok(_)));
2090 store.close_replica(namespace.id());
2091 let mut replica = store.open_replica(&namespace.id())?;
2092 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2093 assert!(res.is_ok());
2094
2095 let capability = Capability::Read(namespace.id());
2097 store.import_namespace(capability)?;
2098 store.close_replica(namespace.id());
2099 let mut replica = store.open_replica(&namespace.id())?;
2100 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2101 assert!(res.is_ok());
2102 Ok(())
2103 }
2104
2105 #[tokio::test]
2106 async fn test_actor_capability_memory() -> Result<()> {
2107 let store = store::Store::memory();
2108 test_actor_capability(store).await
2109 }
2110
2111 #[tokio::test]
2112 async fn test_actor_capability_fs() -> Result<()> {
2113 let dbfile = tempfile::NamedTempFile::new()?;
2114 let store = store::fs::Store::persistent(dbfile.path())?;
2115 test_actor_capability(store).await
2116 }
2117
2118 async fn test_actor_capability(store: Store) -> Result<()> {
2119 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2121 let author = Author::new(&mut rng);
2122 let handle = SyncHandle::spawn(store, None, "test".into());
2123 let author = handle.import_author(author).await?;
2124 let namespace = NamespaceSecret::new(&mut rng);
2125 let id = namespace.id();
2126
2127 let capability = Capability::Read(namespace.id());
2129 handle.import_namespace(capability).await?;
2130 handle.open(namespace.id(), Default::default()).await?;
2131 let res = handle
2132 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2133 .await;
2134 assert!(res.is_err());
2135
2136 let capability = Capability::Write(namespace.clone());
2138 handle.import_namespace(capability).await?;
2139 let res = handle
2140 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2141 .await;
2142 assert!(res.is_ok());
2143
2144 handle.close(namespace.id()).await?;
2146 let res = handle
2147 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2148 .await;
2149 assert!(res.is_err());
2150 handle.open(namespace.id(), Default::default()).await?;
2151 let res = handle
2152 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2153 .await;
2154 assert!(res.is_ok());
2155 Ok(())
2156 }
2157
2158 #[test]
2161 fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2162 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2163 let mut store1 = store::Store::memory();
2164 let mut store2 = store::Store::memory();
2165 let peer1 = [1u8; 32];
2166 let peer2 = [2u8; 32];
2167 let mut state1 = SyncOutcome::default();
2168 let mut state2 = SyncOutcome::default();
2169
2170 let author = Author::new(&mut rng);
2171 let namespace = NamespaceSecret::new(&mut rng);
2172 let mut replica1 = store1.new_replica(namespace.clone())?;
2173 let mut replica2 = store2.new_replica(namespace.clone())?;
2174
2175 let (events1_sender, events1) = flume::bounded(32);
2176 let (events2_sender, events2) = flume::bounded(32);
2177
2178 replica1.info.subscribe(events1_sender);
2179 replica2.info.subscribe(events2_sender);
2180
2181 replica1.hash_and_insert(b"foo", &author, b"init")?;
2182
2183 let from1 = replica1.sync_initial_message()?;
2184 let from2 = replica2
2185 .sync_process_message(from1, peer1, &mut state2)
2186 .unwrap()
2187 .unwrap();
2188 let from1 = replica1
2189 .sync_process_message(from2, peer2, &mut state1)
2190 .unwrap()
2191 .unwrap();
2192 replica2.hash_and_insert(b"foo", &author, b"update")?;
2196 let from2 = replica2
2197 .sync_process_message(from1, peer1, &mut state2)
2198 .unwrap();
2199 assert!(from2.is_none());
2200 let events1 = events1.drain().collect::<Vec<_>>();
2201 let events2 = events2.drain().collect::<Vec<_>>();
2202 assert_eq!(events1.len(), 1);
2203 assert_eq!(events2.len(), 1);
2204 assert!(matches!(events1[0], Event::LocalInsert { .. }));
2205 assert!(matches!(events2[0], Event::LocalInsert { .. }));
2206 assert_eq!(state1.num_sent, 1);
2207 assert_eq!(state1.num_recv, 0);
2208 assert_eq!(state2.num_sent, 0);
2209 assert_eq!(state2.num_recv, 1);
2210
2211 Ok(())
2212 }
2213
2214 #[test]
2215 fn test_replica_queries_mem() -> Result<()> {
2216 let store = store::Store::memory();
2217
2218 test_replica_queries(store)?;
2219 Ok(())
2220 }
2221
2222 #[test]
2223 fn test_replica_queries_fs() -> Result<()> {
2224 let dbfile = tempfile::NamedTempFile::new()?;
2225 let store = store::fs::Store::persistent(dbfile.path())?;
2226 test_replica_queries(store)?;
2227
2228 Ok(())
2229 }
2230
2231 fn test_replica_queries(mut store: Store) -> Result<()> {
2232 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2233 let namespace = NamespaceSecret::new(&mut rng);
2234 let namespace_id = namespace.id();
2235
2236 let a1 = store.new_author(&mut rng)?;
2237 let a2 = store.new_author(&mut rng)?;
2238 let a3 = store.new_author(&mut rng)?;
2239 println!(
2240 "a1 {} a2 {} a3 {}",
2241 a1.id().fmt_short(),
2242 a2.id().fmt_short(),
2243 a3.id().fmt_short()
2244 );
2245
2246 let mut replica = store.new_replica(namespace.clone())?;
2247 replica.hash_and_insert("hi/world", &a2, "a2")?;
2248 replica.hash_and_insert("hi/world", &a1, "a1")?;
2249 replica.hash_and_insert("hi/moon", &a2, "a1")?;
2250 replica.hash_and_insert("hi", &a3, "a3")?;
2251
2252 struct QueryTester<'a> {
2253 store: &'a mut Store,
2254 namespace: NamespaceId,
2255 }
2256 impl<'a> QueryTester<'a> {
2257 fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2258 let query = query.into();
2259 let actual = self
2260 .store
2261 .get_many(self.namespace, query.clone())
2262 .unwrap()
2263 .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2264 .collect::<Result<Vec<_>>>()
2265 .unwrap();
2266 let expected = expected
2267 .into_iter()
2268 .map(|(key, author)| (key.to_string(), author.id()))
2269 .collect::<Vec<_>>();
2270 assert_eq!(actual, expected, "query: {query:#?}")
2271 }
2272 }
2273
2274 let mut qt = QueryTester {
2275 store: &mut store,
2276 namespace: namespace_id,
2277 };
2278
2279 qt.assert(
2280 Query::all(),
2281 vec![
2282 ("hi/world", &a1),
2283 ("hi/moon", &a2),
2284 ("hi/world", &a2),
2285 ("hi", &a3),
2286 ],
2287 );
2288
2289 qt.assert(
2290 Query::single_latest_per_key(),
2291 vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2292 );
2293
2294 qt.assert(
2295 Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2296 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2297 );
2298
2299 qt.assert(
2300 Query::single_latest_per_key().key_prefix("hi/"),
2301 vec![("hi/moon", &a2), ("hi/world", &a1)],
2302 );
2303
2304 qt.assert(
2305 Query::single_latest_per_key()
2306 .key_prefix("hi/")
2307 .sort_direction(SortDirection::Desc),
2308 vec![("hi/world", &a1), ("hi/moon", &a2)],
2309 );
2310
2311 qt.assert(
2312 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2313 vec![
2314 ("hi", &a3),
2315 ("hi/moon", &a2),
2316 ("hi/world", &a1),
2317 ("hi/world", &a2),
2318 ],
2319 );
2320
2321 qt.assert(
2322 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2323 vec![
2324 ("hi/world", &a2),
2325 ("hi/world", &a1),
2326 ("hi/moon", &a2),
2327 ("hi", &a3),
2328 ],
2329 );
2330
2331 qt.assert(
2332 Query::all().key_prefix("hi/"),
2333 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2334 );
2335
2336 qt.assert(
2337 Query::all().key_prefix("hi/").offset(1).limit(1),
2338 vec![("hi/moon", &a2)],
2339 );
2340
2341 qt.assert(
2342 Query::all()
2343 .key_prefix("hi/")
2344 .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2345 vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2346 );
2347
2348 qt.assert(
2349 Query::all()
2350 .key_prefix("hi/")
2351 .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2352 .offset(1)
2353 .limit(1),
2354 vec![("hi/world", &a1)],
2355 );
2356
2357 qt.assert(
2358 Query::all()
2359 .key_prefix("hi/")
2360 .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2361 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2362 );
2363
2364 qt.assert(
2365 Query::all()
2366 .key_prefix("hi/")
2367 .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2368 vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2369 );
2370
2371 qt.assert(
2372 Query::all()
2373 .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2374 .limit(2)
2375 .offset(1),
2376 vec![("hi/moon", &a2), ("hi/world", &a1)],
2377 );
2378
2379 let mut replica = store.new_replica(namespace)?;
2380 replica.delete_prefix("hi/world", &a2)?;
2381 let mut qt = QueryTester {
2382 store: &mut store,
2383 namespace: namespace_id,
2384 };
2385
2386 qt.assert(
2387 Query::all(),
2388 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2389 );
2390
2391 qt.assert(
2392 Query::all().include_empty(),
2393 vec![
2394 ("hi/world", &a1),
2395 ("hi/moon", &a2),
2396 ("hi/world", &a2),
2397 ("hi", &a3),
2398 ],
2399 );
2400
2401 Ok(())
2402 }
2403
2404 #[test]
2405 fn test_dl_policies_mem() -> Result<()> {
2406 let mut store = store::Store::memory();
2407 test_dl_policies(&mut store)
2408 }
2409
2410 #[test]
2411 fn test_dl_policies_fs() -> Result<()> {
2412 let dbfile = tempfile::NamedTempFile::new()?;
2413 let mut store = store::fs::Store::persistent(dbfile.path())?;
2414 test_dl_policies(&mut store)
2415 }
2416
2417 fn test_dl_policies(store: &mut Store) -> Result<()> {
2418 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2419 let namespace = NamespaceSecret::new(&mut rng);
2420 let id = namespace.id();
2421
2422 let filter = store::FilterKind::Exact("foo".into());
2423 let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2424 store
2425 .set_download_policy(&id, policy.clone())
2426 .expect_err("document dos not exist");
2427
2428 store.new_replica(namespace)?;
2430
2431 store.set_download_policy(&id, policy.clone())?;
2432 let retrieved_policy = store.get_download_policy(&id)?;
2433 assert_eq!(retrieved_policy, policy);
2434 Ok(())
2435 }
2436
2437 fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2438 expected.sort();
2439 assert_eq!(expected, get_keys_sorted(store, namespace));
2440 }
2441
2442 fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2443 let mut res = store
2444 .get_many(namespace, Query::all())
2445 .unwrap()
2446 .map(|e| e.map(|e| e.key().to_vec()))
2447 .collect::<Result<Vec<_>>>()
2448 .unwrap();
2449 res.sort();
2450 res
2451 }
2452
2453 fn get_entry(
2454 store: &mut Store,
2455 namespace: NamespaceId,
2456 author: AuthorId,
2457 key: &[u8],
2458 ) -> anyhow::Result<SignedEntry> {
2459 let entry = store
2460 .get_exact(namespace, author, key, true)?
2461 .ok_or_else(|| anyhow::anyhow!("not found"))?;
2462 Ok(entry)
2463 }
2464
2465 fn get_content_hash(
2466 store: &mut Store,
2467 namespace: NamespaceId,
2468 author: AuthorId,
2469 key: &[u8],
2470 ) -> anyhow::Result<Option<Hash>> {
2471 let hash = store
2472 .get_exact(namespace, author, key, false)?
2473 .map(|e| e.content_hash());
2474 Ok(hash)
2475 }
2476
2477 fn sync(alice: &mut Replica, bob: &mut Replica) -> Result<(SyncOutcome, SyncOutcome)> {
2478 let alice_peer_id = [1u8; 32];
2479 let bob_peer_id = [2u8; 32];
2480 let mut alice_state = SyncOutcome::default();
2481 let mut bob_state = SyncOutcome::default();
2482 let mut next_to_bob = Some(alice.sync_initial_message()?);
2484 let mut rounds = 0;
2485 while let Some(msg) = next_to_bob.take() {
2486 assert!(rounds < 100, "too many rounds");
2487 rounds += 1;
2488 println!("round {}", rounds);
2489 if let Some(msg) = bob.sync_process_message(msg, alice_peer_id, &mut bob_state)? {
2490 next_to_bob = alice.sync_process_message(msg, bob_peer_id, &mut alice_state)?
2491 }
2492 }
2493 assert_eq!(alice_state.num_sent, bob_state.num_recv);
2494 assert_eq!(alice_state.num_recv, bob_state.num_sent);
2495 Ok((alice_state, bob_state))
2496 }
2497
2498 fn check_entries(
2499 store: &mut Store,
2500 namespace: &NamespaceId,
2501 author: &Author,
2502 set: &[&str],
2503 ) -> Result<()> {
2504 for el in set {
2505 store.get_exact(*namespace, author.id(), el, false)?;
2506 }
2507 Ok(())
2508 }
2509}