1use std::{
10 cmp::Ordering,
11 fmt::Debug,
12 ops::{Deref, DerefMut},
13 sync::Arc,
14 time::{Duration, SystemTime},
15};
16
17use bytes::{Bytes, BytesMut};
18use ed25519_dalek::{Signature, SignatureError};
19use iroh_blobs::Hash;
20use serde::{Deserialize, Serialize};
21
22pub use crate::heads::AuthorHeads;
23use crate::{
24 keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
25 ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
26 store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
27};
28
29pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
33
34pub type PeerIdBytes = [u8; 32];
37
38pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_millis() as u64;
41
42pub type ContentStatusCallback = Arc<dyn Fn(Hash) -> ContentStatus + Send + Sync + 'static>;
44
45#[derive(Debug, Clone)]
47pub enum Event {
48 LocalInsert {
50 namespace: NamespaceId,
52 entry: SignedEntry,
54 },
55 RemoteInsert {
57 namespace: NamespaceId,
59 entry: SignedEntry,
61 from: PeerIdBytes,
63 should_download: bool,
65 remote_content_status: ContentStatus,
67 },
68}
69
70#[derive(Debug, Clone)]
72pub enum InsertOrigin {
73 Local,
75 Sync {
77 from: PeerIdBytes,
79 remote_content_status: ContentStatus,
81 },
82}
83
84#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
86pub enum ContentStatus {
87 Complete,
89 Incomplete,
91 Missing,
93}
94
95#[derive(Debug, Clone, Default)]
97pub struct SyncOutcome {
98 pub heads_received: AuthorHeads,
100 pub num_recv: usize,
102 pub num_sent: usize,
104}
105
106fn get_as_ptr<T>(value: &T) -> Option<usize> {
107 use std::mem;
108 if mem::size_of::<T>() == std::mem::size_of::<usize>()
109 && mem::align_of::<T>() == mem::align_of::<usize>()
110 {
111 unsafe { Some(mem::transmute_copy(value)) }
113 } else {
114 None
115 }
116}
117
118fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
119 get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
120}
121
122#[derive(Debug, Default)]
123struct Subscribers(Vec<async_channel::Sender<Event>>);
124impl Subscribers {
125 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
126 self.0.push(sender)
127 }
128 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
129 self.0.retain(|s| !same_channel(s, sender));
130 }
131 pub fn send(&mut self, event: Event) {
132 self.0
133 .retain(|sender| sender.send_blocking(event.clone()).is_ok())
134 }
135 pub fn len(&self) -> usize {
136 self.0.len()
137 }
138 pub fn send_with(&mut self, f: impl FnOnce() -> Event) {
139 if !self.0.is_empty() {
140 self.send(f())
141 }
142 }
143}
144
145#[derive(
147 Debug,
148 Clone,
149 Copy,
150 Serialize,
151 Deserialize,
152 num_enum::IntoPrimitive,
153 num_enum::TryFromPrimitive,
154 strum::Display,
155)]
156#[repr(u8)]
157#[strum(serialize_all = "snake_case")]
158pub enum CapabilityKind {
159 Write = 1,
161 Read = 2,
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
167pub enum Capability {
168 Write(NamespaceSecret),
170 Read(NamespaceId),
172}
173
174impl Capability {
175 pub fn id(&self) -> NamespaceId {
177 match self {
178 Capability::Write(secret) => secret.id(),
179 Capability::Read(id) => *id,
180 }
181 }
182
183 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
186 match self {
187 Capability::Write(secret) => Ok(secret),
188 Capability::Read(_) => Err(ReadOnly),
189 }
190 }
191
192 pub fn kind(&self) -> CapabilityKind {
194 match self {
195 Capability::Write(_) => CapabilityKind::Write,
196 Capability::Read(_) => CapabilityKind::Read,
197 }
198 }
199
200 pub fn raw(&self) -> (u8, [u8; 32]) {
202 let capability_repr: u8 = self.kind().into();
203 let bytes = match self {
204 Capability::Write(secret) => secret.to_bytes(),
205 Capability::Read(id) => id.to_bytes(),
206 };
207 (capability_repr, bytes)
208 }
209
210 pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
212 let kind: CapabilityKind = kind.try_into()?;
213 let capability = match kind {
214 CapabilityKind::Write => {
215 let secret = NamespaceSecret::from_bytes(bytes);
216 Capability::Write(secret)
217 }
218 CapabilityKind::Read => {
219 let id = NamespaceId::from(bytes);
220 Capability::Read(id)
221 }
222 };
223 Ok(capability)
224 }
225
226 pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
232 if other.id() != self.id() {
233 return Err(CapabilityError::NamespaceMismatch);
234 }
235
236 if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
238 let _ = std::mem::replace(self, other);
239 Ok(true)
240 } else {
241 Ok(false)
242 }
243 }
244}
245
246#[derive(Debug, thiserror::Error)]
248pub enum CapabilityError {
249 #[error("Namespaces are not the same")]
251 NamespaceMismatch,
252}
253
254#[derive(derive_more::Debug)]
256pub struct ReplicaInfo {
257 pub(crate) capability: Capability,
258 subscribers: Subscribers,
259 #[debug("ContentStatusCallback")]
260 content_status_cb: Option<ContentStatusCallback>,
261 closed: bool,
262}
263
264impl ReplicaInfo {
265 pub fn new(capability: Capability) -> Self {
267 Self {
268 capability,
269 subscribers: Default::default(),
270 content_status_cb: None,
272 closed: false,
273 }
274 }
275
276 pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
282 self.subscribers.subscribe(sender)
283 }
284
285 pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
291 self.subscribers.unsubscribe(sender)
292 }
293
294 pub fn subscribers_count(&self) -> usize {
296 self.subscribers.len()
297 }
298
299 pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
304 if self.content_status_cb.is_some() {
305 false
306 } else {
307 self.content_status_cb = Some(cb);
308 true
309 }
310 }
311
312 fn ensure_open(&self) -> Result<(), InsertError> {
313 if self.closed() {
314 Err(InsertError::Closed)
315 } else {
316 Ok(())
317 }
318 }
319
320 pub fn closed(&self) -> bool {
326 self.closed
327 }
328
329 pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
336 self.capability.merge(capability)
337 }
338}
339
340#[derive(derive_more::Debug)]
342pub struct Replica<'a, I = Box<ReplicaInfo>> {
343 pub(crate) store: StoreInstance<'a>,
344 pub(crate) info: I,
345}
346
347impl<'a, I> Replica<'a, I>
348where
349 I: Deref<Target = ReplicaInfo> + DerefMut,
350{
351 pub fn new(store: StoreInstance<'a>, info: I) -> Self {
353 Replica { info, store }
354 }
355
356 pub fn insert(
364 &mut self,
365 key: impl AsRef<[u8]>,
366 author: &Author,
367 hash: Hash,
368 len: u64,
369 ) -> Result<usize, InsertError> {
370 if len == 0 || hash == Hash::EMPTY {
371 return Err(InsertError::EntryIsEmpty);
372 }
373 self.info.ensure_open()?;
374 let id = RecordIdentifier::new(self.id(), author.id(), key);
375 let record = Record::new_current(hash, len);
376 let entry = Entry::new(id, record);
377 let secret = self.secret_key()?;
378 let signed_entry = entry.sign(secret, author);
379 self.insert_entry(signed_entry, InsertOrigin::Local)
380 }
381
382 pub fn delete_prefix(
389 &mut self,
390 prefix: impl AsRef<[u8]>,
391 author: &Author,
392 ) -> Result<usize, InsertError> {
393 self.info.ensure_open()?;
394 let id = RecordIdentifier::new(self.id(), author.id(), prefix);
395 let entry = Entry::new_empty(id);
396 let signed_entry = entry.sign(self.secret_key()?, author);
397 self.insert_entry(signed_entry, InsertOrigin::Local)
398 }
399
400 pub fn insert_remote_entry(
408 &mut self,
409 entry: SignedEntry,
410 received_from: PeerIdBytes,
411 content_status: ContentStatus,
412 ) -> Result<usize, InsertError> {
413 self.info.ensure_open()?;
414 entry.validate_empty()?;
415 let origin = InsertOrigin::Sync {
416 from: received_from,
417 remote_content_status: content_status,
418 };
419 self.insert_entry(entry, origin)
420 }
421
422 fn insert_entry(
426 &mut self,
427 entry: SignedEntry,
428 origin: InsertOrigin,
429 ) -> Result<usize, InsertError> {
430 let namespace = self.id();
431
432 let store = &self.store;
433 validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
434
435 let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
436 tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
437
438 let removed_count = match outcome {
439 InsertOutcome::Inserted { removed } => removed,
440 InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
441 };
442
443 let insert_event = match origin {
444 InsertOrigin::Local => Event::LocalInsert { namespace, entry },
445 InsertOrigin::Sync {
446 from,
447 remote_content_status,
448 } => {
449 let download_policy = self
450 .store
451 .get_download_policy(&self.id())
452 .unwrap_or_default();
453 let should_download = download_policy.matches(entry.entry());
454 Event::RemoteInsert {
455 namespace,
456 entry,
457 from,
458 should_download,
459 remote_content_status,
460 }
461 }
462 };
463
464 self.info.subscribers.send(insert_event);
465
466 Ok(removed_count)
467 }
468
469 pub fn hash_and_insert(
474 &mut self,
475 key: impl AsRef<[u8]>,
476 author: &Author,
477 data: impl AsRef<[u8]>,
478 ) -> Result<Hash, InsertError> {
479 self.info.ensure_open()?;
480 let len = data.as_ref().len() as u64;
481 let hash = Hash::new(data);
482 self.insert(key, author, hash, len)?;
483 Ok(hash)
484 }
485
486 pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
488 RecordIdentifier::new(self.info.capability.id(), author.id(), key)
489 }
490
491 pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
493 self.info.ensure_open().map_err(anyhow::Error::from)?;
494 self.store.initial_message()
495 }
496
497 pub fn sync_process_message(
501 &mut self,
502 message: crate::ranger::Message<SignedEntry>,
503 from_peer: PeerIdBytes,
504 state: &mut SyncOutcome,
505 ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
506 self.info.ensure_open()?;
507 let my_namespace = self.id();
508 let now = system_time_now();
509
510 state.num_recv += message.value_count();
512 for (entry, _content_status) in message.values() {
513 state
514 .heads_received
515 .insert(entry.author(), entry.timestamp());
516 }
517
518 let cb = self.info.content_status_cb.clone();
521 let download_policy = self
522 .store
523 .get_download_policy(&my_namespace)
524 .unwrap_or_default();
525 let reply = self.store.process_message(
526 &Default::default(),
527 message,
528 |store, entry, content_status| {
530 let origin = InsertOrigin::Sync {
531 from: from_peer,
532 remote_content_status: content_status,
533 };
534 validate_entry(now, store, my_namespace, entry, &origin).is_ok()
535 },
536 |_store, entry, content_status| {
538 self.info.subscribers.send_with(|| {
540 let should_download = download_policy.matches(entry.entry());
541 Event::RemoteInsert {
542 from: from_peer,
543 namespace: my_namespace,
544 entry: entry.clone(),
545 should_download,
546 remote_content_status: content_status,
547 }
548 })
549 },
550 |_store, entry| {
552 if let Some(cb) = cb.as_ref() {
553 cb(entry.content_hash())
554 } else {
555 ContentStatus::Missing
556 }
557 },
558 )?;
559
560 if let Some(ref reply) = reply {
562 state.num_sent += reply.value_count();
563 }
564
565 Ok(reply)
566 }
567
568 pub fn id(&self) -> NamespaceId {
570 self.info.capability.id()
571 }
572
573 pub fn capability(&self) -> &Capability {
575 &self.info.capability
576 }
577
578 pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
581 self.info.capability.secret_key()
582 }
583}
584
585#[derive(Debug, thiserror::Error)]
587#[error("Replica allows read access only.")]
588pub struct ReadOnly;
589
590fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
598 now: u64,
599 store: &S,
600 expected_namespace: NamespaceId,
601 entry: &SignedEntry,
602 origin: &InsertOrigin,
603) -> Result<(), ValidationFailure> {
604 if entry.namespace() != expected_namespace {
606 return Err(ValidationFailure::InvalidNamespace);
607 }
608
609 if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
611 return Err(ValidationFailure::BadSignature);
612 }
613
614 if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
616 return Err(ValidationFailure::TooFarInTheFuture);
617 }
618 Ok(())
619}
620
621#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
623pub enum InsertError {
624 #[error("storage error")]
626 Store(anyhow::Error),
627 #[error("validation failure")]
629 Validation(#[from] ValidationFailure),
630 #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
632 NewerEntryExists,
633 #[error("Attempted to insert an empty entry")]
635 EntryIsEmpty,
636 #[error("Attempted to insert to read only replica")]
638 #[from(ReadOnly)]
639 ReadOnly,
640 #[error("replica is closed")]
642 Closed,
643}
644
645#[derive(thiserror::Error, Debug)]
647pub enum ValidationFailure {
648 #[error("Entry namespace does not match the current replica")]
650 InvalidNamespace,
651 #[error("Entry signature is invalid")]
653 BadSignature,
654 #[error("Entry timestamp is too far in the future.")]
656 TooFarInTheFuture,
657 #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
659 InvalidEmptyEntry,
660}
661
662#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
664pub struct SignedEntry {
665 signature: EntrySignature,
666 entry: Entry,
667}
668
669impl From<SignedEntry> for Entry {
670 fn from(value: SignedEntry) -> Self {
671 value.entry
672 }
673}
674
675impl PartialOrd for SignedEntry {
676 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
677 Some(self.cmp(other))
678 }
679}
680
681impl Ord for SignedEntry {
682 fn cmp(&self, other: &Self) -> Ordering {
683 self.entry.cmp(&other.entry)
684 }
685}
686
687impl PartialOrd for Entry {
688 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
689 Some(self.cmp(other))
690 }
691}
692
693impl Ord for Entry {
694 fn cmp(&self, other: &Self) -> Ordering {
695 self.id
696 .cmp(&other.id)
697 .then_with(|| self.record.cmp(&other.record))
698 }
699}
700
701impl SignedEntry {
702 pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
703 SignedEntry { signature, entry }
704 }
705
706 pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
708 let signature = EntrySignature::from_entry(&entry, namespace, author);
709 SignedEntry { signature, entry }
710 }
711
712 pub fn from_parts(
714 namespace: &NamespaceSecret,
715 author: &Author,
716 key: impl AsRef<[u8]>,
717 record: Record,
718 ) -> Self {
719 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
720 let entry = Entry::new(id, record);
721 Self::from_entry(entry, namespace, author)
722 }
723
724 pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
726 self.signature.verify(
727 &self.entry,
728 &self.entry.namespace().public_key(store)?,
729 &self.entry.author().public_key(store)?,
730 )
731 }
732
733 pub fn signature(&self) -> &EntrySignature {
735 &self.signature
736 }
737
738 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
740 self.entry().validate_empty()
741 }
742
743 pub fn entry(&self) -> &Entry {
745 &self.entry
746 }
747
748 pub fn content_hash(&self) -> Hash {
750 self.entry().content_hash()
751 }
752
753 pub fn content_len(&self) -> u64 {
755 self.entry().content_len()
756 }
757
758 pub fn author_bytes(&self) -> AuthorId {
760 self.entry().id().author()
761 }
762
763 pub fn key(&self) -> &[u8] {
765 self.entry().id().key()
766 }
767
768 pub fn timestamp(&self) -> u64 {
770 self.entry().timestamp()
771 }
772}
773
774impl RangeEntry for SignedEntry {
775 type Key = RecordIdentifier;
776 type Value = Record;
777
778 fn key(&self) -> &Self::Key {
779 &self.entry.id
780 }
781
782 fn value(&self) -> &Self::Value {
783 &self.entry.record
784 }
785
786 fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
787 let mut hasher = blake3::Hasher::new();
788 hasher.update(self.namespace().as_ref());
789 hasher.update(self.author_bytes().as_ref());
790 hasher.update(self.key());
791 hasher.update(&self.timestamp().to_be_bytes());
792 hasher.update(self.content_hash().as_bytes());
793 Fingerprint(hasher.finalize().into())
794 }
795}
796
797#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
799pub struct EntrySignature {
800 author_signature: Signature,
801 namespace_signature: Signature,
802}
803
804impl Debug for EntrySignature {
805 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
806 f.debug_struct("EntrySignature")
807 .field(
808 "namespace_signature",
809 &hex::encode(self.namespace_signature.to_bytes()),
810 )
811 .field(
812 "author_signature",
813 &hex::encode(self.author_signature.to_bytes()),
814 )
815 .finish()
816 }
817}
818
819impl EntrySignature {
820 pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
822 let bytes = entry.to_vec();
825 let namespace_signature = namespace.sign(&bytes);
826 let author_signature = author.sign(&bytes);
827
828 EntrySignature {
829 author_signature,
830 namespace_signature,
831 }
832 }
833
834 pub fn verify(
837 &self,
838 entry: &Entry,
839 namespace: &NamespacePublicKey,
840 author: &AuthorPublicKey,
841 ) -> Result<(), SignatureError> {
842 let bytes = entry.to_vec();
843 namespace.verify(&bytes, &self.namespace_signature)?;
844 author.verify(&bytes, &self.author_signature)?;
845
846 Ok(())
847 }
848
849 pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
850 let namespace_signature = Signature::from_bytes(namespace_sig);
851 let author_signature = Signature::from_bytes(author_sig);
852
853 EntrySignature {
854 author_signature,
855 namespace_signature,
856 }
857 }
858
859 pub(crate) fn author(&self) -> &Signature {
860 &self.author_signature
861 }
862
863 pub(crate) fn namespace(&self) -> &Signature {
864 &self.namespace_signature
865 }
866}
867
868#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
874pub struct Entry {
875 id: RecordIdentifier,
876 record: Record,
877}
878
879impl Entry {
880 pub fn new(id: RecordIdentifier, record: Record) -> Self {
882 Entry { id, record }
883 }
884
885 pub fn new_empty(id: RecordIdentifier) -> Self {
887 Entry {
888 id,
889 record: Record::empty_current(),
890 }
891 }
892
893 pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
895 match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
896 (true, true) => Ok(()),
897 (false, false) => Ok(()),
898 (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
899 (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
900 }
901 }
902
903 pub fn id(&self) -> &RecordIdentifier {
905 &self.id
906 }
907
908 pub fn namespace(&self) -> NamespaceId {
910 self.id.namespace()
911 }
912
913 pub fn author(&self) -> AuthorId {
915 self.id.author()
916 }
917
918 pub fn key(&self) -> &[u8] {
920 self.id.key()
921 }
922
923 pub fn record(&self) -> &Record {
925 &self.record
926 }
927
928 pub fn content_hash(&self) -> Hash {
930 self.record.hash
931 }
932
933 pub fn content_len(&self) -> u64 {
935 self.record.len
936 }
937
938 pub fn timestamp(&self) -> u64 {
940 self.record.timestamp
941 }
942
943 pub fn encode(&self, out: &mut Vec<u8>) {
945 self.id.encode(out);
946 self.record.encode(out);
947 }
948
949 pub fn to_vec(&self) -> Vec<u8> {
951 let mut out = Vec::new();
952 self.encode(&mut out);
953 out
954 }
955
956 pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
958 SignedEntry::from_entry(self, namespace, author)
959 }
960}
961
962const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
963const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
964const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
965
966#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
968pub struct RecordIdentifier(Bytes);
969
970impl Default for RecordIdentifier {
971 fn default() -> Self {
972 Self::new(NamespaceId::default(), AuthorId::default(), b"")
973 }
974}
975
976impl Debug for RecordIdentifier {
977 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
978 f.debug_struct("RecordIdentifier")
979 .field("namespace", &self.namespace())
980 .field("author", &self.author())
981 .field("key", &std::string::String::from_utf8_lossy(self.key()))
982 .finish()
983 }
984}
985
986impl RangeKey for RecordIdentifier {
987 #[cfg(test)]
988 fn is_prefix_of(&self, other: &Self) -> bool {
989 other.as_ref().starts_with(self.as_ref())
990 }
991}
992
993fn system_time_now() -> u64 {
994 SystemTime::now()
995 .duration_since(SystemTime::UNIX_EPOCH)
996 .expect("time drift")
997 .as_micros() as u64
998}
999
1000impl RecordIdentifier {
1001 pub fn new(
1003 namespace: impl Into<NamespaceId>,
1004 author: impl Into<AuthorId>,
1005 key: impl AsRef<[u8]>,
1006 ) -> Self {
1007 let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
1008 bytes.extend_from_slice(namespace.into().as_bytes());
1009 bytes.extend_from_slice(author.into().as_bytes());
1010 bytes.extend_from_slice(key.as_ref());
1011 Self(bytes.freeze())
1012 }
1013
1014 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1016 out.extend_from_slice(&self.0);
1017 }
1018
1019 pub fn as_bytes(&self) -> Bytes {
1021 self.0.clone()
1022 }
1023
1024 pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
1026 (
1027 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1028 self.0[AUTHOR_BYTES].try_into().unwrap(),
1029 &self.0[KEY_BYTES],
1030 )
1031 }
1032
1033 pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
1035 (
1036 self.0[NAMESPACE_BYTES].try_into().unwrap(),
1037 self.0[AUTHOR_BYTES].try_into().unwrap(),
1038 self.0.slice(KEY_BYTES),
1039 )
1040 }
1041
1042 pub fn key(&self) -> &[u8] {
1044 &self.0[KEY_BYTES]
1045 }
1046
1047 pub fn key_bytes(&self) -> Bytes {
1049 self.0.slice(KEY_BYTES)
1050 }
1051
1052 pub fn namespace(&self) -> NamespaceId {
1054 let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
1055 value.into()
1056 }
1057
1058 pub fn author(&self) -> AuthorId {
1060 let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
1061 value.into()
1062 }
1063}
1064
1065impl AsRef<[u8]> for RecordIdentifier {
1066 fn as_ref(&self) -> &[u8] {
1067 &self.0
1068 }
1069}
1070
1071impl Deref for SignedEntry {
1072 type Target = Entry;
1073 fn deref(&self) -> &Self::Target {
1074 &self.entry
1075 }
1076}
1077
1078impl Deref for Entry {
1079 type Target = Record;
1080 fn deref(&self) -> &Self::Target {
1081 &self.record
1082 }
1083}
1084
1085#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1087pub struct Record {
1088 len: u64,
1090 hash: Hash,
1092 timestamp: u64,
1094}
1095
1096impl RangeValue for Record {}
1097
1098impl Ord for Record {
1102 fn cmp(&self, other: &Self) -> Ordering {
1103 self.timestamp
1104 .cmp(&other.timestamp)
1105 .then_with(|| self.hash.cmp(&other.hash))
1106 }
1107}
1108
1109impl PartialOrd for Record {
1110 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1111 Some(self.cmp(other))
1112 }
1113}
1114
1115impl Record {
1116 pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
1118 debug_assert!(
1119 len != 0 || hash == Hash::EMPTY,
1120 "if `len` is 0 then `hash` must be the hash of the empty byte range"
1121 );
1122 Record {
1123 hash,
1124 len,
1125 timestamp,
1126 }
1127 }
1128
1129 pub fn empty(timestamp: u64) -> Self {
1131 Self::new(Hash::EMPTY, 0, timestamp)
1132 }
1133
1134 pub fn empty_current() -> Self {
1136 Self::new_current(Hash::EMPTY, 0)
1137 }
1138
1139 pub fn is_empty(&self) -> bool {
1141 self.hash == Hash::EMPTY
1142 }
1143
1144 pub fn new_current(hash: Hash, len: u64) -> Self {
1146 let timestamp = system_time_now();
1147 Self::new(hash, len, timestamp)
1148 }
1149
1150 pub fn content_len(&self) -> u64 {
1152 self.len
1153 }
1154
1155 pub fn content_hash(&self) -> Hash {
1157 self.hash
1158 }
1159
1160 pub fn timestamp(&self) -> u64 {
1162 self.timestamp
1163 }
1164
1165 #[cfg(test)]
1166 pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
1167 let len = data.as_ref().len() as u64;
1168 let hash = Hash::new(data);
1169 Self::new_current(hash, len)
1170 }
1171
1172 #[cfg(test)]
1173 pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
1174 let len = data.as_ref().len() as u64;
1175 let hash = Hash::new(data);
1176 Self::new(hash, len, timestamp)
1177 }
1178
1179 pub(crate) fn encode(&self, out: &mut Vec<u8>) {
1181 out.extend_from_slice(&self.len.to_be_bytes());
1182 out.extend_from_slice(self.hash.as_ref());
1183 out.extend_from_slice(&self.timestamp.to_be_bytes())
1184 }
1185}
1186
1187#[cfg(test)]
1188mod tests {
1189 use std::collections::HashSet;
1190
1191 use anyhow::Result;
1192 use rand_core::SeedableRng;
1193
1194 use super::*;
1195 use crate::{
1196 actor::SyncHandle,
1197 ranger::{Range, Store as _},
1198 store::{OpenError, Query, SortBy, SortDirection, Store},
1199 };
1200
1201 #[test]
1202 fn test_basics_memory() -> Result<()> {
1203 let store = store::Store::memory();
1204 test_basics(store)?;
1205
1206 Ok(())
1207 }
1208
1209 #[test]
1210 fn test_basics_fs() -> Result<()> {
1211 let dbfile = tempfile::NamedTempFile::new()?;
1212 let store = store::fs::Store::persistent(dbfile.path())?;
1213 test_basics(store)?;
1214 Ok(())
1215 }
1216
1217 fn test_basics(mut store: Store) -> Result<()> {
1218 let mut rng = rand::thread_rng();
1219 let alice = Author::new(&mut rng);
1220 let bob = Author::new(&mut rng);
1221 let myspace = NamespaceSecret::new(&mut rng);
1222
1223 let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
1224 let record = Record::current_from_data(b"this is my cool data");
1225 let entry = Entry::new(record_id, record);
1226 let signed_entry = entry.sign(&myspace, &alice);
1227 signed_entry.verify(&()).expect("failed to verify");
1228
1229 let mut my_replica = store.new_replica(myspace.clone())?;
1230 for i in 0..10 {
1231 my_replica.hash_and_insert(
1232 format!("/{i}"),
1233 &alice,
1234 format!("{i}: hello from alice"),
1235 )?;
1236 }
1237
1238 for i in 0..10 {
1239 let res = store
1240 .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
1241 .unwrap();
1242 let len = format!("{i}: hello from alice").as_bytes().len() as u64;
1243 assert_eq!(res.entry().record().content_len(), len);
1244 res.verify(&())?;
1245 }
1246
1247 let mut my_replica = store.new_replica(myspace.clone())?;
1249 my_replica.hash_and_insert("/cool/path", &alice, "round 1")?;
1250 let _entry = store
1251 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1252 .unwrap();
1253 let mut my_replica = store.new_replica(myspace.clone())?;
1255 my_replica.hash_and_insert("/cool/path", &alice, "round 2")?;
1256 let _entry = store
1257 .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
1258 .unwrap();
1259
1260 let entries: Vec<_> = store
1262 .get_many(myspace.id(), Query::author(alice.id()))?
1263 .collect::<Result<_>>()?;
1264 assert_eq!(entries.len(), 11);
1265
1266 let entries: Vec<_> = store
1268 .get_many(myspace.id(), Query::author(bob.id()))?
1269 .collect::<Result<_>>()?;
1270 assert_eq!(entries.len(), 0);
1271
1272 let entries: Vec<_> = store
1274 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1275 .collect::<Result<_>>()?;
1276 assert_eq!(entries.len(), 1);
1277
1278 let entries: Vec<_> = store
1280 .get_many(myspace.id(), Query::all())?
1281 .collect::<Result<_>>()?;
1282 assert_eq!(entries.len(), 11);
1283
1284 let mut my_replica = store.new_replica(myspace.clone())?;
1286 let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;
1287
1288 let entries: Vec<_> = store
1290 .get_many(myspace.id(), Query::author(alice.id()))?
1291 .collect::<Result<_>>()?;
1292 assert_eq!(entries.len(), 11);
1293
1294 let entries: Vec<_> = store
1295 .get_many(myspace.id(), Query::author(bob.id()))?
1296 .collect::<Result<_>>()?;
1297 assert_eq!(entries.len(), 1);
1298
1299 let entries: Vec<_> = store
1301 .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
1302 .collect::<Result<_>>()?;
1303 assert_eq!(entries.len(), 2);
1304
1305 let entries: Vec<_> = store
1307 .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
1308 .collect::<Result<_>>()?;
1309 assert_eq!(entries.len(), 2);
1310
1311 let entries: Vec<_> = store
1313 .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
1314 .collect::<Result<_>>()?;
1315 assert_eq!(entries.len(), 1);
1316
1317 let entries: Vec<_> = store
1318 .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
1319 .collect::<Result<_>>()?;
1320 assert_eq!(entries.len(), 1);
1321
1322 let entries: Vec<_> = store
1324 .get_many(myspace.id(), Query::all())?
1325 .collect::<Result<_>>()?;
1326 assert_eq!(entries.len(), 12);
1327
1328 let mut my_replica = store.new_replica(myspace.clone())?;
1330 let entries_second: Vec<_> = my_replica
1331 .store
1332 .get_range(Range::new(
1333 RecordIdentifier::default(),
1334 RecordIdentifier::default(),
1335 ))?
1336 .collect::<Result<_, _>>()?;
1337
1338 assert_eq!(entries_second.len(), 12);
1339 assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
1340
1341 test_lru_cache_like_behaviour(&mut store, myspace.id())?;
1342 store.flush()?;
1343 Ok(())
1344 }
1345
1346 fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
1349 #[track_caller]
1351 fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
1352 assert_eq!(
1353 expected_peers,
1354 &store
1355 .get_sync_peers(&namespace)
1356 .unwrap()
1357 .unwrap()
1358 .collect::<Vec<_>>(),
1359 "sync peers differ"
1360 );
1361 }
1362
1363 let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
1364 let mut expected_peers = Vec::with_capacity(count);
1366 for i in 0..count as u8 {
1367 let peer = [i; 32];
1368 expected_peers.insert(0, peer);
1369 store.register_useful_peer(namespace, peer)?;
1370 }
1371 verify_peers(store, namespace, &expected_peers);
1372
1373 expected_peers.pop();
1375 let newer_peer = [count as u8; 32];
1376 expected_peers.insert(0, newer_peer);
1377 store.register_useful_peer(namespace, newer_peer)?;
1378 verify_peers(store, namespace, &expected_peers);
1379
1380 let refreshed_peer = expected_peers.remove(2);
1382 expected_peers.insert(0, refreshed_peer);
1383 store.register_useful_peer(namespace, refreshed_peer)?;
1384 verify_peers(store, namespace, &expected_peers);
1385 Ok(())
1386 }
1387
1388 #[test]
1389 fn test_content_hashes_iterator_memory() -> Result<()> {
1390 let store = store::Store::memory();
1391 test_content_hashes_iterator(store)
1392 }
1393
1394 #[test]
1395 fn test_content_hashes_iterator_fs() -> Result<()> {
1396 let dbfile = tempfile::NamedTempFile::new()?;
1397 let store = store::fs::Store::persistent(dbfile.path())?;
1398 test_content_hashes_iterator(store)
1399 }
1400
1401 fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
1402 let mut rng = rand::thread_rng();
1403 let mut expected = HashSet::new();
1404 let n_replicas = 3;
1405 let n_entries = 4;
1406 for i in 0..n_replicas {
1407 let namespace = NamespaceSecret::new(&mut rng);
1408 let author = store.new_author(&mut rng)?;
1409 let mut replica = store.new_replica(namespace)?;
1410 for j in 0..n_entries {
1411 let key = format!("{j}");
1412 let data = format!("{i}:{j}");
1413 let hash = replica.hash_and_insert(key, &author, data)?;
1414 expected.insert(hash);
1415 }
1416 }
1417 assert_eq!(expected.len(), n_replicas * n_entries);
1418 let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
1419 assert_eq!(actual, expected);
1420 Ok(())
1421 }
1422
1423 #[test]
1424 fn test_multikey() {
1425 let mut rng = rand::thread_rng();
1426
1427 let k = ["a", "c", "z"];
1428
1429 let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
1430 n.sort_by_key(|n| n.id());
1431
1432 let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
1433 a.sort_by_key(|a| a.id());
1434
1435 {
1437 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1438 let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
1439 let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
1440
1441 let range = Range::new(ri0.clone(), ri2.clone());
1442 assert!(range.contains(&ri0), "start");
1443 assert!(range.contains(&ri1), "inside");
1444 assert!(!range.contains(&ri2), "end");
1445
1446 assert!(ri0 < ri1);
1447 assert!(ri1 < ri2);
1448 }
1449
1450 {
1452 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1453 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1454 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1455
1456 let range = Range::new(ri0.clone(), ri2.clone());
1457 assert!(range.contains(&ri0), "start");
1458 assert!(range.contains(&ri1), "inside");
1459 assert!(!range.contains(&ri2), "end");
1460
1461 assert!(ri0 < ri1);
1462 assert!(ri1 < ri2);
1463 }
1464
1465 {
1467 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1468 let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
1469 let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
1470
1471 let range = Range::new(ri0.clone(), ri2.clone());
1472 assert!(range.contains(&ri0), "start");
1473 assert!(range.contains(&ri1), "inside");
1474 assert!(!range.contains(&ri2), "end");
1475
1476 assert!(ri0 < ri1);
1477 assert!(ri1 < ri2);
1478 }
1479
1480 {
1482 let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
1483 let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
1484 let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
1485
1486 let range = Range::new(ri0.clone(), ri2.clone());
1487 assert!(range.contains(&ri0), "start");
1488 assert!(range.contains(&ri1), "inside");
1489 assert!(!range.contains(&ri2), "end");
1490
1491 assert!(ri0 < ri1);
1492 assert!(ri1 < ri2);
1493 }
1494
1495 {
1497 let a0 = a[0].id();
1500 let a1 = a[1].id();
1501 let n0 = n[0].id();
1502 let n1 = n[1].id();
1503 let k0 = k[0];
1504 let k1 = k[1];
1505
1506 assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
1507 assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
1508 assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
1509 assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
1510 }
1511 }
1512
1513 #[test]
1514 fn test_timestamps_memory() -> Result<()> {
1515 let store = store::Store::memory();
1516 test_timestamps(store)?;
1517
1518 Ok(())
1519 }
1520
1521 #[test]
1522 fn test_timestamps_fs() -> Result<()> {
1523 let dbfile = tempfile::NamedTempFile::new()?;
1524 let store = store::fs::Store::persistent(dbfile.path())?;
1525 test_timestamps(store)?;
1526 Ok(())
1527 }
1528
1529 fn test_timestamps(mut store: Store) -> Result<()> {
1530 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1531 let namespace = NamespaceSecret::new(&mut rng);
1532 let _replica = store.new_replica(namespace.clone())?;
1533 let author = store.new_author(&mut rng)?;
1534 store.close_replica(namespace.id());
1535 let mut replica = store.open_replica(&namespace.id())?;
1536
1537 let key = b"hello";
1538 let value = b"world";
1539 let entry = {
1540 let timestamp = 2;
1541 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1542 let record = Record::from_data(value, timestamp);
1543 Entry::new(id, record).sign(&namespace, &author)
1544 };
1545
1546 replica
1547 .insert_entry(entry.clone(), InsertOrigin::Local)
1548 .unwrap();
1549 store.close_replica(namespace.id());
1550 let res = store
1551 .get_exact(namespace.id(), author.id(), key, false)?
1552 .unwrap();
1553 assert_eq!(res, entry);
1554
1555 let entry2 = {
1556 let timestamp = 1;
1557 let id = RecordIdentifier::new(namespace.id(), author.id(), key);
1558 let record = Record::from_data(value, timestamp);
1559 Entry::new(id, record).sign(&namespace, &author)
1560 };
1561
1562 let mut replica = store.open_replica(&namespace.id())?;
1563 let res = replica.insert_entry(entry2, InsertOrigin::Local);
1564 store.close_replica(namespace.id());
1565 assert!(matches!(res, Err(InsertError::NewerEntryExists)));
1566 let res = store
1567 .get_exact(namespace.id(), author.id(), key, false)?
1568 .unwrap();
1569 assert_eq!(res, entry);
1570 store.flush()?;
1571 Ok(())
1572 }
1573
1574 #[test]
1575 fn test_replica_sync_memory() -> Result<()> {
1576 let alice_store = store::Store::memory();
1577 let bob_store = store::Store::memory();
1578
1579 test_replica_sync(alice_store, bob_store)?;
1580 Ok(())
1581 }
1582
1583 #[test]
1584 fn test_replica_sync_fs() -> Result<()> {
1585 let alice_dbfile = tempfile::NamedTempFile::new()?;
1586 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1587 let bob_dbfile = tempfile::NamedTempFile::new()?;
1588 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1589 test_replica_sync(alice_store, bob_store)?;
1590
1591 Ok(())
1592 }
1593
1594 fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1595 let alice_set = ["ape", "eel", "fox", "gnu"];
1596 let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
1597
1598 let mut rng = rand::thread_rng();
1599 let author = Author::new(&mut rng);
1600 let myspace = NamespaceSecret::new(&mut rng);
1601 let mut alice = alice_store.new_replica(myspace.clone())?;
1602 for el in &alice_set {
1603 alice.hash_and_insert(el, &author, el.as_bytes())?;
1604 }
1605
1606 let mut bob = bob_store.new_replica(myspace.clone())?;
1607 for el in &bob_set {
1608 bob.hash_and_insert(el, &author, el.as_bytes())?;
1609 }
1610
1611 let (alice_out, bob_out) = sync(&mut alice, &mut bob)?;
1612
1613 assert_eq!(alice_out.num_sent, 2);
1614 assert_eq!(bob_out.num_recv, 2);
1615 assert_eq!(alice_out.num_recv, 6);
1616 assert_eq!(bob_out.num_sent, 6);
1617
1618 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1619 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1620 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1621 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1622 alice_store.flush()?;
1623 bob_store.flush()?;
1624 Ok(())
1625 }
1626
1627 #[test]
1628 fn test_replica_timestamp_sync_memory() -> Result<()> {
1629 let alice_store = store::Store::memory();
1630 let bob_store = store::Store::memory();
1631
1632 test_replica_timestamp_sync(alice_store, bob_store)?;
1633 Ok(())
1634 }
1635
1636 #[test]
1637 fn test_replica_timestamp_sync_fs() -> Result<()> {
1638 let alice_dbfile = tempfile::NamedTempFile::new()?;
1639 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1640 let bob_dbfile = tempfile::NamedTempFile::new()?;
1641 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1642 test_replica_timestamp_sync(alice_store, bob_store)?;
1643
1644 Ok(())
1645 }
1646
1647 fn test_replica_timestamp_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1648 let mut rng = rand::thread_rng();
1649 let author = Author::new(&mut rng);
1650 let namespace = NamespaceSecret::new(&mut rng);
1651 let mut alice = alice_store.new_replica(namespace.clone())?;
1652 let mut bob = bob_store.new_replica(namespace.clone())?;
1653
1654 let key = b"key";
1655 let alice_value = b"alice";
1656 let bob_value = b"bob";
1657 let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
1658 let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
1660 sync(&mut alice, &mut bob)?;
1661 assert_eq!(
1662 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1663 Some(bob_hash)
1664 );
1665 assert_eq!(
1666 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1667 Some(bob_hash)
1668 );
1669
1670 let mut alice = alice_store.new_replica(namespace.clone())?;
1671 let mut bob = bob_store.new_replica(namespace.clone())?;
1672
1673 let alice_value_2 = b"alice2";
1674 let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
1676 let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
1677 sync(&mut alice, &mut bob)?;
1678 assert_eq!(
1679 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1680 Some(alice_hash_2)
1681 );
1682 assert_eq!(
1683 get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
1684 Some(alice_hash_2)
1685 );
1686 alice_store.flush()?;
1687 bob_store.flush()?;
1688 Ok(())
1689 }
1690
1691 #[test]
1692 fn test_future_timestamp() -> Result<()> {
1693 let mut rng = rand::thread_rng();
1694 let mut store = store::Store::memory();
1695 let author = Author::new(&mut rng);
1696 let namespace = NamespaceSecret::new(&mut rng);
1697
1698 let mut replica = store.new_replica(namespace.clone())?;
1699 let key = b"hi";
1700 let t = system_time_now();
1701 let record = Record::from_data(b"1", t);
1702 let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
1703 replica.insert_entry(entry0.clone(), InsertOrigin::Local)?;
1704
1705 assert_eq!(
1706 get_entry(&mut store, namespace.id(), author.id(), key)?,
1707 entry0
1708 );
1709
1710 let mut replica = store.new_replica(namespace.clone())?;
1711 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
1712 let record = Record::from_data(b"2", t);
1713 let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
1714 replica.insert_entry(entry1.clone(), InsertOrigin::Local)?;
1715 assert_eq!(
1716 get_entry(&mut store, namespace.id(), author.id(), key)?,
1717 entry1
1718 );
1719
1720 let mut replica = store.new_replica(namespace.clone())?;
1721 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
1722 let record = Record::from_data(b"2", t);
1723 let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
1724 replica.insert_entry(entry2.clone(), InsertOrigin::Local)?;
1725 assert_eq!(
1726 get_entry(&mut store, namespace.id(), author.id(), key)?,
1727 entry2
1728 );
1729
1730 let mut replica = store.new_replica(namespace.clone())?;
1731 let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
1732 let record = Record::from_data(b"2", t);
1733 let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
1734 let res = replica.insert_entry(entry3, InsertOrigin::Local);
1735 assert!(matches!(
1736 res,
1737 Err(InsertError::Validation(
1738 ValidationFailure::TooFarInTheFuture
1739 ))
1740 ));
1741 assert_eq!(
1742 get_entry(&mut store, namespace.id(), author.id(), key)?,
1743 entry2
1744 );
1745 store.flush()?;
1746 Ok(())
1747 }
1748
1749 #[test]
1750 fn test_insert_empty() -> Result<()> {
1751 let mut store = store::Store::memory();
1752 let mut rng = rand::thread_rng();
1753 let alice = Author::new(&mut rng);
1754 let myspace = NamespaceSecret::new(&mut rng);
1755 let mut replica = store.new_replica(myspace.clone())?;
1756 let hash = Hash::new(b"");
1757 let res = replica.insert(b"foo", &alice, hash, 0);
1758 assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
1759 store.flush()?;
1760 Ok(())
1761 }
1762
1763 #[test]
1764 fn test_prefix_delete_memory() -> Result<()> {
1765 let store = store::Store::memory();
1766 test_prefix_delete(store)?;
1767 Ok(())
1768 }
1769
1770 #[test]
1771 fn test_prefix_delete_fs() -> Result<()> {
1772 let dbfile = tempfile::NamedTempFile::new()?;
1773 let store = store::fs::Store::persistent(dbfile.path())?;
1774 test_prefix_delete(store)?;
1775 Ok(())
1776 }
1777
1778 fn test_prefix_delete(mut store: Store) -> Result<()> {
1779 let mut rng = rand::thread_rng();
1780 let alice = Author::new(&mut rng);
1781 let myspace = NamespaceSecret::new(&mut rng);
1782 let mut replica = store.new_replica(myspace.clone())?;
1783 let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
1784 let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
1785
1786 assert_eq!(
1788 get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
1789 Some(hash1)
1790 );
1791 assert_eq!(
1792 get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
1793 Some(hash2)
1794 );
1795
1796 let mut replica = store.new_replica(myspace.clone())?;
1798 let deleted = replica.delete_prefix(b"foo", &alice)?;
1799 assert_eq!(deleted, 2);
1800 assert_eq!(
1801 store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
1802 None
1803 );
1804 assert_eq!(
1805 store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
1806 None
1807 );
1808 assert_eq!(
1809 store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
1810 None
1811 );
1812 store.flush()?;
1813 Ok(())
1814 }
1815
1816 #[test]
1817 fn test_replica_sync_delete_memory() -> Result<()> {
1818 let alice_store = store::Store::memory();
1819 let bob_store = store::Store::memory();
1820
1821 test_replica_sync_delete(alice_store, bob_store)
1822 }
1823
1824 #[test]
1825 fn test_replica_sync_delete_fs() -> Result<()> {
1826 let alice_dbfile = tempfile::NamedTempFile::new()?;
1827 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1828 let bob_dbfile = tempfile::NamedTempFile::new()?;
1829 let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
1830 test_replica_sync_delete(alice_store, bob_store)
1831 }
1832
1833 fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
1834 let alice_set = ["foot"];
1835 let bob_set = ["fool", "foo", "fog"];
1836
1837 let mut rng = rand::thread_rng();
1838 let author = Author::new(&mut rng);
1839 let myspace = NamespaceSecret::new(&mut rng);
1840 let mut alice = alice_store.new_replica(myspace.clone())?;
1841 for el in &alice_set {
1842 alice.hash_and_insert(el, &author, el.as_bytes())?;
1843 }
1844
1845 let mut bob = bob_store.new_replica(myspace.clone())?;
1846 for el in &bob_set {
1847 bob.hash_and_insert(el, &author, el.as_bytes())?;
1848 }
1849
1850 sync(&mut alice, &mut bob)?;
1851
1852 check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
1853 check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
1854 check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
1855 check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
1856
1857 let mut alice = alice_store.new_replica(myspace.clone())?;
1858 let mut bob = bob_store.new_replica(myspace.clone())?;
1859 alice.delete_prefix("foo", &author)?;
1860 bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
1861 sync(&mut alice, &mut bob)?;
1862 check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
1863 check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
1864 alice_store.flush()?;
1865 bob_store.flush()?;
1866 Ok(())
1867 }
1868
1869 #[test]
1870 fn test_replica_remove_memory() -> Result<()> {
1871 let alice_store = store::Store::memory();
1872 test_replica_remove(alice_store)
1873 }
1874
1875 #[test]
1876 fn test_replica_remove_fs() -> Result<()> {
1877 let alice_dbfile = tempfile::NamedTempFile::new()?;
1878 let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
1879 test_replica_remove(alice_store)
1880 }
1881
1882 fn test_replica_remove(mut store: Store) -> Result<()> {
1883 let mut rng = rand::thread_rng();
1884 let namespace = NamespaceSecret::new(&mut rng);
1885 let author = Author::new(&mut rng);
1886 let mut replica = store.new_replica(namespace.clone())?;
1887
1888 let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
1890 let res = store
1891 .get_many(namespace.id(), Query::all())?
1892 .collect::<Vec<_>>();
1893 assert_eq!(res.len(), 1);
1894
1895 let res = store.remove_replica(&namespace.id());
1897 assert!(res.is_err());
1899 store.close_replica(namespace.id());
1900 store.remove_replica(&namespace.id())?;
1901 let res = store
1902 .get_many(namespace.id(), Query::all())?
1903 .collect::<Vec<_>>();
1904 assert_eq!(res.len(), 0);
1905
1906 let res = store.load_replica_info(&namespace.id());
1908 assert!(matches!(res, Err(OpenError::NotFound)));
1909
1910 let mut replica = store.new_replica(namespace.clone())?;
1912 replica.insert(b"foo", &author, hash, 3)?;
1913 let res = store
1914 .get_many(namespace.id(), Query::all())?
1915 .collect::<Vec<_>>();
1916 assert_eq!(res.len(), 1);
1917 store.flush()?;
1918 Ok(())
1919 }
1920
1921 #[test]
1922 fn test_replica_delete_edge_cases_memory() -> Result<()> {
1923 let store = store::Store::memory();
1924 test_replica_delete_edge_cases(store)
1925 }
1926
1927 #[test]
1928 fn test_replica_delete_edge_cases_fs() -> Result<()> {
1929 let dbfile = tempfile::NamedTempFile::new()?;
1930 let store = store::fs::Store::persistent(dbfile.path())?;
1931 test_replica_delete_edge_cases(store)
1932 }
1933
1934 fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
1935 let mut rng = rand::thread_rng();
1936 let author = Author::new(&mut rng);
1937 let namespace = NamespaceSecret::new(&mut rng);
1938
1939 let edgecases = [0u8, 1u8, 255u8];
1940 let prefixes = [0u8, 255u8];
1941 let hash = Hash::new(b"foo");
1942 let len = 3;
1943 for prefix in prefixes {
1944 let mut expected = vec![];
1945 let mut replica = store.new_replica(namespace.clone())?;
1946 for suffix in edgecases {
1947 let key = [prefix, suffix].to_vec();
1948 expected.push(key.clone());
1949 replica.insert(&key, &author, hash, len)?;
1950 }
1951 assert_keys(&mut store, namespace.id(), expected);
1952 let mut replica = store.new_replica(namespace.clone())?;
1953 replica.delete_prefix([prefix], &author)?;
1954 assert_keys(&mut store, namespace.id(), vec![]);
1955 }
1956
1957 let mut replica = store.new_replica(namespace.clone())?;
1958 let key = vec![1u8, 0u8];
1959 replica.insert(key, &author, hash, len)?;
1960 let key = vec![1u8, 1u8];
1961 replica.insert(key, &author, hash, len)?;
1962 let key = vec![1u8, 2u8];
1963 replica.insert(key, &author, hash, len)?;
1964 let prefix = vec![1u8, 1u8];
1965 replica.delete_prefix(prefix, &author)?;
1966 assert_keys(
1967 &mut store,
1968 namespace.id(),
1969 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1970 );
1971
1972 let mut replica = store.new_replica(namespace.clone())?;
1973 let key = vec![0u8, 255u8];
1974 replica.insert(key, &author, hash, len)?;
1975 let key = vec![0u8, 0u8];
1976 replica.insert(key, &author, hash, len)?;
1977 let prefix = vec![0u8];
1978 replica.delete_prefix(prefix, &author)?;
1979 assert_keys(
1980 &mut store,
1981 namespace.id(),
1982 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
1983 );
1984 store.flush()?;
1985 Ok(())
1986 }
1987
1988 #[test]
1989 fn test_latest_iter_memory() -> Result<()> {
1990 let store = store::Store::memory();
1991 test_latest_iter(store)
1992 }
1993
1994 #[test]
1995 fn test_latest_iter_fs() -> Result<()> {
1996 let dbfile = tempfile::NamedTempFile::new()?;
1997 let store = store::fs::Store::persistent(dbfile.path())?;
1998 test_latest_iter(store)
1999 }
2000
2001 fn test_latest_iter(mut store: Store) -> Result<()> {
2002 let mut rng = rand::thread_rng();
2003 let author0 = Author::new(&mut rng);
2004 let author1 = Author::new(&mut rng);
2005 let namespace = NamespaceSecret::new(&mut rng);
2006 let mut replica = store.new_replica(namespace.clone())?;
2007
2008 replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
2009 let latest = store
2010 .get_latest_for_each_author(namespace.id())?
2011 .collect::<Result<Vec<_>>>()?;
2012 assert_eq!(latest.len(), 1);
2013 assert_eq!(latest[0].2, b"a0.1".to_vec());
2014
2015 let mut replica = store.new_replica(namespace.clone())?;
2016 replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
2017 replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
2018 let latest = store
2019 .get_latest_for_each_author(namespace.id())?
2020 .collect::<Result<Vec<_>>>()?;
2021 let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
2022 latest_keys.sort();
2023 assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
2024 store.flush()?;
2025 Ok(())
2026 }
2027
2028 #[test]
2029 fn test_replica_byte_keys_memory() -> Result<()> {
2030 let store = store::Store::memory();
2031
2032 test_replica_byte_keys(store)?;
2033 Ok(())
2034 }
2035
2036 #[test]
2037 fn test_replica_byte_keys_fs() -> Result<()> {
2038 let dbfile = tempfile::NamedTempFile::new()?;
2039 let store = store::fs::Store::persistent(dbfile.path())?;
2040 test_replica_byte_keys(store)?;
2041
2042 Ok(())
2043 }
2044
2045 fn test_replica_byte_keys(mut store: Store) -> Result<()> {
2046 let mut rng = rand::thread_rng();
2047 let author = Author::new(&mut rng);
2048 let namespace = NamespaceSecret::new(&mut rng);
2049
2050 let hash = Hash::new(b"foo");
2051 let len = 3;
2052
2053 let key = vec![1u8, 0u8];
2054 let mut replica = store.new_replica(namespace.clone())?;
2055 replica.insert(key, &author, hash, len)?;
2056 assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
2057 let key = vec![1u8, 2u8];
2058 let mut replica = store.new_replica(namespace.clone())?;
2059 replica.insert(key, &author, hash, len)?;
2060 assert_keys(
2061 &mut store,
2062 namespace.id(),
2063 vec![vec![1u8, 0u8], vec![1u8, 2u8]],
2064 );
2065
2066 let key = vec![0u8, 255u8];
2067 let mut replica = store.new_replica(namespace.clone())?;
2068 replica.insert(key, &author, hash, len)?;
2069 assert_keys(
2070 &mut store,
2071 namespace.id(),
2072 vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
2073 );
2074 store.flush()?;
2075 Ok(())
2076 }
2077
2078 #[test]
2079 fn test_replica_capability_memory() -> Result<()> {
2080 let store = store::Store::memory();
2081 test_replica_capability(store)
2082 }
2083
2084 #[test]
2085 fn test_replica_capability_fs() -> Result<()> {
2086 let dbfile = tempfile::NamedTempFile::new()?;
2087 let store = store::fs::Store::persistent(dbfile.path())?;
2088 test_replica_capability(store)
2089 }
2090
2091 #[allow(clippy::redundant_pattern_matching)]
2092 fn test_replica_capability(mut store: Store) -> Result<()> {
2093 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2094 let author = store.new_author(&mut rng)?;
2095 let namespace = NamespaceSecret::new(&mut rng);
2096
2097 let capability = Capability::Read(namespace.id());
2099 store.import_namespace(capability)?;
2100 let mut replica = store.open_replica(&namespace.id())?;
2101 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2102 assert!(matches!(res, Err(InsertError::ReadOnly)));
2103
2104 let capability = Capability::Write(namespace.clone());
2106 store.import_namespace(capability)?;
2107 let mut replica = store.open_replica(&namespace.id())?;
2108 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2109 assert!(matches!(res, Ok(_)));
2110 store.close_replica(namespace.id());
2111 let mut replica = store.open_replica(&namespace.id())?;
2112 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2113 assert!(res.is_ok());
2114
2115 let capability = Capability::Read(namespace.id());
2117 store.import_namespace(capability)?;
2118 store.close_replica(namespace.id());
2119 let mut replica = store.open_replica(&namespace.id())?;
2120 let res = replica.hash_and_insert(b"foo", &author, b"bar");
2121 assert!(res.is_ok());
2122 store.flush()?;
2123 Ok(())
2124 }
2125
2126 #[tokio::test]
2127 async fn test_actor_capability_memory() -> Result<()> {
2128 let store = store::Store::memory();
2129 test_actor_capability(store).await
2130 }
2131
2132 #[tokio::test]
2133 async fn test_actor_capability_fs() -> Result<()> {
2134 let dbfile = tempfile::NamedTempFile::new()?;
2135 let store = store::fs::Store::persistent(dbfile.path())?;
2136 test_actor_capability(store).await
2137 }
2138
2139 async fn test_actor_capability(store: Store) -> Result<()> {
2140 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2142 let author = Author::new(&mut rng);
2143 let handle = SyncHandle::spawn(store, None, "test".into());
2144 let author = handle.import_author(author).await?;
2145 let namespace = NamespaceSecret::new(&mut rng);
2146 let id = namespace.id();
2147
2148 let capability = Capability::Read(namespace.id());
2150 handle.import_namespace(capability).await?;
2151 handle.open(namespace.id(), Default::default()).await?;
2152 let res = handle
2153 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2154 .await;
2155 assert!(res.is_err());
2156
2157 let capability = Capability::Write(namespace.clone());
2159 handle.import_namespace(capability).await?;
2160 let res = handle
2161 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2162 .await;
2163 assert!(res.is_ok());
2164
2165 handle.close(namespace.id()).await?;
2167 let res = handle
2168 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2169 .await;
2170 assert!(res.is_err());
2171 handle.open(namespace.id(), Default::default()).await?;
2172 let res = handle
2173 .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
2174 .await;
2175 assert!(res.is_ok());
2176 Ok(())
2177 }
2178
2179 fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
2180 let mut res = vec![];
2181 while let Ok(ev) = events.try_recv() {
2182 res.push(ev);
2183 }
2184 res
2185 }
2186
2187 #[test]
2190 fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
2191 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2192 let mut store1 = store::Store::memory();
2193 let mut store2 = store::Store::memory();
2194 let peer1 = [1u8; 32];
2195 let peer2 = [2u8; 32];
2196 let mut state1 = SyncOutcome::default();
2197 let mut state2 = SyncOutcome::default();
2198
2199 let author = Author::new(&mut rng);
2200 let namespace = NamespaceSecret::new(&mut rng);
2201 let mut replica1 = store1.new_replica(namespace.clone())?;
2202 let mut replica2 = store2.new_replica(namespace.clone())?;
2203
2204 let (events1_sender, events1) = async_channel::bounded(32);
2205 let (events2_sender, events2) = async_channel::bounded(32);
2206
2207 replica1.info.subscribe(events1_sender);
2208 replica2.info.subscribe(events2_sender);
2209
2210 replica1.hash_and_insert(b"foo", &author, b"init")?;
2211
2212 let from1 = replica1.sync_initial_message()?;
2213 let from2 = replica2
2214 .sync_process_message(from1, peer1, &mut state2)
2215 .unwrap()
2216 .unwrap();
2217 let from1 = replica1
2218 .sync_process_message(from2, peer2, &mut state1)
2219 .unwrap()
2220 .unwrap();
2221 replica2.hash_and_insert(b"foo", &author, b"update")?;
2225 let from2 = replica2
2226 .sync_process_message(from1, peer1, &mut state2)
2227 .unwrap();
2228 assert!(from2.is_none());
2229 let events1 = drain(events1);
2230 let events2 = drain(events2);
2231 assert_eq!(events1.len(), 1);
2232 assert_eq!(events2.len(), 1);
2233 assert!(matches!(events1[0], Event::LocalInsert { .. }));
2234 assert!(matches!(events2[0], Event::LocalInsert { .. }));
2235 assert_eq!(state1.num_sent, 1);
2236 assert_eq!(state1.num_recv, 0);
2237 assert_eq!(state2.num_sent, 0);
2238 assert_eq!(state2.num_recv, 1);
2239 store1.flush()?;
2240 store2.flush()?;
2241 Ok(())
2242 }
2243
2244 #[test]
2245 fn test_replica_queries_mem() -> Result<()> {
2246 let store = store::Store::memory();
2247
2248 test_replica_queries(store)?;
2249 Ok(())
2250 }
2251
2252 #[test]
2253 fn test_replica_queries_fs() -> Result<()> {
2254 let dbfile = tempfile::NamedTempFile::new()?;
2255 let store = store::fs::Store::persistent(dbfile.path())?;
2256 test_replica_queries(store)?;
2257
2258 Ok(())
2259 }
2260
2261 fn test_replica_queries(mut store: Store) -> Result<()> {
2262 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2263 let namespace = NamespaceSecret::new(&mut rng);
2264 let namespace_id = namespace.id();
2265
2266 let a1 = store.new_author(&mut rng)?;
2267 let a2 = store.new_author(&mut rng)?;
2268 let a3 = store.new_author(&mut rng)?;
2269 println!(
2270 "a1 {} a2 {} a3 {}",
2271 a1.id().fmt_short(),
2272 a2.id().fmt_short(),
2273 a3.id().fmt_short()
2274 );
2275
2276 let mut replica = store.new_replica(namespace.clone())?;
2277 replica.hash_and_insert("hi/world", &a2, "a2")?;
2278 replica.hash_and_insert("hi/world", &a1, "a1")?;
2279 replica.hash_and_insert("hi/moon", &a2, "a1")?;
2280 replica.hash_and_insert("hi", &a3, "a3")?;
2281
2282 struct QueryTester<'a> {
2283 store: &'a mut Store,
2284 namespace: NamespaceId,
2285 }
2286 impl QueryTester<'_> {
2287 fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
2288 let query = query.into();
2289 let actual = self
2290 .store
2291 .get_many(self.namespace, query.clone())
2292 .unwrap()
2293 .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
2294 .collect::<Result<Vec<_>>>()
2295 .unwrap();
2296 let expected = expected
2297 .into_iter()
2298 .map(|(key, author)| (key.to_string(), author.id()))
2299 .collect::<Vec<_>>();
2300 assert_eq!(actual, expected, "query: {query:#?}")
2301 }
2302 }
2303
2304 let mut qt = QueryTester {
2305 store: &mut store,
2306 namespace: namespace_id,
2307 };
2308
2309 qt.assert(
2310 Query::all(),
2311 vec![
2312 ("hi/world", &a1),
2313 ("hi/moon", &a2),
2314 ("hi/world", &a2),
2315 ("hi", &a3),
2316 ],
2317 );
2318
2319 qt.assert(
2320 Query::single_latest_per_key(),
2321 vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
2322 );
2323
2324 qt.assert(
2325 Query::single_latest_per_key().sort_direction(SortDirection::Desc),
2326 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2327 );
2328
2329 qt.assert(
2330 Query::single_latest_per_key().key_prefix("hi/"),
2331 vec![("hi/moon", &a2), ("hi/world", &a1)],
2332 );
2333
2334 qt.assert(
2335 Query::single_latest_per_key()
2336 .key_prefix("hi/")
2337 .sort_direction(SortDirection::Desc),
2338 vec![("hi/world", &a1), ("hi/moon", &a2)],
2339 );
2340
2341 qt.assert(
2342 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
2343 vec![
2344 ("hi", &a3),
2345 ("hi/moon", &a2),
2346 ("hi/world", &a1),
2347 ("hi/world", &a2),
2348 ],
2349 );
2350
2351 qt.assert(
2352 Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2353 vec![
2354 ("hi/world", &a2),
2355 ("hi/world", &a1),
2356 ("hi/moon", &a2),
2357 ("hi", &a3),
2358 ],
2359 );
2360
2361 qt.assert(
2362 Query::all().key_prefix("hi/"),
2363 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2364 );
2365
2366 qt.assert(
2367 Query::all().key_prefix("hi/").offset(1).limit(1),
2368 vec![("hi/moon", &a2)],
2369 );
2370
2371 qt.assert(
2372 Query::all()
2373 .key_prefix("hi/")
2374 .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
2375 vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
2376 );
2377
2378 qt.assert(
2379 Query::all()
2380 .key_prefix("hi/")
2381 .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
2382 .offset(1)
2383 .limit(1),
2384 vec![("hi/world", &a1)],
2385 );
2386
2387 qt.assert(
2388 Query::all()
2389 .key_prefix("hi/")
2390 .sort_by(SortBy::AuthorKey, SortDirection::Asc),
2391 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
2392 );
2393
2394 qt.assert(
2395 Query::all()
2396 .key_prefix("hi/")
2397 .sort_by(SortBy::AuthorKey, SortDirection::Desc),
2398 vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
2399 );
2400
2401 qt.assert(
2402 Query::all()
2403 .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
2404 .limit(2)
2405 .offset(1),
2406 vec![("hi/moon", &a2), ("hi/world", &a1)],
2407 );
2408
2409 let mut replica = store.new_replica(namespace)?;
2410 replica.delete_prefix("hi/world", &a2)?;
2411 let mut qt = QueryTester {
2412 store: &mut store,
2413 namespace: namespace_id,
2414 };
2415
2416 qt.assert(
2417 Query::all(),
2418 vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
2419 );
2420
2421 qt.assert(
2422 Query::all().include_empty(),
2423 vec![
2424 ("hi/world", &a1),
2425 ("hi/moon", &a2),
2426 ("hi/world", &a2),
2427 ("hi", &a3),
2428 ],
2429 );
2430 store.flush()?;
2431 Ok(())
2432 }
2433
2434 #[test]
2435 fn test_dl_policies_mem() -> Result<()> {
2436 let mut store = store::Store::memory();
2437 test_dl_policies(&mut store)
2438 }
2439
2440 #[test]
2441 fn test_dl_policies_fs() -> Result<()> {
2442 let dbfile = tempfile::NamedTempFile::new()?;
2443 let mut store = store::fs::Store::persistent(dbfile.path())?;
2444 test_dl_policies(&mut store)
2445 }
2446
2447 fn test_dl_policies(store: &mut Store) -> Result<()> {
2448 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
2449 let namespace = NamespaceSecret::new(&mut rng);
2450 let id = namespace.id();
2451
2452 let filter = store::FilterKind::Exact("foo".into());
2453 let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
2454 store
2455 .set_download_policy(&id, policy.clone())
2456 .expect_err("document dos not exist");
2457
2458 store.new_replica(namespace)?;
2460
2461 store.set_download_policy(&id, policy.clone())?;
2462 let retrieved_policy = store.get_download_policy(&id)?;
2463 assert_eq!(retrieved_policy, policy);
2464 store.flush()?;
2465 Ok(())
2466 }
2467
2468 fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
2469 expected.sort();
2470 assert_eq!(expected, get_keys_sorted(store, namespace));
2471 }
2472
2473 fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
2474 let mut res = store
2475 .get_many(namespace, Query::all())
2476 .unwrap()
2477 .map(|e| e.map(|e| e.key().to_vec()))
2478 .collect::<Result<Vec<_>>>()
2479 .unwrap();
2480 res.sort();
2481 res
2482 }
2483
2484 fn get_entry(
2485 store: &mut Store,
2486 namespace: NamespaceId,
2487 author: AuthorId,
2488 key: &[u8],
2489 ) -> anyhow::Result<SignedEntry> {
2490 let entry = store
2491 .get_exact(namespace, author, key, true)?
2492 .ok_or_else(|| anyhow::anyhow!("not found"))?;
2493 Ok(entry)
2494 }
2495
2496 fn get_content_hash(
2497 store: &mut Store,
2498 namespace: NamespaceId,
2499 author: AuthorId,
2500 key: &[u8],
2501 ) -> anyhow::Result<Option<Hash>> {
2502 let hash = store
2503 .get_exact(namespace, author, key, false)?
2504 .map(|e| e.content_hash());
2505 Ok(hash)
2506 }
2507
2508 fn sync(alice: &mut Replica, bob: &mut Replica) -> Result<(SyncOutcome, SyncOutcome)> {
2509 let alice_peer_id = [1u8; 32];
2510 let bob_peer_id = [2u8; 32];
2511 let mut alice_state = SyncOutcome::default();
2512 let mut bob_state = SyncOutcome::default();
2513 let mut next_to_bob = Some(alice.sync_initial_message()?);
2515 let mut rounds = 0;
2516 while let Some(msg) = next_to_bob.take() {
2517 assert!(rounds < 100, "too many rounds");
2518 rounds += 1;
2519 println!("round {}", rounds);
2520 if let Some(msg) = bob.sync_process_message(msg, alice_peer_id, &mut bob_state)? {
2521 next_to_bob = alice.sync_process_message(msg, bob_peer_id, &mut alice_state)?
2522 }
2523 }
2524 assert_eq!(alice_state.num_sent, bob_state.num_recv);
2525 assert_eq!(alice_state.num_recv, bob_state.num_sent);
2526 Ok((alice_state, bob_state))
2527 }
2528
2529 fn check_entries(
2530 store: &mut Store,
2531 namespace: &NamespaceId,
2532 author: &Author,
2533 set: &[&str],
2534 ) -> Result<()> {
2535 for el in set {
2536 store.get_exact(*namespace, author.id(), el, false)?;
2537 }
2538 Ok(())
2539 }
2540}