Skip to main content

rings_core/dht/
entry.rs

1#![warn(missing_docs)]
2use std::collections::BTreeMap;
3use std::collections::BTreeSet;
4use std::str::FromStr;
5
6use serde::Deserialize;
7use serde::Serialize;
8
9use super::subring::Subring;
10use crate::algebra::JoinSemilattice;
11use crate::consts::ENTRY_DATA_MAX_LEN;
12use crate::dht::Did;
13use crate::ecc::HashStr;
14use crate::error::Error;
15use crate::error::Result;
16use crate::message::Encoded;
17use crate::message::Encoder;
18use crate::message::MessagePayload;
19use crate::message::MessageVerificationExt;
20
21mod crdt;
22
23pub use crdt::DataTopicBuffer;
24pub use crdt::EntryCrdt;
25pub use crdt::EntryDot;
26pub use crdt::EntryVersion;
27pub use crdt::GSet;
28pub use crdt::RelayMessageSet;
29pub use crdt::SubringMemberSet;
30
/// DHT storage entry categories.
///
/// The kind of an [`Entry`] selects which merge rule [`Entry::join`] applies
/// to its payload and how the entry's `did` is expected to be derived.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryKind {
    /// Encoded data stored in DHT
    Data,
    /// Finger table of a Subring
    Subring,
    /// A relayed but unreached message, which should be stored on
    /// the successor of the destination Did.
    RelayMessage,
}
42
/// Selects how a freshly issued version is applied to an [`Entry`].
#[derive(Clone, Debug, PartialEq, Eq)]
enum EntryStampKind {
    /// Set the overwrite register and assign per-element dots.
    Overwrite,
    /// Assign per-element dots only; the register is left untouched.
    Delta,
}
48
// Canonical stamp input for EntryVersion.operation.
//
// This digest is an unreleased CRDT tie-break witness between nodes running the
// same code, not a stable storage key or cross-version protocol identifier.
//
// The struct is serialized with bincode and hashed by `Entry::operation_digest`;
// CRDT metadata is deliberately not part of it.
#[derive(Serialize)]
struct OperationDigest<'a> {
    // Kind of the entry being stamped.
    kind: EntryKind,
    // Ring key of the entry being stamped.
    did: Did,
    // Borrowed payload of the entry being stamped.
    data: &'a [Encoded],
}
59
/// Operations supported by a DHT storage entry.
///
/// Each variant is dispatched to its handler by [`Entry::operate`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryOperation {
    /// Create or update an [`Entry`].
    Overwrite(Entry),
    /// Extend data to a Data kind [`Entry`].
    /// This operation will create an [`Entry`] if it does not exist.
    Extend(Entry),
    /// Extend data to a Data kind [`Entry`] uniquely.
    /// If any element already exists, it is moved to the end of the data vector.
    /// This operation will create an [`Entry`] if it does not exist.
    Touch(Entry),
    /// Join the subring with the given name as the given member [`Did`].
    JoinSubring(String, Did),
    /// Tombstone observed relay-message payloads in a two-phase set.
    ///
    /// The payload identifies the relay-message carrier and the values to
    /// remove. If CRDT dots are present, those dots are the remove witnesses;
    /// otherwise the receiver tombstones currently observed dots with matching
    /// payload bytes.
    Tombstone(Entry),
}
82
/// A DHT storage entry with an [`EntryKind`] and a ring key represented as [`Did`].
///
/// An [`Entry`] is data stored by [`ChordStorage`](super::ChordStorage). It is not a
/// Chord node and does not participate in successor, predecessor, or finger-table
/// membership.
///
/// The [`Did`] of an [`Entry`] is in the following format:
/// * If kind value is [EntryKind::Data], it's sha1 of data topic.
/// * If kind value is [EntryKind::Subring], it's sha1 of Subring name.
/// * If kind value is [EntryKind::RelayMessage], it's the destination Did of
///   message plus 1 (to ensure that the message is sent to the successor of destination),
///   thus while destination node going online, it will sync message from its successor.
///
/// NOTE(review): the `TryFrom<MessagePayload>` conversion in this file derives the
/// relay key from `msg.signer() + 1`; confirm that this matches the "destination"
/// wording above.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Entry {
    /// The ring key of this entry. It has the same representation as a node DID, but a
    /// different domain meaning.
    pub did: Did,
    /// The data entity of `Entry`, encoded by [Encoder].
    pub data: Vec<Encoded>,
    /// The type indicates how the data is encoded and how the Did is generated.
    pub kind: EntryKind,
    /// CRDT metadata that makes replicated merge a join-semilattice operation.
    ///
    /// Falls back to `EntryCrdt::default()` when the field is absent from the
    /// serialized form.
    #[serde(default)]
    pub crdt: EntryCrdt,
}
108
/// An [`Entry`] paired with its Chord placement key.
///
/// `key` is the DHT storage location. `entry.did` is the resource identity. These two
/// values may differ for redundant replicas.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PlacedEntry {
    /// The key used to place this value in DHT storage.
    pub key: Did,
    /// The stored entry value; its own `did` is the resource identity, not the
    /// placement.
    pub entry: Entry,
}
120
121impl PlacedEntry {
122    /// Pair an entry value with the key where it is stored.
123    pub fn new(key: Did, entry: Entry) -> Self {
124        Self { key, entry }
125    }
126}
127
/// Durable-storage acknowledgement for an entry hand-off delta.
///
/// `key` is the placement key updated by the receiver. `entry` is the copied
/// delta that the receiver joined into its local least upper bound. The sender
/// compares the storage-normalized ack value with its current local value
/// before deleting; if the sender has observed any newer durable delta
/// meanwhile, deletion is skipped.
///
/// The comparison itself is implemented by `confirms_local_value`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SyncedEntryAck {
    /// The placement key durably persisted by the sync receiver.
    pub key: Did,
    /// The exact value durably persisted by the sync receiver.
    pub entry: Entry,
}
142
143impl SyncedEntryAck {
144    /// Witness that `entry` was durably joined at `key`.
145    pub fn new(key: Did, entry: Entry) -> Self {
146        Self { key, entry }
147    }
148
149    /// Returns whether this ack proves that `local` equals the copied value.
150    ///
151    /// Post: comparison is performed on storage canonical forms, so legacy
152    /// entries without dots compare equal to the normalized value durably
153    /// persisted by the receiver.
154    pub fn confirms_local_value(&self, local: &Entry) -> Result<bool> {
155        Ok(self.entry.clone().try_into_storage_entry()?
156            == local.clone().try_into_storage_entry()?)
157    }
158}
159
/// A lookup request for a concrete placement of an entry identity.
///
/// `resource` is `id(e)`. `placement` is one element of
/// `place(resource, REDUNDANT)`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct EntryLookupKey {
    /// Entry identity being searched.
    pub resource: Did,
    /// Placement key being interrogated; one of the placements of `resource`.
    pub placement: Did,
}
171
172impl EntryLookupKey {
173    /// Pair an entry identity with one of its placement keys.
174    pub fn new(resource: Did, placement: Did) -> Self {
175        Self {
176            resource,
177            placement,
178        }
179    }
180}
181
/// A placement key observed missing during lookup.
///
/// The derived ordering compares `key` first and then `owner`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PlacementMiss {
    /// Placement key whose responsible owner returned `None`.
    pub key: Did,
    /// Owner that was responsible for `key` when the miss was observed.
    pub owner: Did,
}
190
191impl PlacementMiss {
192    /// Witness that `owner` was queried for `key` and did not have the entry.
193    pub fn new(key: Did, owner: Did) -> Self {
194        Self { key, owner }
195    }
196}
197
/// A successful lookup result plus the missing placements observed before it.
///
/// Unlike the neighbouring lookup types, this one does not derive
/// `Serialize`/`Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EntryLookupEvidence {
    /// Entry found by the lookup.
    pub entry: Entry,
    /// Placement misses observed as part of the same lookup.
    pub misses: Vec<PlacementMiss>,
}
206
207impl EntryLookupEvidence {
208    /// Construct lookup evidence.
209    pub fn new(entry: Entry, misses: Vec<PlacementMiss>) -> Self {
210        Self { entry, misses }
211    }
212}
213
214impl Entry {
215    /// Construct an entry with empty CRDT metadata.
216    pub fn new(did: Did, data: Vec<Encoded>, kind: EntryKind) -> Self {
217        Self {
218            did,
219            data,
220            kind,
221            crdt: EntryCrdt::default(),
222        }
223    }
224
225    /// Generate did from topic.
226    pub fn gen_did(topic: &str) -> Result<Did> {
227        let hash: HashStr = topic.into();
228        let did = Did::from_str(&hash.inner());
229        tracing::debug!("gen_did: topic: {}, did: {:?}", topic, did);
230        did
231    }
232}
233
234impl EntryOperation {
235    /// Return this operation with CRDT versions assigned at the operation boundary.
236    ///
237    /// Existing CRDT witnesses are preserved so forwarded operations keep the
238    /// origin's dot/version instead of being reissued by every routing hop.
239    pub fn stamped(self, actor: Did) -> Result<Self> {
240        Ok(match self {
241            EntryOperation::Overwrite(entry) => EntryOperation::Overwrite(
242                entry.ensure_stamp_after(actor, None, EntryStampKind::Overwrite)?,
243            ),
244            EntryOperation::Extend(entry) => EntryOperation::Extend(entry.ensure_stamp_after(
245                actor,
246                None,
247                EntryStampKind::Delta,
248            )?),
249            EntryOperation::Touch(entry) => EntryOperation::Touch(entry.ensure_stamp_after(
250                actor,
251                None,
252                EntryStampKind::Delta,
253            )?),
254            EntryOperation::JoinSubring(name, did) => EntryOperation::JoinSubring(name, did),
255            EntryOperation::Tombstone(entry) => EntryOperation::Tombstone(entry),
256        })
257    }
258
259    /// Extract the did of target Entry.
260    pub fn did(&self) -> Result<Did> {
261        Ok(match self {
262            EntryOperation::Overwrite(entry) => entry.did,
263            EntryOperation::Extend(entry) => entry.did,
264            EntryOperation::Touch(entry) => entry.did,
265            EntryOperation::JoinSubring(name, _) => Entry::gen_did(name)?,
266            EntryOperation::Tombstone(entry) => entry.did,
267        })
268    }
269
270    /// Extract the kind of target Entry.
271    pub fn kind(&self) -> EntryKind {
272        match self {
273            EntryOperation::Overwrite(entry) => entry.kind,
274            EntryOperation::Extend(entry) => entry.kind,
275            EntryOperation::Touch(entry) => entry.kind,
276            EntryOperation::JoinSubring(..) => EntryKind::Subring,
277            EntryOperation::Tombstone(entry) => entry.kind,
278        }
279    }
280
281    /// Generate a target Entry when it is not existed.
282    pub fn gen_default_entry(self) -> Result<Entry> {
283        match self {
284            EntryOperation::JoinSubring(name, did) => Subring::new(&name, did)?.try_into(),
285            _ => Ok(Entry::new(self.did()?, vec![], self.kind())),
286        }
287    }
288}
289
290impl TryFrom<MessagePayload> for Entry {
291    type Error = Error;
292    fn try_from(msg: MessagePayload) -> Result<Self> {
293        // Relay entries target the signer's successor on R = Z / 2^160, so the
294        // `+ 1` intentionally wraps in the fixed-width DID ring.
295        let did = msg.signer() + Did::from(1u32);
296        let data = msg.encode()?;
297        Ok(Self {
298            did,
299            data: vec![data],
300            kind: EntryKind::RelayMessage,
301            crdt: EntryCrdt::default(),
302        })
303    }
304}
305
306impl TryFrom<(String, Encoded)> for Entry {
307    type Error = Error;
308    fn try_from((topic, e): (String, Encoded)) -> Result<Self> {
309        Ok(Self {
310            did: Self::gen_did(&topic)?,
311            data: vec![e],
312            kind: EntryKind::Data,
313            crdt: EntryCrdt::default(),
314        })
315    }
316}
317
318impl TryFrom<(String, String)> for Entry {
319    type Error = Error;
320    fn try_from((topic, s): (String, String)) -> Result<Self> {
321        let encoded_message = s.encode()?;
322        (topic, encoded_message).try_into()
323    }
324}
325
326impl TryFrom<String> for Entry {
327    type Error = Error;
328    fn try_from(topic: String) -> Result<Self> {
329        (topic.clone(), topic).try_into()
330    }
331}
332
333impl Entry {
334    fn with_element_dots(mut self, version: EntryVersion) -> Result<Self> {
335        self.crdt.dots = self
336            .data
337            .iter()
338            .enumerate()
339            .map(|(index, _)| EntryDot::for_index(version, index))
340            .collect::<Result<Vec<_>>>()?;
341        Ok(self)
342    }
343
344    fn stamp_overwrite(mut self, version: EntryVersion) -> Result<Self> {
345        self.crdt.register = Some(version);
346        self.with_element_dots(version)
347    }
348
349    fn stamp_delta(self, version: EntryVersion) -> Result<Self> {
350        self.with_element_dots(version)
351    }
352
353    fn stamp(self, version: EntryVersion, kind: EntryStampKind) -> Result<Self> {
354        match kind {
355            EntryStampKind::Overwrite => self.stamp_overwrite(version),
356            EntryStampKind::Delta => self.stamp_delta(version),
357        }
358    }
359
360    fn operation_digest(&self) -> Result<Did> {
361        let digest = OperationDigest {
362            kind: self.kind,
363            did: self.did,
364            data: &self.data,
365        };
366        let bytes = bincode::serialize(&digest).map_err(Error::BincodeSerialize)?;
367        Did::try_from(HashStr::from_bytes(&bytes))
368    }
369
370    fn issue_version_after(&self, actor: Did, floor: Option<EntryVersion>) -> Result<EntryVersion> {
371        Ok(EntryVersion::issued_by(actor, self.operation_digest()?).after(floor))
372    }
373
374    fn ensure_stamp_after(
375        self,
376        actor: Did,
377        floor: Option<EntryVersion>,
378        kind: EntryStampKind,
379    ) -> Result<Self> {
380        if self.crdt.has_write_witness() {
381            return Ok(self);
382        }
383        let version = self.issue_version_after(actor, floor)?;
384        self.stamp(version, kind)
385    }
386
387    fn max_observed_version(&self) -> Option<EntryVersion> {
388        self.crdt
389            .dots
390            .iter()
391            .map(|dot| dot.version)
392            .chain(self.crdt.tombstones.iter().map(|dot| dot.version))
393            .chain(self.crdt.register)
394            .max()
395    }
396
397    fn validate_same_carrier(&self, other: &Self) -> Result<()> {
398        if !self.same_kind_as(other) {
399            return Err(Error::EntryKindNotEqual);
400        }
401        if !self.same_key_as(other) {
402            return Err(Error::EntryDidNotEqual);
403        }
404        Ok(())
405    }
406
407    fn dot_for_element(&self, index: usize) -> Result<EntryDot> {
408        if let Some(dot) = self.crdt.dots.get(index).copied() {
409            return Ok(dot);
410        }
411        EntryDot::for_index(self.crdt.legacy_floor(), index)
412    }
413
414    fn topic_buffer(&self) -> Result<DataTopicBuffer> {
415        let mut values = BTreeMap::new();
416        for (index, value) in self.data.iter().cloned().enumerate() {
417            let dot = self.dot_for_element(index)?;
418            values
419                .entry(value)
420                .and_modify(|current: &mut EntryDot| {
421                    *current = (*current).max(dot);
422                })
423                .or_insert(dot);
424        }
425        Ok(DataTopicBuffer::new(self.crdt.register, values))
426    }
427
428    fn relay_set(&self) -> Result<RelayMessageSet> {
429        Ok(RelayMessageSet::new(
430            self.topic_buffer()?,
431            self.crdt.tombstones.iter().copied().collect(),
432        ))
433    }
434
435    fn subring_member_set(&self) -> Result<SubringMemberSet> {
436        let subring: Subring = self.clone().try_into()?;
437        let mut members = SubringMemberSet::new();
438        for member in subring.finger.list().iter().flatten().copied() {
439            members.insert(member);
440        }
441        Ok(members)
442    }
443
444    fn materialize_elements(
445        did: Did,
446        kind: EntryKind,
447        register: Option<EntryVersion>,
448        elements: impl IntoIterator<Item = (Encoded, EntryDot)>,
449        tombstones: BTreeSet<EntryDot>,
450    ) -> Self {
451        let mut visible = elements
452            .into_iter()
453            .filter(|(_, dot)| {
454                let visible_after_reset = register.is_none_or(|floor| dot.version >= floor);
455                visible_after_reset && !tombstones.contains(dot)
456            })
457            .collect::<Vec<_>>();
458        visible.sort_by(|(left_value, left_dot), (right_value, right_dot)| {
459            left_dot
460                .cmp(right_dot)
461                .then_with(|| left_value.cmp(right_value))
462        });
463        let skip_count = visible.len().saturating_sub(ENTRY_DATA_MAX_LEN);
464        let visible = visible.into_iter().skip(skip_count).collect::<Vec<_>>();
465        let (data, dots): (Vec<_>, Vec<_>) = visible.into_iter().unzip();
466
467        Self {
468            did,
469            data,
470            kind,
471            crdt: EntryCrdt {
472                register,
473                dots,
474                tombstones: tombstones.into_iter().collect(),
475            },
476        }
477    }
478
479    fn materialize_topic_buffer(&self, buffer: DataTopicBuffer) -> Self {
480        Self::materialize_elements(
481            self.did,
482            self.kind,
483            buffer.register,
484            buffer.values,
485            BTreeSet::new(),
486        )
487    }
488
489    fn materialize_relay_set(&self, set: RelayMessageSet) -> Self {
490        Self::materialize_elements(
491            self.did,
492            self.kind,
493            set.adds.register,
494            set.adds.values,
495            set.removes,
496        )
497    }
498
499    fn join_subring_entry(&self, other: &Self) -> Result<Self> {
500        let members = self.subring_member_set()?.join(other.subring_member_set()?);
501        let mut subring: Subring = self.clone().try_into()?;
502        for member in members.iter().copied() {
503            subring.finger.join(member);
504        }
505        let mut entry: Entry = subring.try_into()?;
506        entry.crdt.register = self.crdt.register.max(other.crdt.register);
507        Ok(entry)
508    }
509
510    /// Merge two entries from the same replicated carrier.
511    ///
512    /// Law: for a fixed `(did, kind)` carrier, this is the state-based CRDT
513    /// join. Data entries are bounded LWW element sets with an LWW overwrite
514    /// register; subring entries are grow-only member sets; relay entries are
515    /// two-phase sets whose remove side is carried by tombstones.
516    pub fn join(&self, other: Self) -> Result<Self> {
517        self.validate_same_carrier(&other)?;
518        match self.kind {
519            EntryKind::Data => {
520                Ok(self.materialize_topic_buffer(self.topic_buffer()?.join(other.topic_buffer()?)))
521            }
522            EntryKind::RelayMessage => {
523                Ok(self.materialize_relay_set(self.relay_set()?.join(other.relay_set()?)))
524            }
525            EntryKind::Subring => self.join_subring_entry(&other),
526        }
527    }
528
529    /// Affine Transport entry to a list of affined did
530    pub fn affine(&self, scalar: u16) -> Result<Vec<Entry>> {
531        Ok(self
532            .did
533            .rotate_affine(scalar)?
534            .into_iter()
535            .map(|did| self.clone_with_did(did))
536            .collect())
537    }
538
539    /// Clone and setup with new DID
540    pub fn clone_with_did(&self, did: Did) -> Self {
541        let mut entry = self.clone();
542        entry.did = did;
543        entry
544    }
545
546    fn is_data_entry(&self) -> bool {
547        self.kind == EntryKind::Data
548    }
549
550    fn is_subring_entry(&self) -> bool {
551        self.kind == EntryKind::Subring
552    }
553
554    fn is_relay_entry(&self) -> bool {
555        self.kind == EntryKind::RelayMessage
556    }
557
558    fn same_kind_as(&self, other: &Self) -> bool {
559        self.kind == other.kind
560    }
561
562    fn same_key_as(&self, other: &Self) -> bool {
563        self.did == other.did
564    }
565
566    /// Normalize an entry immediately before it is persisted.
567    ///
568    /// Post: normalization uses the same carrier materialization as
569    /// [`Self::join`]; there is no second cap strategy outside the CRDT.
570    /// Post: `result.data.len() <= ENTRY_DATA_MAX_LEN`.
571    /// Post: `result.data.len() == result.crdt.dots.len()` for Data and
572    /// RelayMessage entries.
573    pub fn try_into_storage_entry(self) -> Result<Self> {
574        match self.kind {
575            EntryKind::Data => {
576                let buffer = self.topic_buffer()?;
577                Ok(self.materialize_topic_buffer(buffer))
578            }
579            EntryKind::RelayMessage => {
580                let set = self.relay_set()?;
581                Ok(self.materialize_relay_set(set))
582            }
583            EntryKind::Subring => Ok(self),
584        }
585    }
586
587    /// The entry point of [EntryOperation].
588    /// Will dispatch to different operation handlers according to the variant.
589    pub fn operate(&self, op: EntryOperation, actor: Did) -> Result<Self> {
590        match op {
591            EntryOperation::Overwrite(entry) => self.overwrite(entry, actor),
592            EntryOperation::Extend(entry) => self.extend(entry, actor),
593            EntryOperation::Touch(entry) => self.touch(entry, actor),
594            EntryOperation::JoinSubring(_, did) => self.join_subring(did),
595            EntryOperation::Tombstone(entry) => self.tombstone(entry),
596        }
597    }
598
599    /// Overwrite current data with new data.
600    ///
601    /// Preservation: the replacement is represented as a CRDT join. A newly
602    /// stamped overwrite carries a reset floor, and materialization keeps only
603    /// dots at or after that floor, so older payload dots are removed without a
604    /// non-monotone assignment.
605    ///
606    /// The handler of [EntryOperation::Overwrite].
607    pub fn overwrite(&self, other: Self, actor: Did) -> Result<Self> {
608        if !self.is_data_entry() {
609            return Err(Error::EntryNotOverwritable);
610        }
611        self.join(other.ensure_stamp_after(
612            actor,
613            self.max_observed_version(),
614            EntryStampKind::Overwrite,
615        )?)
616    }
617
618    /// This method is used to extend data to a Data kind [`Entry`].
619    /// The handler of [EntryOperation::Extend].
620    pub fn extend(&self, other: Self, actor: Did) -> Result<Self> {
621        if !self.is_data_entry() {
622            return Err(Error::EntryNotAppendable);
623        }
624        self.join(other.ensure_stamp_after(
625            actor,
626            self.max_observed_version(),
627            EntryStampKind::Delta,
628        )?)
629    }
630
631    /// This method is used to extend data to a Data kind [`Entry`] uniquely.
632    /// If any element is already existed, move it to the end of the data vector.
633    /// The handler of [EntryOperation::Touch].
634    pub fn touch(&self, other: Self, actor: Did) -> Result<Self> {
635        if !self.is_data_entry() {
636            return Err(Error::EntryNotAppendable);
637        }
638        self.join(other.ensure_stamp_after(
639            actor,
640            self.max_observed_version(),
641            EntryStampKind::Delta,
642        )?)
643    }
644
645    /// This method is used to join a subring.
646    /// The handler of [EntryOperation::JoinSubring].
647    pub fn join_subring(&self, did: Did) -> Result<Self> {
648        if !self.is_subring_entry() {
649            return Err(Error::EntryNotJoinable);
650        }
651
652        let mut subring: Subring = self.clone().try_into()?;
653        subring.finger.join(did);
654        let other: Entry = subring.try_into()?;
655        self.join(other)
656    }
657
658    /// Tombstone observed relay messages.
659    ///
660    /// Pre: `self` and `other` are the same relay-message carrier.
661    /// Post: every removed payload is represented by an add-dot tombstone, so
662    /// future joins with stale add replicas cannot resurrect it.
663    pub fn tombstone(&self, other: Self) -> Result<Self> {
664        if !self.is_relay_entry() {
665            return Err(Error::EntryNotTombstonable);
666        }
667        self.validate_same_carrier(&other)?;
668
669        let mut set = self.relay_set()?;
670        let target_values = other.data.into_iter().collect::<BTreeSet<_>>();
671        let target_dots = other.crdt.dots.into_iter().collect::<BTreeSet<_>>();
672        let has_dot_witness = !target_dots.is_empty();
673
674        for (value, dot) in &set.adds.values {
675            if target_dots.contains(dot) || (!has_dot_witness && target_values.contains(value)) {
676                set.removes.insert(*dot);
677            }
678        }
679
680        Ok(self.materialize_relay_set(set))
681    }
682}
683
684#[cfg(test)]
685mod tests {
686    use num_bigint::BigUint;
687
688    use super::*;
689    use crate::algebra::assert_join_semilattice_laws;
690    use crate::algebra::assert_strong_eventual_consistency;
691    use crate::ecc::SecretKey;
692    use crate::message::Message;
693    use crate::session::SessionSk;
694
695    fn encoded(value: &str) -> Result<Encoded> {
696        value.to_string().encode()
697    }
698
699    fn data_entry(topic: &str, value: &str) -> Result<Entry> {
700        (topic.to_string(), encoded(value)?).try_into()
701    }
702
703    fn data_entry_from_values(topic: &str, values: Vec<String>) -> Result<Entry> {
704        let data = values
705            .into_iter()
706            .map(|value| value.encode())
707            .collect::<Result<Vec<_>>>()?;
708        Ok(Entry::new(Entry::gen_did(topic)?, data, EntryKind::Data))
709    }
710
711    fn overflowing_data_entry(topic: &str, overflow: usize) -> Result<(Entry, usize)> {
712        let incoming_count = ENTRY_DATA_MAX_LEN + overflow;
713        let entry = data_entry_from_values(
714            topic,
715            (0..incoming_count)
716                .map(|i| format!("incoming{i}"))
717                .collect::<Vec<_>>(),
718        )?;
719        Ok((entry, incoming_count))
720    }
721
722    fn decode_entry_data(entry: &Entry) -> Result<Vec<String>> {
723        entry
724            .data
725            .iter()
726            .map(|item| item.decode())
727            .collect::<Result<Vec<String>>>()
728    }
729
730    fn assert_entry_keeps_recent_overflow(
731        entry: &Entry,
732        incoming_count: usize,
733        overflow: usize,
734    ) -> Result<()> {
735        assert_eq!(entry.data.len(), ENTRY_DATA_MAX_LEN);
736        let decoded = decode_entry_data(entry)?;
737        assert_eq!(decoded.first(), Some(&format!("incoming{overflow}")));
738        assert_eq!(
739            decoded.last(),
740            Some(&format!("incoming{}", incoming_count - 1))
741        );
742        Ok(())
743    }
744
745    fn subring_entry(name: &str) -> Result<Entry> {
746        let creator = Entry::gen_did("creator")?;
747        Subring::new(name, creator)?.try_into()
748    }
749
750    fn actor() -> Did {
751        Did::from(42u32)
752    }
753
754    fn version(counter: u32) -> EntryVersion {
755        EntryVersion::new(
756            u128::from(counter),
757            Did::from(counter),
758            Did::from(counter.saturating_add(1000)),
759        )
760    }
761
762    fn data_delta(topic: &str, value: &str, counter: u32) -> Result<Entry> {
763        data_entry(topic, value)?.stamp_delta(version(counter))
764    }
765
766    fn overwrite_delta(topic: &str, value: &str, counter: u32) -> Result<Entry> {
767        data_entry(topic, value)?.stamp_overwrite(version(counter))
768    }
769
770    fn relay_delta(did: Did, value: &str, counter: u32) -> Result<Entry> {
771        Entry::new(did, vec![encoded(value)?], EntryKind::RelayMessage)
772            .stamp_delta(version(counter))
773    }
774
775    #[test]
776    fn gset_satisfies_join_semilattice_laws() {
777        let mut a = GSet::new();
778        a.insert(Did::from(1u32));
779        let mut b = GSet::new();
780        b.insert(Did::from(2u32));
781        let mut ab = GSet::new();
782        ab.insert(Did::from(1u32));
783        ab.insert(Did::from(2u32));
784
785        assert_join_semilattice_laws(&[GSet::new(), a, b, ab]);
786    }
787
788    #[test]
789    fn data_topic_buffer_satisfies_join_semilattice_laws() -> Result<()> {
790        let samples = [
791            Entry::new(Entry::gen_did("topic")?, vec![], EntryKind::Data).topic_buffer()?,
792            data_delta("topic", "a", 1)?.topic_buffer()?,
793            data_delta("topic", "b", 2)?.topic_buffer()?,
794            overwrite_delta("topic", "c", 3)?.topic_buffer()?,
795        ];
796
797        assert_join_semilattice_laws(&samples);
798        Ok(())
799    }
800
801    #[test]
802    fn relay_message_set_satisfies_join_semilattice_laws() -> Result<()> {
803        let did = Did::from(10u32);
804        let a = Entry::new(did, vec![encoded("a")?], EntryKind::RelayMessage)
805            .stamp_delta(version(1))?
806            .relay_set()?;
807        let b = Entry::new(did, vec![encoded("b")?], EntryKind::RelayMessage)
808            .stamp_delta(version(2))?
809            .relay_set()?;
810        let ab = Entry::new(did, vec![], EntryKind::RelayMessage)
811            .join(relay_delta(did, "a", 1)?)?
812            .join(relay_delta(did, "b", 2)?)?;
813        let tombstoned_a = ab.tombstone(relay_delta(did, "a", 1)?)?.relay_set()?;
814
815        assert_join_semilattice_laws(&[RelayMessageSet::default(), a, b, tombstoned_a]);
816        Ok(())
817    }
818
819    #[test]
820    fn entry_join_is_strongly_eventually_consistent_for_data_deltas() -> Result<()> {
821        let base = Entry::new(Entry::gen_did("topic")?, vec![], EntryKind::Data);
822        let deltas = vec![
823            data_delta("topic", "a", 1)?,
824            data_delta("topic", "b", 2)?,
825            data_delta("topic", "a", 3)?,
826        ];
827
828        let forward = deltas
829            .iter()
830            .cloned()
831            .try_fold(base.clone(), |acc, delta| acc.join(delta))?;
832        let reverse = deltas
833            .iter()
834            .rev()
835            .cloned()
836            .try_fold(base.clone(), |acc, delta| acc.join(delta))?;
837        let duplicated = deltas
838            .iter()
839            .cloned()
840            .chain(deltas.iter().cloned())
841            .try_fold(base, |acc, delta| acc.join(delta))?;
842
843        assert_eq!(forward, reverse);
844        assert_eq!(forward, duplicated);
845        assert_eq!(decode_entry_data(&forward)?, vec![
846            String::from("b"),
847            String::from("a")
848        ]);
849        Ok(())
850    }
851
852    #[test]
853    fn generic_sec_witness_accepts_data_topic_buffer_deltas() -> Result<()> {
854        let base = Entry::new(Entry::gen_did("topic")?, vec![], EntryKind::Data).topic_buffer()?;
855        let deltas = vec![
856            data_delta("topic", "a", 1)?.topic_buffer()?,
857            data_delta("topic", "b", 2)?.topic_buffer()?,
858        ];
859
860        assert_strong_eventual_consistency(base, &deltas);
861        Ok(())
862    }
863
864    #[test]
865    fn storage_normalization_uses_lattice_top_n_order() -> Result<()> {
866        let incoming_count = ENTRY_DATA_MAX_LEN + 3;
867        let mut entry = data_entry_from_values(
868            "topic",
869            (0..incoming_count)
870                .map(|i| format!("incoming{i}"))
871                .collect::<Vec<_>>(),
872        )?;
873        entry.crdt.dots = entry
874            .data
875            .iter()
876            .enumerate()
877            .map(|(index, _)| {
878                let counter = if index == 0 {
879                    10_000
880                } else {
881                    u32::try_from(index).map_err(|_| Error::EntryDotIndexOutOfBounds { index })?
882                };
883                EntryDot::for_index(version(counter), index)
884            })
885            .collect::<Result<Vec<_>>>()?;
886
887        let normalized = entry.try_into_storage_entry()?;
888        let decoded = decode_entry_data(&normalized)?;
889
890        assert_eq!(normalized.data.len(), ENTRY_DATA_MAX_LEN);
891        assert_eq!(normalized.data.len(), normalized.crdt.dots.len());
892        assert!(decoded.contains(&String::from("incoming0")));
893        assert!(!decoded.contains(&String::from("incoming1")));
894        assert!(!decoded.contains(&String::from("incoming2")));
895        assert!(!decoded.contains(&String::from("incoming3")));
896        Ok(())
897    }
898
899    #[test]
900    fn storage_normalization_realigns_legacy_mismatched_dots() -> Result<()> {
901        let mut entry = data_entry_from_values(
902            "topic",
903            (0..ENTRY_DATA_MAX_LEN + 2)
904                .map(|i| format!("legacy{i}"))
905                .collect::<Vec<_>>(),
906        )?;
907        entry.crdt.dots = vec![EntryDot::for_index(version(10_000), 0)?];
908
909        let normalized = entry.try_into_storage_entry()?;
910
911        assert_eq!(normalized.data.len(), ENTRY_DATA_MAX_LEN);
912        assert_eq!(normalized.data.len(), normalized.crdt.dots.len());
913        Ok(())
914    }
915
916    #[test]
917    fn crdt_constructors_normalize_carrier_invariants() -> Result<()> {
918        let register = version(10);
919        let stale = encoded("stale")?;
920        let live = encoded("live")?;
921        let mut values = BTreeMap::new();
922        values.insert(stale.clone(), EntryDot::for_index(version(1), 0)?);
923        let live_dot = EntryDot::for_index(version(11), 0)?;
924        values.insert(live.clone(), live_dot);
925
926        let buffer = DataTopicBuffer::new(Some(register), values);
927        assert_eq!(buffer.values.len(), 1);
928        assert!(buffer.values.contains_key(&live));
929
930        let relay = RelayMessageSet::new(buffer, BTreeSet::from([live_dot]));
931        assert!(relay.adds.values.is_empty());
932        assert!(relay.removes.contains(&live_dot));
933        Ok(())
934    }
935
936    #[test]
937    fn overwrite_register_tiebreaker_converges_for_same_timestamp_actor() -> Result<()> {
938        let did = Entry::gen_did("topic")?;
939        let issuer = actor();
940        let lower = Entry::new(did, vec![encoded("lower")?], EntryKind::Data)
941            .stamp_overwrite(EntryVersion::new(1, issuer, Did::from(1u32)))?;
942        let higher = Entry::new(did, vec![encoded("higher")?], EntryKind::Data)
943            .stamp_overwrite(EntryVersion::new(1, issuer, Did::from(2u32)))?;
944        let base = Entry::new(did, vec![], EntryKind::Data);
945
946        let forward = base.clone().join(lower.clone())?.join(higher.clone())?;
947        let reverse = base.join(higher)?.join(lower)?;
948
949        assert_eq!(forward, reverse);
950        assert_eq!(decode_entry_data(&forward)?, vec![String::from("higher")]);
951        Ok(())
952    }
953
954    #[test]
955    fn operation_digest_hashes_canonical_bytes_not_legacy_base58() -> Result<()> {
956        let entry = data_entry("topic", "value")?;
957        let digest = OperationDigest {
958            kind: entry.kind,
959            did: entry.did,
960            data: &entry.data,
961        };
962        let bytes = bincode::serialize(&digest).map_err(Error::BincodeSerialize)?;
963
964        let direct = Did::try_from(HashStr::from_bytes(&bytes))?;
965        let legacy_encoded = bytes.encode()?;
966        let legacy_base58 = Entry::gen_did(legacy_encoded.value())?;
967
968        assert_eq!(entry.operation_digest()?, direct);
969        assert_ne!(direct, legacy_base58);
970        Ok(())
971    }
972
973    #[test]
974    fn forwarded_overwrite_witness_is_not_reissued_after_local_floor() -> Result<()> {
975        let current = overwrite_delta("topic", "current", 10)?;
976        let stale_forwarded = overwrite_delta("topic", "stale", 1)?;
977
978        let updated = current.overwrite(stale_forwarded, actor())?;
979
980        assert_eq!(decode_entry_data(&updated)?, vec![String::from("current")]);
981        Ok(())
982    }
983
984    #[test]
985    fn overwrite_replaces_data_for_same_data_entry() -> Result<()> {
986        let entry = data_entry("topic", "old")?;
987        let other = data_entry("topic", "new")?;
988        let updated = entry.overwrite(other, actor())?;
989        assert_eq!(decode_entry_data(&updated)?, vec![String::from("new")]);
990        Ok(())
991    }
992
993    #[test]
994    fn overwrite_rejects_non_data_entry() -> Result<()> {
995        let entry = subring_entry("ring")?;
996        let other = entry.clone();
997
998        assert!(matches!(
999            entry.overwrite(other, actor()),
1000            Err(Error::EntryNotOverwritable)
1001        ));
1002        Ok(())
1003    }
1004
1005    #[test]
1006    fn overwrite_rejects_kind_mismatch() -> Result<()> {
1007        let entry = data_entry("topic", "old")?;
1008        let mut other = entry.clone();
1009        other.kind = EntryKind::RelayMessage;
1010
1011        assert!(matches!(
1012            entry.overwrite(other, actor()),
1013            Err(Error::EntryKindNotEqual)
1014        ));
1015        Ok(())
1016    }
1017
1018    #[test]
1019    fn overwrite_rejects_key_mismatch() -> Result<()> {
1020        let entry = data_entry("topic-a", "old")?;
1021        let other = data_entry("topic-b", "new")?;
1022
1023        assert!(matches!(
1024            entry.overwrite(other, actor()),
1025            Err(Error::EntryDidNotEqual)
1026        ));
1027        Ok(())
1028    }
1029
1030    #[test]
1031    fn overwrite_caps_payloads_larger_than_max_len() -> Result<()> {
1032        let overflow = 3;
1033        let (incoming, incoming_count) = overflowing_data_entry("topic", overflow)?;
1034        let entry = data_entry("topic", "base")?;
1035        let updated = entry.overwrite(incoming, actor())?;
1036        assert_entry_keeps_recent_overflow(&updated, incoming_count, overflow)
1037    }
1038
1039    #[test]
1040    fn extend_appends_data_for_same_entry() -> Result<()> {
1041        let entry = data_entry("topic", "first")?;
1042        let other = data_entry("topic", "second")?;
1043        let updated = entry.extend(other, actor())?;
1044        assert_eq!(decode_entry_data(&updated)?, vec![
1045            String::from("first"),
1046            String::from("second")
1047        ]);
1048        Ok(())
1049    }
1050
1051    #[test]
1052    fn extend_trims_oldest_items_at_max_len() -> Result<()> {
1053        let mut entry = data_entry("topic", "test0")?;
1054        for i in 1..ENTRY_DATA_MAX_LEN {
1055            let data = format!("test{i}");
1056            let other = data_entry("topic", &data)?;
1057            entry = entry.extend(other, actor())?;
1058            assert_eq!(entry.data.len(), i + 1);
1059        }
1060
1061        for i in ENTRY_DATA_MAX_LEN..ENTRY_DATA_MAX_LEN + 10 {
1062            let data = format!("test{i}");
1063            let other = data_entry("topic", &data)?;
1064            entry = entry.extend(other, actor())?;
1065            assert_eq!(entry.data.len(), ENTRY_DATA_MAX_LEN);
1066            let decoded = decode_entry_data(&entry)?;
1067            assert_eq!(
1068                decoded.first(),
1069                Some(&format!("test{}", i - ENTRY_DATA_MAX_LEN + 1))
1070            );
1071            assert_eq!(decoded.last(), Some(&data));
1072        }
1073        Ok(())
1074    }
1075
1076    #[test]
1077    fn extend_caps_incoming_payloads_larger_than_max_len() -> Result<()> {
1078        let overflow = 3;
1079        let (incoming, incoming_count) = overflowing_data_entry("topic", overflow)?;
1080        let entry = data_entry("topic", "base")?;
1081        let updated = entry.extend(incoming, actor())?;
1082        assert_entry_keeps_recent_overflow(&updated, incoming_count, overflow)
1083    }
1084
1085    #[test]
1086    fn extend_rejects_non_data_entry() -> Result<()> {
1087        let entry = subring_entry("ring")?;
1088        let other = entry.clone();
1089
1090        assert!(matches!(
1091            entry.extend(other, actor()),
1092            Err(Error::EntryNotAppendable)
1093        ));
1094        Ok(())
1095    }
1096
1097    #[test]
1098    fn touch_moves_existing_items_to_end_once() -> Result<()> {
1099        let entry = data_entry("topic", "a")?
1100            .extend(data_entry("topic", "b")?, actor())?
1101            .extend(data_entry("topic", "c")?, actor())?;
1102        let touched = data_entry("topic", "b")?;
1103        let updated = entry.touch(touched, actor())?;
1104        assert_eq!(decode_entry_data(&updated)?, vec![
1105            String::from("a"),
1106            String::from("c"),
1107            String::from("b")
1108        ]);
1109        Ok(())
1110    }
1111
1112    #[test]
1113    fn touch_trims_oldest_non_touched_items_at_max_len() -> Result<()> {
1114        let mut entry = data_entry("topic", "test0")?;
1115        for i in 1..ENTRY_DATA_MAX_LEN {
1116            entry = entry.extend(data_entry("topic", &format!("test{i}"))?, actor())?;
1117        }
1118        let updated = entry.touch(data_entry("topic", "test0")?, actor())?;
1119        assert_eq!(updated.data.len(), ENTRY_DATA_MAX_LEN);
1120        let decoded = decode_entry_data(&updated)?;
1121        assert_eq!(decoded.first(), Some(&String::from("test1")));
1122        assert_eq!(decoded.last(), Some(&String::from("test0")));
1123        Ok(())
1124    }
1125
1126    #[test]
1127    fn relay_tombstone_removes_observed_message_by_join() -> Result<()> {
1128        let did = Did::from(30u32);
1129        let first = relay_delta(did, "first", 1)?;
1130        let second = relay_delta(did, "second", 2)?;
1131        let carrier = Entry::new(did, vec![], EntryKind::RelayMessage)
1132            .join(first.clone())?
1133            .join(second.clone())?;
1134
1135        let removed = carrier.tombstone(first.clone())?;
1136
1137        assert_eq!(decode_entry_data(&removed)?, vec![String::from("second")]);
1138        let joined_with_stale_add = removed.join(first)?;
1139        assert_eq!(decode_entry_data(&joined_with_stale_add)?, vec![
1140            String::from("second")
1141        ]);
1142        Ok(())
1143    }
1144
1145    #[test]
1146    fn relay_tombstone_rejects_non_relay_entry() -> Result<()> {
1147        let entry = data_entry("topic", "value")?;
1148        let other = entry.clone();
1149
1150        assert!(matches!(
1151            entry.tombstone(other),
1152            Err(Error::EntryNotTombstonable)
1153        ));
1154        Ok(())
1155    }
1156
1157    #[test]
1158    fn touch_caps_incoming_payloads_larger_than_max_len() -> Result<()> {
1159        let overflow = 3;
1160        let (incoming, incoming_count) = overflowing_data_entry("topic", overflow)?;
1161        let entry = data_entry("topic", "base")?;
1162        let updated = entry.touch(incoming, actor())?;
1163        assert_entry_keeps_recent_overflow(&updated, incoming_count, overflow)
1164    }
1165
1166    #[test]
1167    fn join_subring_adds_member_to_subring_entry() -> Result<()> {
1168        let entry = subring_entry("ring")?;
1169        let member = Entry::gen_did("member")?;
1170        let updated = entry.join_subring(member)?;
1171        let subring = Subring::try_from(updated)?;
1172        assert_eq!(subring.finger.first(), Some(member));
1173        Ok(())
1174    }
1175
1176    #[test]
1177    fn join_subring_rejects_non_subring_entry() -> Result<()> {
1178        let entry = data_entry("topic", "value")?;
1179        let member = Entry::gen_did("member")?;
1180
1181        assert!(matches!(
1182            entry.join_subring(member),
1183            Err(Error::EntryNotJoinable)
1184        ));
1185        Ok(())
1186    }
1187
1188    #[test]
1189    fn operation_default_entry_matches_operation_kind() -> Result<()> {
1190        let target = data_entry("topic", "value")?;
1191        let default = EntryOperation::Extend(target.clone()).gen_default_entry()?;
1192        assert_eq!(default.did, target.did);
1193        assert_eq!(default.kind, EntryKind::Data);
1194        assert!(default.data.is_empty());
1195        Ok(())
1196    }
1197
1198    #[test]
1199    fn message_payload_entry_key_targets_successor_of_signer() -> Result<()> {
1200        let key = SecretKey::random();
1201        let session = SessionSk::new_with_seckey(&key)?;
1202        let signer: Did = key.address().into();
1203        let payload =
1204            MessagePayload::new_send(Message::custom(b"relay")?, &session, signer, signer)?;
1205        let entry = Entry::try_from(payload)?;
1206        let expected = BigUint::from(signer) + BigUint::from(1u16);
1207        assert_eq!(entry.did, expected.into());
1208        assert_eq!(entry.kind, EntryKind::RelayMessage);
1209        Ok(())
1210    }
1211
1212    #[test]
1213    fn affine_preserves_payload_and_kind_while_rotating_keys() -> Result<()> {
1214        let entry = data_entry("topic", "value")?;
1215        let affined = entry.affine(3)?;
1216        assert_eq!(affined.len(), 3);
1217        for rotated in affined {
1218            assert_eq!(rotated.data, entry.data);
1219            assert_eq!(rotated.kind, entry.kind);
1220        }
1221        Ok(())
1222    }
1223}