1#![warn(missing_docs)]
2use std::collections::BTreeMap;
3use std::collections::BTreeSet;
4use std::str::FromStr;
5
6use serde::Deserialize;
7use serde::Serialize;
8
9use super::subring::Subring;
10use crate::algebra::JoinSemilattice;
11use crate::consts::ENTRY_DATA_MAX_LEN;
12use crate::dht::Did;
13use crate::ecc::HashStr;
14use crate::error::Error;
15use crate::error::Result;
16use crate::message::Encoded;
17use crate::message::Encoder;
18use crate::message::MessagePayload;
19use crate::message::MessageVerificationExt;
20
21mod crdt;
22
23pub use crdt::DataTopicBuffer;
24pub use crdt::EntryCrdt;
25pub use crdt::EntryDot;
26pub use crdt::EntryVersion;
27pub use crdt::GSet;
28pub use crdt::RelayMessageSet;
29pub use crdt::SubringMemberSet;
30
31#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
33pub enum EntryKind {
34 Data,
36 Subring,
38 RelayMessage,
41}
42
43#[derive(Clone, Debug, PartialEq, Eq)]
44enum EntryStampKind {
45 Overwrite,
46 Delta,
47}
48
49#[derive(Serialize)]
54struct OperationDigest<'a> {
55 kind: EntryKind,
56 did: Did,
57 data: &'a [Encoded],
58}
59
60#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
62pub enum EntryOperation {
63 Overwrite(Entry),
65 Extend(Entry),
68 Touch(Entry),
72 JoinSubring(String, Did),
74 Tombstone(Entry),
81}
82
83#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
96pub struct Entry {
97 pub did: Did,
100 pub data: Vec<Encoded>,
102 pub kind: EntryKind,
104 #[serde(default)]
106 pub crdt: EntryCrdt,
107}
108
109#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
114pub struct PlacedEntry {
115 pub key: Did,
117 pub entry: Entry,
119}
120
121impl PlacedEntry {
122 pub fn new(key: Did, entry: Entry) -> Self {
124 Self { key, entry }
125 }
126}
127
128#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
136pub struct SyncedEntryAck {
137 pub key: Did,
139 pub entry: Entry,
141}
142
143impl SyncedEntryAck {
144 pub fn new(key: Did, entry: Entry) -> Self {
146 Self { key, entry }
147 }
148
149 pub fn confirms_local_value(&self, local: &Entry) -> Result<bool> {
155 Ok(self.entry.clone().try_into_storage_entry()?
156 == local.clone().try_into_storage_entry()?)
157 }
158}
159
160#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
165pub struct EntryLookupKey {
166 pub resource: Did,
168 pub placement: Did,
170}
171
172impl EntryLookupKey {
173 pub fn new(resource: Did, placement: Did) -> Self {
175 Self {
176 resource,
177 placement,
178 }
179 }
180}
181
182#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
184pub struct PlacementMiss {
185 pub key: Did,
187 pub owner: Did,
189}
190
191impl PlacementMiss {
192 pub fn new(key: Did, owner: Did) -> Self {
194 Self { key, owner }
195 }
196}
197
198#[derive(Clone, Debug, PartialEq, Eq)]
200pub struct EntryLookupEvidence {
201 pub entry: Entry,
203 pub misses: Vec<PlacementMiss>,
205}
206
207impl EntryLookupEvidence {
208 pub fn new(entry: Entry, misses: Vec<PlacementMiss>) -> Self {
210 Self { entry, misses }
211 }
212}
213
214impl Entry {
215 pub fn new(did: Did, data: Vec<Encoded>, kind: EntryKind) -> Self {
217 Self {
218 did,
219 data,
220 kind,
221 crdt: EntryCrdt::default(),
222 }
223 }
224
225 pub fn gen_did(topic: &str) -> Result<Did> {
227 let hash: HashStr = topic.into();
228 let did = Did::from_str(&hash.inner());
229 tracing::debug!("gen_did: topic: {}, did: {:?}", topic, did);
230 did
231 }
232}
233
234impl EntryOperation {
235 pub fn stamped(self, actor: Did) -> Result<Self> {
240 Ok(match self {
241 EntryOperation::Overwrite(entry) => EntryOperation::Overwrite(
242 entry.ensure_stamp_after(actor, None, EntryStampKind::Overwrite)?,
243 ),
244 EntryOperation::Extend(entry) => EntryOperation::Extend(entry.ensure_stamp_after(
245 actor,
246 None,
247 EntryStampKind::Delta,
248 )?),
249 EntryOperation::Touch(entry) => EntryOperation::Touch(entry.ensure_stamp_after(
250 actor,
251 None,
252 EntryStampKind::Delta,
253 )?),
254 EntryOperation::JoinSubring(name, did) => EntryOperation::JoinSubring(name, did),
255 EntryOperation::Tombstone(entry) => EntryOperation::Tombstone(entry),
256 })
257 }
258
259 pub fn did(&self) -> Result<Did> {
261 Ok(match self {
262 EntryOperation::Overwrite(entry) => entry.did,
263 EntryOperation::Extend(entry) => entry.did,
264 EntryOperation::Touch(entry) => entry.did,
265 EntryOperation::JoinSubring(name, _) => Entry::gen_did(name)?,
266 EntryOperation::Tombstone(entry) => entry.did,
267 })
268 }
269
270 pub fn kind(&self) -> EntryKind {
272 match self {
273 EntryOperation::Overwrite(entry) => entry.kind,
274 EntryOperation::Extend(entry) => entry.kind,
275 EntryOperation::Touch(entry) => entry.kind,
276 EntryOperation::JoinSubring(..) => EntryKind::Subring,
277 EntryOperation::Tombstone(entry) => entry.kind,
278 }
279 }
280
281 pub fn gen_default_entry(self) -> Result<Entry> {
283 match self {
284 EntryOperation::JoinSubring(name, did) => Subring::new(&name, did)?.try_into(),
285 _ => Ok(Entry::new(self.did()?, vec![], self.kind())),
286 }
287 }
288}
289
290impl TryFrom<MessagePayload> for Entry {
291 type Error = Error;
292 fn try_from(msg: MessagePayload) -> Result<Self> {
293 let did = msg.signer() + Did::from(1u32);
296 let data = msg.encode()?;
297 Ok(Self {
298 did,
299 data: vec![data],
300 kind: EntryKind::RelayMessage,
301 crdt: EntryCrdt::default(),
302 })
303 }
304}
305
306impl TryFrom<(String, Encoded)> for Entry {
307 type Error = Error;
308 fn try_from((topic, e): (String, Encoded)) -> Result<Self> {
309 Ok(Self {
310 did: Self::gen_did(&topic)?,
311 data: vec![e],
312 kind: EntryKind::Data,
313 crdt: EntryCrdt::default(),
314 })
315 }
316}
317
318impl TryFrom<(String, String)> for Entry {
319 type Error = Error;
320 fn try_from((topic, s): (String, String)) -> Result<Self> {
321 let encoded_message = s.encode()?;
322 (topic, encoded_message).try_into()
323 }
324}
325
326impl TryFrom<String> for Entry {
327 type Error = Error;
328 fn try_from(topic: String) -> Result<Self> {
329 (topic.clone(), topic).try_into()
330 }
331}
332
333impl Entry {
334 fn with_element_dots(mut self, version: EntryVersion) -> Result<Self> {
335 self.crdt.dots = self
336 .data
337 .iter()
338 .enumerate()
339 .map(|(index, _)| EntryDot::for_index(version, index))
340 .collect::<Result<Vec<_>>>()?;
341 Ok(self)
342 }
343
344 fn stamp_overwrite(mut self, version: EntryVersion) -> Result<Self> {
345 self.crdt.register = Some(version);
346 self.with_element_dots(version)
347 }
348
349 fn stamp_delta(self, version: EntryVersion) -> Result<Self> {
350 self.with_element_dots(version)
351 }
352
353 fn stamp(self, version: EntryVersion, kind: EntryStampKind) -> Result<Self> {
354 match kind {
355 EntryStampKind::Overwrite => self.stamp_overwrite(version),
356 EntryStampKind::Delta => self.stamp_delta(version),
357 }
358 }
359
360 fn operation_digest(&self) -> Result<Did> {
361 let digest = OperationDigest {
362 kind: self.kind,
363 did: self.did,
364 data: &self.data,
365 };
366 let bytes = bincode::serialize(&digest).map_err(Error::BincodeSerialize)?;
367 Did::try_from(HashStr::from_bytes(&bytes))
368 }
369
370 fn issue_version_after(&self, actor: Did, floor: Option<EntryVersion>) -> Result<EntryVersion> {
371 Ok(EntryVersion::issued_by(actor, self.operation_digest()?).after(floor))
372 }
373
374 fn ensure_stamp_after(
375 self,
376 actor: Did,
377 floor: Option<EntryVersion>,
378 kind: EntryStampKind,
379 ) -> Result<Self> {
380 if self.crdt.has_write_witness() {
381 return Ok(self);
382 }
383 let version = self.issue_version_after(actor, floor)?;
384 self.stamp(version, kind)
385 }
386
387 fn max_observed_version(&self) -> Option<EntryVersion> {
388 self.crdt
389 .dots
390 .iter()
391 .map(|dot| dot.version)
392 .chain(self.crdt.tombstones.iter().map(|dot| dot.version))
393 .chain(self.crdt.register)
394 .max()
395 }
396
397 fn validate_same_carrier(&self, other: &Self) -> Result<()> {
398 if !self.same_kind_as(other) {
399 return Err(Error::EntryKindNotEqual);
400 }
401 if !self.same_key_as(other) {
402 return Err(Error::EntryDidNotEqual);
403 }
404 Ok(())
405 }
406
407 fn dot_for_element(&self, index: usize) -> Result<EntryDot> {
408 if let Some(dot) = self.crdt.dots.get(index).copied() {
409 return Ok(dot);
410 }
411 EntryDot::for_index(self.crdt.legacy_floor(), index)
412 }
413
414 fn topic_buffer(&self) -> Result<DataTopicBuffer> {
415 let mut values = BTreeMap::new();
416 for (index, value) in self.data.iter().cloned().enumerate() {
417 let dot = self.dot_for_element(index)?;
418 values
419 .entry(value)
420 .and_modify(|current: &mut EntryDot| {
421 *current = (*current).max(dot);
422 })
423 .or_insert(dot);
424 }
425 Ok(DataTopicBuffer::new(self.crdt.register, values))
426 }
427
428 fn relay_set(&self) -> Result<RelayMessageSet> {
429 Ok(RelayMessageSet::new(
430 self.topic_buffer()?,
431 self.crdt.tombstones.iter().copied().collect(),
432 ))
433 }
434
435 fn subring_member_set(&self) -> Result<SubringMemberSet> {
436 let subring: Subring = self.clone().try_into()?;
437 let mut members = SubringMemberSet::new();
438 for member in subring.finger.list().iter().flatten().copied() {
439 members.insert(member);
440 }
441 Ok(members)
442 }
443
444 fn materialize_elements(
445 did: Did,
446 kind: EntryKind,
447 register: Option<EntryVersion>,
448 elements: impl IntoIterator<Item = (Encoded, EntryDot)>,
449 tombstones: BTreeSet<EntryDot>,
450 ) -> Self {
451 let mut visible = elements
452 .into_iter()
453 .filter(|(_, dot)| {
454 let visible_after_reset = register.is_none_or(|floor| dot.version >= floor);
455 visible_after_reset && !tombstones.contains(dot)
456 })
457 .collect::<Vec<_>>();
458 visible.sort_by(|(left_value, left_dot), (right_value, right_dot)| {
459 left_dot
460 .cmp(right_dot)
461 .then_with(|| left_value.cmp(right_value))
462 });
463 let skip_count = visible.len().saturating_sub(ENTRY_DATA_MAX_LEN);
464 let visible = visible.into_iter().skip(skip_count).collect::<Vec<_>>();
465 let (data, dots): (Vec<_>, Vec<_>) = visible.into_iter().unzip();
466
467 Self {
468 did,
469 data,
470 kind,
471 crdt: EntryCrdt {
472 register,
473 dots,
474 tombstones: tombstones.into_iter().collect(),
475 },
476 }
477 }
478
479 fn materialize_topic_buffer(&self, buffer: DataTopicBuffer) -> Self {
480 Self::materialize_elements(
481 self.did,
482 self.kind,
483 buffer.register,
484 buffer.values,
485 BTreeSet::new(),
486 )
487 }
488
489 fn materialize_relay_set(&self, set: RelayMessageSet) -> Self {
490 Self::materialize_elements(
491 self.did,
492 self.kind,
493 set.adds.register,
494 set.adds.values,
495 set.removes,
496 )
497 }
498
499 fn join_subring_entry(&self, other: &Self) -> Result<Self> {
500 let members = self.subring_member_set()?.join(other.subring_member_set()?);
501 let mut subring: Subring = self.clone().try_into()?;
502 for member in members.iter().copied() {
503 subring.finger.join(member);
504 }
505 let mut entry: Entry = subring.try_into()?;
506 entry.crdt.register = self.crdt.register.max(other.crdt.register);
507 Ok(entry)
508 }
509
510 pub fn join(&self, other: Self) -> Result<Self> {
517 self.validate_same_carrier(&other)?;
518 match self.kind {
519 EntryKind::Data => {
520 Ok(self.materialize_topic_buffer(self.topic_buffer()?.join(other.topic_buffer()?)))
521 }
522 EntryKind::RelayMessage => {
523 Ok(self.materialize_relay_set(self.relay_set()?.join(other.relay_set()?)))
524 }
525 EntryKind::Subring => self.join_subring_entry(&other),
526 }
527 }
528
529 pub fn affine(&self, scalar: u16) -> Result<Vec<Entry>> {
531 Ok(self
532 .did
533 .rotate_affine(scalar)?
534 .into_iter()
535 .map(|did| self.clone_with_did(did))
536 .collect())
537 }
538
539 pub fn clone_with_did(&self, did: Did) -> Self {
541 let mut entry = self.clone();
542 entry.did = did;
543 entry
544 }
545
546 fn is_data_entry(&self) -> bool {
547 self.kind == EntryKind::Data
548 }
549
550 fn is_subring_entry(&self) -> bool {
551 self.kind == EntryKind::Subring
552 }
553
554 fn is_relay_entry(&self) -> bool {
555 self.kind == EntryKind::RelayMessage
556 }
557
558 fn same_kind_as(&self, other: &Self) -> bool {
559 self.kind == other.kind
560 }
561
562 fn same_key_as(&self, other: &Self) -> bool {
563 self.did == other.did
564 }
565
566 pub fn try_into_storage_entry(self) -> Result<Self> {
574 match self.kind {
575 EntryKind::Data => {
576 let buffer = self.topic_buffer()?;
577 Ok(self.materialize_topic_buffer(buffer))
578 }
579 EntryKind::RelayMessage => {
580 let set = self.relay_set()?;
581 Ok(self.materialize_relay_set(set))
582 }
583 EntryKind::Subring => Ok(self),
584 }
585 }
586
587 pub fn operate(&self, op: EntryOperation, actor: Did) -> Result<Self> {
590 match op {
591 EntryOperation::Overwrite(entry) => self.overwrite(entry, actor),
592 EntryOperation::Extend(entry) => self.extend(entry, actor),
593 EntryOperation::Touch(entry) => self.touch(entry, actor),
594 EntryOperation::JoinSubring(_, did) => self.join_subring(did),
595 EntryOperation::Tombstone(entry) => self.tombstone(entry),
596 }
597 }
598
599 pub fn overwrite(&self, other: Self, actor: Did) -> Result<Self> {
608 if !self.is_data_entry() {
609 return Err(Error::EntryNotOverwritable);
610 }
611 self.join(other.ensure_stamp_after(
612 actor,
613 self.max_observed_version(),
614 EntryStampKind::Overwrite,
615 )?)
616 }
617
618 pub fn extend(&self, other: Self, actor: Did) -> Result<Self> {
621 if !self.is_data_entry() {
622 return Err(Error::EntryNotAppendable);
623 }
624 self.join(other.ensure_stamp_after(
625 actor,
626 self.max_observed_version(),
627 EntryStampKind::Delta,
628 )?)
629 }
630
631 pub fn touch(&self, other: Self, actor: Did) -> Result<Self> {
635 if !self.is_data_entry() {
636 return Err(Error::EntryNotAppendable);
637 }
638 self.join(other.ensure_stamp_after(
639 actor,
640 self.max_observed_version(),
641 EntryStampKind::Delta,
642 )?)
643 }
644
645 pub fn join_subring(&self, did: Did) -> Result<Self> {
648 if !self.is_subring_entry() {
649 return Err(Error::EntryNotJoinable);
650 }
651
652 let mut subring: Subring = self.clone().try_into()?;
653 subring.finger.join(did);
654 let other: Entry = subring.try_into()?;
655 self.join(other)
656 }
657
658 pub fn tombstone(&self, other: Self) -> Result<Self> {
664 if !self.is_relay_entry() {
665 return Err(Error::EntryNotTombstonable);
666 }
667 self.validate_same_carrier(&other)?;
668
669 let mut set = self.relay_set()?;
670 let target_values = other.data.into_iter().collect::<BTreeSet<_>>();
671 let target_dots = other.crdt.dots.into_iter().collect::<BTreeSet<_>>();
672 let has_dot_witness = !target_dots.is_empty();
673
674 for (value, dot) in &set.adds.values {
675 if target_dots.contains(dot) || (!has_dot_witness && target_values.contains(value)) {
676 set.removes.insert(*dot);
677 }
678 }
679
680 Ok(self.materialize_relay_set(set))
681 }
682}
683
684#[cfg(test)]
685mod tests {
686 use num_bigint::BigUint;
687
688 use super::*;
689 use crate::algebra::assert_join_semilattice_laws;
690 use crate::algebra::assert_strong_eventual_consistency;
691 use crate::ecc::SecretKey;
692 use crate::message::Message;
693 use crate::session::SessionSk;
694
695 fn encoded(value: &str) -> Result<Encoded> {
696 value.to_string().encode()
697 }
698
699 fn data_entry(topic: &str, value: &str) -> Result<Entry> {
700 (topic.to_string(), encoded(value)?).try_into()
701 }
702
703 fn data_entry_from_values(topic: &str, values: Vec<String>) -> Result<Entry> {
704 let data = values
705 .into_iter()
706 .map(|value| value.encode())
707 .collect::<Result<Vec<_>>>()?;
708 Ok(Entry::new(Entry::gen_did(topic)?, data, EntryKind::Data))
709 }
710
711 fn overflowing_data_entry(topic: &str, overflow: usize) -> Result<(Entry, usize)> {
712 let incoming_count = ENTRY_DATA_MAX_LEN + overflow;
713 let entry = data_entry_from_values(
714 topic,
715 (0..incoming_count)
716 .map(|i| format!("incoming{i}"))
717 .collect::<Vec<_>>(),
718 )?;
719 Ok((entry, incoming_count))
720 }
721
722 fn decode_entry_data(entry: &Entry) -> Result<Vec<String>> {
723 entry
724 .data
725 .iter()
726 .map(|item| item.decode())
727 .collect::<Result<Vec<String>>>()
728 }
729
730 fn assert_entry_keeps_recent_overflow(
731 entry: &Entry,
732 incoming_count: usize,
733 overflow: usize,
734 ) -> Result<()> {
735 assert_eq!(entry.data.len(), ENTRY_DATA_MAX_LEN);
736 let decoded = decode_entry_data(entry)?;
737 assert_eq!(decoded.first(), Some(&format!("incoming{overflow}")));
738 assert_eq!(
739 decoded.last(),
740 Some(&format!("incoming{}", incoming_count - 1))
741 );
742 Ok(())
743 }
744
745 fn subring_entry(name: &str) -> Result<Entry> {
746 let creator = Entry::gen_did("creator")?;
747 Subring::new(name, creator)?.try_into()
748 }
749
750 fn actor() -> Did {
751 Did::from(42u32)
752 }
753
754 fn version(counter: u32) -> EntryVersion {
755 EntryVersion::new(
756 u128::from(counter),
757 Did::from(counter),
758 Did::from(counter.saturating_add(1000)),
759 )
760 }
761
762 fn data_delta(topic: &str, value: &str, counter: u32) -> Result<Entry> {
763 data_entry(topic, value)?.stamp_delta(version(counter))
764 }
765
766 fn overwrite_delta(topic: &str, value: &str, counter: u32) -> Result<Entry> {
767 data_entry(topic, value)?.stamp_overwrite(version(counter))
768 }
769
770 fn relay_delta(did: Did, value: &str, counter: u32) -> Result<Entry> {
771 Entry::new(did, vec![encoded(value)?], EntryKind::RelayMessage)
772 .stamp_delta(version(counter))
773 }
774
775 #[test]
776 fn gset_satisfies_join_semilattice_laws() {
777 let mut a = GSet::new();
778 a.insert(Did::from(1u32));
779 let mut b = GSet::new();
780 b.insert(Did::from(2u32));
781 let mut ab = GSet::new();
782 ab.insert(Did::from(1u32));
783 ab.insert(Did::from(2u32));
784
785 assert_join_semilattice_laws(&[GSet::new(), a, b, ab]);
786 }
787
788 #[test]
789 fn data_topic_buffer_satisfies_join_semilattice_laws() -> Result<()> {
790 let samples = [
791 Entry::new(Entry::gen_did("topic")?, vec![], EntryKind::Data).topic_buffer()?,
792 data_delta("topic", "a", 1)?.topic_buffer()?,
793 data_delta("topic", "b", 2)?.topic_buffer()?,
794 overwrite_delta("topic", "c", 3)?.topic_buffer()?,
795 ];
796
797 assert_join_semilattice_laws(&samples);
798 Ok(())
799 }
800
801 #[test]
802 fn relay_message_set_satisfies_join_semilattice_laws() -> Result<()> {
803 let did = Did::from(10u32);
804 let a = Entry::new(did, vec![encoded("a")?], EntryKind::RelayMessage)
805 .stamp_delta(version(1))?
806 .relay_set()?;
807 let b = Entry::new(did, vec![encoded("b")?], EntryKind::RelayMessage)
808 .stamp_delta(version(2))?
809 .relay_set()?;
810 let ab = Entry::new(did, vec![], EntryKind::RelayMessage)
811 .join(relay_delta(did, "a", 1)?)?
812 .join(relay_delta(did, "b", 2)?)?;
813 let tombstoned_a = ab.tombstone(relay_delta(did, "a", 1)?)?.relay_set()?;
814
815 assert_join_semilattice_laws(&[RelayMessageSet::default(), a, b, tombstoned_a]);
816 Ok(())
817 }
818
819 #[test]
820 fn entry_join_is_strongly_eventually_consistent_for_data_deltas() -> Result<()> {
821 let base = Entry::new(Entry::gen_did("topic")?, vec![], EntryKind::Data);
822 let deltas = vec![
823 data_delta("topic", "a", 1)?,
824 data_delta("topic", "b", 2)?,
825 data_delta("topic", "a", 3)?,
826 ];
827
828 let forward = deltas
829 .iter()
830 .cloned()
831 .try_fold(base.clone(), |acc, delta| acc.join(delta))?;
832 let reverse = deltas
833 .iter()
834 .rev()
835 .cloned()
836 .try_fold(base.clone(), |acc, delta| acc.join(delta))?;
837 let duplicated = deltas
838 .iter()
839 .cloned()
840 .chain(deltas.iter().cloned())
841 .try_fold(base, |acc, delta| acc.join(delta))?;
842
843 assert_eq!(forward, reverse);
844 assert_eq!(forward, duplicated);
845 assert_eq!(decode_entry_data(&forward)?, vec![
846 String::from("b"),
847 String::from("a")
848 ]);
849 Ok(())
850 }
851
852 #[test]
853 fn generic_sec_witness_accepts_data_topic_buffer_deltas() -> Result<()> {
854 let base = Entry::new(Entry::gen_did("topic")?, vec![], EntryKind::Data).topic_buffer()?;
855 let deltas = vec![
856 data_delta("topic", "a", 1)?.topic_buffer()?,
857 data_delta("topic", "b", 2)?.topic_buffer()?,
858 ];
859
860 assert_strong_eventual_consistency(base, &deltas);
861 Ok(())
862 }
863
864 #[test]
865 fn storage_normalization_uses_lattice_top_n_order() -> Result<()> {
866 let incoming_count = ENTRY_DATA_MAX_LEN + 3;
867 let mut entry = data_entry_from_values(
868 "topic",
869 (0..incoming_count)
870 .map(|i| format!("incoming{i}"))
871 .collect::<Vec<_>>(),
872 )?;
873 entry.crdt.dots = entry
874 .data
875 .iter()
876 .enumerate()
877 .map(|(index, _)| {
878 let counter = if index == 0 {
879 10_000
880 } else {
881 u32::try_from(index).map_err(|_| Error::EntryDotIndexOutOfBounds { index })?
882 };
883 EntryDot::for_index(version(counter), index)
884 })
885 .collect::<Result<Vec<_>>>()?;
886
887 let normalized = entry.try_into_storage_entry()?;
888 let decoded = decode_entry_data(&normalized)?;
889
890 assert_eq!(normalized.data.len(), ENTRY_DATA_MAX_LEN);
891 assert_eq!(normalized.data.len(), normalized.crdt.dots.len());
892 assert!(decoded.contains(&String::from("incoming0")));
893 assert!(!decoded.contains(&String::from("incoming1")));
894 assert!(!decoded.contains(&String::from("incoming2")));
895 assert!(!decoded.contains(&String::from("incoming3")));
896 Ok(())
897 }
898
899 #[test]
900 fn storage_normalization_realigns_legacy_mismatched_dots() -> Result<()> {
901 let mut entry = data_entry_from_values(
902 "topic",
903 (0..ENTRY_DATA_MAX_LEN + 2)
904 .map(|i| format!("legacy{i}"))
905 .collect::<Vec<_>>(),
906 )?;
907 entry.crdt.dots = vec![EntryDot::for_index(version(10_000), 0)?];
908
909 let normalized = entry.try_into_storage_entry()?;
910
911 assert_eq!(normalized.data.len(), ENTRY_DATA_MAX_LEN);
912 assert_eq!(normalized.data.len(), normalized.crdt.dots.len());
913 Ok(())
914 }
915
916 #[test]
917 fn crdt_constructors_normalize_carrier_invariants() -> Result<()> {
918 let register = version(10);
919 let stale = encoded("stale")?;
920 let live = encoded("live")?;
921 let mut values = BTreeMap::new();
922 values.insert(stale.clone(), EntryDot::for_index(version(1), 0)?);
923 let live_dot = EntryDot::for_index(version(11), 0)?;
924 values.insert(live.clone(), live_dot);
925
926 let buffer = DataTopicBuffer::new(Some(register), values);
927 assert_eq!(buffer.values.len(), 1);
928 assert!(buffer.values.contains_key(&live));
929
930 let relay = RelayMessageSet::new(buffer, BTreeSet::from([live_dot]));
931 assert!(relay.adds.values.is_empty());
932 assert!(relay.removes.contains(&live_dot));
933 Ok(())
934 }
935
936 #[test]
937 fn overwrite_register_tiebreaker_converges_for_same_timestamp_actor() -> Result<()> {
938 let did = Entry::gen_did("topic")?;
939 let issuer = actor();
940 let lower = Entry::new(did, vec![encoded("lower")?], EntryKind::Data)
941 .stamp_overwrite(EntryVersion::new(1, issuer, Did::from(1u32)))?;
942 let higher = Entry::new(did, vec![encoded("higher")?], EntryKind::Data)
943 .stamp_overwrite(EntryVersion::new(1, issuer, Did::from(2u32)))?;
944 let base = Entry::new(did, vec![], EntryKind::Data);
945
946 let forward = base.clone().join(lower.clone())?.join(higher.clone())?;
947 let reverse = base.join(higher)?.join(lower)?;
948
949 assert_eq!(forward, reverse);
950 assert_eq!(decode_entry_data(&forward)?, vec![String::from("higher")]);
951 Ok(())
952 }
953
954 #[test]
955 fn operation_digest_hashes_canonical_bytes_not_legacy_base58() -> Result<()> {
956 let entry = data_entry("topic", "value")?;
957 let digest = OperationDigest {
958 kind: entry.kind,
959 did: entry.did,
960 data: &entry.data,
961 };
962 let bytes = bincode::serialize(&digest).map_err(Error::BincodeSerialize)?;
963
964 let direct = Did::try_from(HashStr::from_bytes(&bytes))?;
965 let legacy_encoded = bytes.encode()?;
966 let legacy_base58 = Entry::gen_did(legacy_encoded.value())?;
967
968 assert_eq!(entry.operation_digest()?, direct);
969 assert_ne!(direct, legacy_base58);
970 Ok(())
971 }
972
973 #[test]
974 fn forwarded_overwrite_witness_is_not_reissued_after_local_floor() -> Result<()> {
975 let current = overwrite_delta("topic", "current", 10)?;
976 let stale_forwarded = overwrite_delta("topic", "stale", 1)?;
977
978 let updated = current.overwrite(stale_forwarded, actor())?;
979
980 assert_eq!(decode_entry_data(&updated)?, vec![String::from("current")]);
981 Ok(())
982 }
983
984 #[test]
985 fn overwrite_replaces_data_for_same_data_entry() -> Result<()> {
986 let entry = data_entry("topic", "old")?;
987 let other = data_entry("topic", "new")?;
988 let updated = entry.overwrite(other, actor())?;
989 assert_eq!(decode_entry_data(&updated)?, vec![String::from("new")]);
990 Ok(())
991 }
992
993 #[test]
994 fn overwrite_rejects_non_data_entry() -> Result<()> {
995 let entry = subring_entry("ring")?;
996 let other = entry.clone();
997
998 assert!(matches!(
999 entry.overwrite(other, actor()),
1000 Err(Error::EntryNotOverwritable)
1001 ));
1002 Ok(())
1003 }
1004
1005 #[test]
1006 fn overwrite_rejects_kind_mismatch() -> Result<()> {
1007 let entry = data_entry("topic", "old")?;
1008 let mut other = entry.clone();
1009 other.kind = EntryKind::RelayMessage;
1010
1011 assert!(matches!(
1012 entry.overwrite(other, actor()),
1013 Err(Error::EntryKindNotEqual)
1014 ));
1015 Ok(())
1016 }
1017
1018 #[test]
1019 fn overwrite_rejects_key_mismatch() -> Result<()> {
1020 let entry = data_entry("topic-a", "old")?;
1021 let other = data_entry("topic-b", "new")?;
1022
1023 assert!(matches!(
1024 entry.overwrite(other, actor()),
1025 Err(Error::EntryDidNotEqual)
1026 ));
1027 Ok(())
1028 }
1029
1030 #[test]
1031 fn overwrite_caps_payloads_larger_than_max_len() -> Result<()> {
1032 let overflow = 3;
1033 let (incoming, incoming_count) = overflowing_data_entry("topic", overflow)?;
1034 let entry = data_entry("topic", "base")?;
1035 let updated = entry.overwrite(incoming, actor())?;
1036 assert_entry_keeps_recent_overflow(&updated, incoming_count, overflow)
1037 }
1038
1039 #[test]
1040 fn extend_appends_data_for_same_entry() -> Result<()> {
1041 let entry = data_entry("topic", "first")?;
1042 let other = data_entry("topic", "second")?;
1043 let updated = entry.extend(other, actor())?;
1044 assert_eq!(decode_entry_data(&updated)?, vec![
1045 String::from("first"),
1046 String::from("second")
1047 ]);
1048 Ok(())
1049 }
1050
1051 #[test]
1052 fn extend_trims_oldest_items_at_max_len() -> Result<()> {
1053 let mut entry = data_entry("topic", "test0")?;
1054 for i in 1..ENTRY_DATA_MAX_LEN {
1055 let data = format!("test{i}");
1056 let other = data_entry("topic", &data)?;
1057 entry = entry.extend(other, actor())?;
1058 assert_eq!(entry.data.len(), i + 1);
1059 }
1060
1061 for i in ENTRY_DATA_MAX_LEN..ENTRY_DATA_MAX_LEN + 10 {
1062 let data = format!("test{i}");
1063 let other = data_entry("topic", &data)?;
1064 entry = entry.extend(other, actor())?;
1065 assert_eq!(entry.data.len(), ENTRY_DATA_MAX_LEN);
1066 let decoded = decode_entry_data(&entry)?;
1067 assert_eq!(
1068 decoded.first(),
1069 Some(&format!("test{}", i - ENTRY_DATA_MAX_LEN + 1))
1070 );
1071 assert_eq!(decoded.last(), Some(&data));
1072 }
1073 Ok(())
1074 }
1075
1076 #[test]
1077 fn extend_caps_incoming_payloads_larger_than_max_len() -> Result<()> {
1078 let overflow = 3;
1079 let (incoming, incoming_count) = overflowing_data_entry("topic", overflow)?;
1080 let entry = data_entry("topic", "base")?;
1081 let updated = entry.extend(incoming, actor())?;
1082 assert_entry_keeps_recent_overflow(&updated, incoming_count, overflow)
1083 }
1084
1085 #[test]
1086 fn extend_rejects_non_data_entry() -> Result<()> {
1087 let entry = subring_entry("ring")?;
1088 let other = entry.clone();
1089
1090 assert!(matches!(
1091 entry.extend(other, actor()),
1092 Err(Error::EntryNotAppendable)
1093 ));
1094 Ok(())
1095 }
1096
1097 #[test]
1098 fn touch_moves_existing_items_to_end_once() -> Result<()> {
1099 let entry = data_entry("topic", "a")?
1100 .extend(data_entry("topic", "b")?, actor())?
1101 .extend(data_entry("topic", "c")?, actor())?;
1102 let touched = data_entry("topic", "b")?;
1103 let updated = entry.touch(touched, actor())?;
1104 assert_eq!(decode_entry_data(&updated)?, vec![
1105 String::from("a"),
1106 String::from("c"),
1107 String::from("b")
1108 ]);
1109 Ok(())
1110 }
1111
1112 #[test]
1113 fn touch_trims_oldest_non_touched_items_at_max_len() -> Result<()> {
1114 let mut entry = data_entry("topic", "test0")?;
1115 for i in 1..ENTRY_DATA_MAX_LEN {
1116 entry = entry.extend(data_entry("topic", &format!("test{i}"))?, actor())?;
1117 }
1118 let updated = entry.touch(data_entry("topic", "test0")?, actor())?;
1119 assert_eq!(updated.data.len(), ENTRY_DATA_MAX_LEN);
1120 let decoded = decode_entry_data(&updated)?;
1121 assert_eq!(decoded.first(), Some(&String::from("test1")));
1122 assert_eq!(decoded.last(), Some(&String::from("test0")));
1123 Ok(())
1124 }
1125
1126 #[test]
1127 fn relay_tombstone_removes_observed_message_by_join() -> Result<()> {
1128 let did = Did::from(30u32);
1129 let first = relay_delta(did, "first", 1)?;
1130 let second = relay_delta(did, "second", 2)?;
1131 let carrier = Entry::new(did, vec![], EntryKind::RelayMessage)
1132 .join(first.clone())?
1133 .join(second.clone())?;
1134
1135 let removed = carrier.tombstone(first.clone())?;
1136
1137 assert_eq!(decode_entry_data(&removed)?, vec![String::from("second")]);
1138 let joined_with_stale_add = removed.join(first)?;
1139 assert_eq!(decode_entry_data(&joined_with_stale_add)?, vec![
1140 String::from("second")
1141 ]);
1142 Ok(())
1143 }
1144
1145 #[test]
1146 fn relay_tombstone_rejects_non_relay_entry() -> Result<()> {
1147 let entry = data_entry("topic", "value")?;
1148 let other = entry.clone();
1149
1150 assert!(matches!(
1151 entry.tombstone(other),
1152 Err(Error::EntryNotTombstonable)
1153 ));
1154 Ok(())
1155 }
1156
1157 #[test]
1158 fn touch_caps_incoming_payloads_larger_than_max_len() -> Result<()> {
1159 let overflow = 3;
1160 let (incoming, incoming_count) = overflowing_data_entry("topic", overflow)?;
1161 let entry = data_entry("topic", "base")?;
1162 let updated = entry.touch(incoming, actor())?;
1163 assert_entry_keeps_recent_overflow(&updated, incoming_count, overflow)
1164 }
1165
1166 #[test]
1167 fn join_subring_adds_member_to_subring_entry() -> Result<()> {
1168 let entry = subring_entry("ring")?;
1169 let member = Entry::gen_did("member")?;
1170 let updated = entry.join_subring(member)?;
1171 let subring = Subring::try_from(updated)?;
1172 assert_eq!(subring.finger.first(), Some(member));
1173 Ok(())
1174 }
1175
1176 #[test]
1177 fn join_subring_rejects_non_subring_entry() -> Result<()> {
1178 let entry = data_entry("topic", "value")?;
1179 let member = Entry::gen_did("member")?;
1180
1181 assert!(matches!(
1182 entry.join_subring(member),
1183 Err(Error::EntryNotJoinable)
1184 ));
1185 Ok(())
1186 }
1187
1188 #[test]
1189 fn operation_default_entry_matches_operation_kind() -> Result<()> {
1190 let target = data_entry("topic", "value")?;
1191 let default = EntryOperation::Extend(target.clone()).gen_default_entry()?;
1192 assert_eq!(default.did, target.did);
1193 assert_eq!(default.kind, EntryKind::Data);
1194 assert!(default.data.is_empty());
1195 Ok(())
1196 }
1197
1198 #[test]
1199 fn message_payload_entry_key_targets_successor_of_signer() -> Result<()> {
1200 let key = SecretKey::random();
1201 let session = SessionSk::new_with_seckey(&key)?;
1202 let signer: Did = key.address().into();
1203 let payload =
1204 MessagePayload::new_send(Message::custom(b"relay")?, &session, signer, signer)?;
1205 let entry = Entry::try_from(payload)?;
1206 let expected = BigUint::from(signer) + BigUint::from(1u16);
1207 assert_eq!(entry.did, expected.into());
1208 assert_eq!(entry.kind, EntryKind::RelayMessage);
1209 Ok(())
1210 }
1211
1212 #[test]
1213 fn affine_preserves_payload_and_kind_while_rotating_keys() -> Result<()> {
1214 let entry = data_entry("topic", "value")?;
1215 let affined = entry.affine(3)?;
1216 assert_eq!(affined.len(), 3);
1217 for rotated in affined {
1218 assert_eq!(rotated.data, entry.data);
1219 assert_eq!(rotated.kind, entry.kind);
1220 }
1221 Ok(())
1222 }
1223}