Skip to main content

protocol_core/
delivery.rs

//! Transport-agnostic delivery receipts, manifests, and selective repair planning.
2
3use std::collections::{BTreeMap, BTreeSet};
4
5use serde::{Deserialize, Serialize};
6
7use crate::ProtocolError;
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub enum DeliveryProfile {
12    BestEffort,
13    Eventual,
14    Receipt,
15    Quorum,
16}
17
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
19#[serde(rename_all = "camelCase")]
20pub struct QuorumPolicy {
21    pub required_receipts: u32,
22}
23
24impl QuorumPolicy {
25    pub fn required_receipts(&self) -> u32 {
26        self.required_receipts.max(1)
27    }
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "camelCase")]
32pub struct DeliveryAnnouncement {
33    pub stream_id: String,
34    pub author_id: String,
35    pub sequence: u64,
36    pub object_path: String,
37    pub delivery_profile: DeliveryProfile,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub quorum: Option<QuorumPolicy>,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub object_digest: Option<String>,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub struct DeliveryReceipt {
47    pub stream_id: String,
48    pub author_id: String,
49    pub recipient_id: String,
50    pub delivered_through: u64,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub struct SequenceRange {
56    pub start: u64,
57    pub end: u64,
58}
59
60impl SequenceRange {
61    pub fn contains(&self, sequence: u64) -> bool {
62        self.start <= sequence && sequence <= self.end
63    }
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67#[serde(rename_all = "camelCase")]
68pub struct DeliveryManifest {
69    pub stream_id: String,
70    pub author_id: String,
71    pub ranges: Vec<SequenceRange>,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75#[serde(rename_all = "camelCase")]
76pub struct RepairTarget {
77    pub sequence: u64,
78    pub object_path: String,
79}
80
/// Snapshot of how far delivery of a single sequence has progressed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliveryState {
    /// Sequence this snapshot describes.
    pub sequence: u64,
    /// Number of recipients whose receipts cover `sequence`.
    pub receipt_count: u32,
    /// Number of receipts needed under the evaluated delivery profile.
    pub required_receipts: u32,
    /// `true` once `receipt_count` has reached `required_receipts`.
    pub is_delivered: bool,
}
88
/// Tracks announcements and receipts for a single stream/author pair.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliveryTracker {
    /// Stream this tracker is scoped to.
    stream_id: String,
    /// Author this tracker is scoped to.
    author_id: String,
    /// Known recipient ids; only the size of this set feeds the thresholds.
    recipients: BTreeSet<String>,
    /// Sequences that have been announced so far.
    announced_sequences: BTreeSet<u64>,
    /// Highest `delivered_through` value seen per recipient id.
    highest_receipt_by_recipient: BTreeMap<String, u64>,
}
97
98impl DeliveryTracker {
99    pub fn new(
100        stream_id: impl Into<String>,
101        author_id: impl Into<String>,
102        recipients: impl IntoIterator<Item = impl Into<String>>,
103    ) -> Self {
104        Self {
105            stream_id: stream_id.into(),
106            author_id: author_id.into(),
107            recipients: recipients.into_iter().map(Into::into).collect(),
108            announced_sequences: BTreeSet::new(),
109            highest_receipt_by_recipient: BTreeMap::new(),
110        }
111    }
112
113    pub fn observe_announcement(
114        &mut self,
115        announcement: &DeliveryAnnouncement,
116    ) -> Result<(), ProtocolError> {
117        validate_scope(
118            &self.stream_id,
119            &self.author_id,
120            &announcement.stream_id,
121            &announcement.author_id,
122        )?;
123        self.announced_sequences.insert(announcement.sequence);
124        Ok(())
125    }
126
127    pub fn observe_receipt(&mut self, receipt: &DeliveryReceipt) -> Result<(), ProtocolError> {
128        validate_scope(
129            &self.stream_id,
130            &self.author_id,
131            &receipt.stream_id,
132            &receipt.author_id,
133        )?;
134        let entry = self
135            .highest_receipt_by_recipient
136            .entry(receipt.recipient_id.clone())
137            .or_insert(0);
138        *entry = (*entry).max(receipt.delivered_through);
139        Ok(())
140    }
141
142    pub fn state_for(
143        &self,
144        sequence: u64,
145        profile: &DeliveryProfile,
146        quorum: Option<&QuorumPolicy>,
147    ) -> Option<DeliveryState> {
148        if !self.announced_sequences.contains(&sequence) {
149            return None;
150        }
151
152        let receipt_count = self
153            .highest_receipt_by_recipient
154            .values()
155            .filter(|observed| **observed >= sequence)
156            .count() as u32;
157        let required_receipts = required_receipts(profile, quorum, self.recipients.len() as u32);
158
159        Some(DeliveryState {
160            sequence,
161            receipt_count,
162            required_receipts,
163            is_delivered: receipt_count >= required_receipts,
164        })
165    }
166
167    pub fn highest_receipt_for(&self, recipient_id: &str) -> Option<u64> {
168        self.highest_receipt_by_recipient.get(recipient_id).copied()
169    }
170}
171
/// Builds the mesh path for one object:
/// `<namespace>/stream/<stream_id>/obj/<author_id>/<sequence>`.
///
/// The inputs are inserted verbatim; no escaping or validation is applied.
pub fn mesh_object_path(
    namespace: &str,
    stream_id: &str,
    author_id: &str,
    sequence: u64,
) -> String {
    let sequence_segment = sequence.to_string();
    [
        namespace,
        "stream",
        stream_id,
        "obj",
        author_id,
        sequence_segment.as_str(),
    ]
    .join("/")
}
180
181pub fn encode_announcement(announcement: &DeliveryAnnouncement) -> Result<Vec<u8>, ProtocolError> {
182    serde_json::to_vec(announcement).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
183}
184
185pub fn decode_announcement(bytes: &[u8]) -> Result<DeliveryAnnouncement, ProtocolError> {
186    serde_json::from_slice(bytes).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
187}
188
189pub fn encode_receipt(receipt: &DeliveryReceipt) -> Result<Vec<u8>, ProtocolError> {
190    serde_json::to_vec(receipt).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
191}
192
193pub fn decode_receipt(bytes: &[u8]) -> Result<DeliveryReceipt, ProtocolError> {
194    serde_json::from_slice(bytes).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
195}
196
197pub fn encode_manifest(manifest: &DeliveryManifest) -> Result<Vec<u8>, ProtocolError> {
198    serde_json::to_vec(manifest).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
199}
200
201pub fn decode_manifest(bytes: &[u8]) -> Result<DeliveryManifest, ProtocolError> {
202    serde_json::from_slice(bytes).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
203}
204
205pub fn manifest_from_sequences(
206    stream_id: impl Into<String>,
207    author_id: impl Into<String>,
208    sequences: impl IntoIterator<Item = u64>,
209) -> DeliveryManifest {
210    let mut sequences = sequences.into_iter().collect::<Vec<_>>();
211    sequences.sort_unstable();
212    sequences.dedup();
213
214    let mut ranges = Vec::new();
215    let mut iter = sequences.into_iter();
216    if let Some(first) = iter.next() {
217        let mut start = first;
218        let mut end = first;
219        for sequence in iter {
220            if sequence == end + 1 {
221                end = sequence;
222            } else {
223                ranges.push(SequenceRange { start, end });
224                start = sequence;
225                end = sequence;
226            }
227        }
228        ranges.push(SequenceRange { start, end });
229    }
230
231    DeliveryManifest {
232        stream_id: stream_id.into(),
233        author_id: author_id.into(),
234        ranges,
235    }
236}
237
238pub fn diff_manifest(
239    local: &DeliveryManifest,
240    remote: &DeliveryManifest,
241) -> Result<Vec<SequenceRange>, ProtocolError> {
242    validate_scope(
243        &local.stream_id,
244        &local.author_id,
245        &remote.stream_id,
246        &remote.author_id,
247    )?;
248    let local_sequences = expand_ranges(&local.ranges);
249    let remote_sequences = expand_ranges(&remote.ranges);
250    let missing = local_sequences
251        .difference(&remote_sequences)
252        .copied()
253        .collect::<Vec<_>>();
254    Ok(manifest_from_sequences(local.stream_id.clone(), local.author_id.clone(), missing).ranges)
255}
256
257pub fn selective_repair_targets(
258    announcements: &[DeliveryAnnouncement],
259    remote: &DeliveryManifest,
260) -> Result<Vec<RepairTarget>, ProtocolError> {
261    let Some(first) = announcements.first() else {
262        return Ok(Vec::new());
263    };
264    validate_scope(
265        &first.stream_id,
266        &first.author_id,
267        &remote.stream_id,
268        &remote.author_id,
269    )?;
270    if announcements.iter().any(|announcement| {
271        announcement.stream_id != first.stream_id || announcement.author_id != first.author_id
272    }) {
273        return Err(ProtocolError::InvalidEnvelope(
274            "announcements must all share the same stream/author".into(),
275        ));
276    }
277
278    let remote_sequences = expand_ranges(&remote.ranges);
279    let mut targets = announcements
280        .iter()
281        .filter(|announcement| !remote_sequences.contains(&announcement.sequence))
282        .map(|announcement| RepairTarget {
283            sequence: announcement.sequence,
284            object_path: announcement.object_path.clone(),
285        })
286        .collect::<Vec<_>>();
287    targets.sort_by_key(|target| target.sequence);
288    Ok(targets)
289}
290
291fn required_receipts(
292    profile: &DeliveryProfile,
293    quorum: Option<&QuorumPolicy>,
294    recipient_count: u32,
295) -> u32 {
296    match profile {
297        DeliveryProfile::BestEffort => 0,
298        DeliveryProfile::Receipt => 1,
299        DeliveryProfile::Eventual => recipient_count.max(1),
300        DeliveryProfile::Quorum => quorum
301            .map(QuorumPolicy::required_receipts)
302            .unwrap_or_else(|| recipient_count.max(1)),
303    }
304}
305
306fn validate_scope(
307    expected_stream_id: &str,
308    expected_author_id: &str,
309    actual_stream_id: &str,
310    actual_author_id: &str,
311) -> Result<(), ProtocolError> {
312    if expected_stream_id != actual_stream_id || expected_author_id != actual_author_id {
313        return Err(ProtocolError::InvalidEnvelope(
314            "stream_id/author_id mismatch".into(),
315        ));
316    }
317    Ok(())
318}
319
320fn expand_ranges(ranges: &[SequenceRange]) -> BTreeSet<u64> {
321    let mut sequences = BTreeSet::new();
322    for range in ranges {
323        for sequence in range.start..=range.end {
324            sequences.insert(sequence);
325        }
326    }
327    sequences
328}