1use std::collections::{BTreeMap, BTreeSet};
4
5use serde::{Deserialize, Serialize};
6
7use crate::ProtocolError;
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub enum DeliveryProfile {
12 BestEffort,
13 Eventual,
14 Receipt,
15 Quorum,
16}
17
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
19#[serde(rename_all = "camelCase")]
20pub struct QuorumPolicy {
21 pub required_receipts: u32,
22}
23
24impl QuorumPolicy {
25 pub fn required_receipts(&self) -> u32 {
26 self.required_receipts.max(1)
27 }
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "camelCase")]
32pub struct DeliveryAnnouncement {
33 pub stream_id: String,
34 pub author_id: String,
35 pub sequence: u64,
36 pub object_path: String,
37 pub delivery_profile: DeliveryProfile,
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub quorum: Option<QuorumPolicy>,
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub object_digest: Option<String>,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub struct DeliveryReceipt {
47 pub stream_id: String,
48 pub author_id: String,
49 pub recipient_id: String,
50 pub delivered_through: u64,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub struct SequenceRange {
56 pub start: u64,
57 pub end: u64,
58}
59
60impl SequenceRange {
61 pub fn contains(&self, sequence: u64) -> bool {
62 self.start <= sequence && sequence <= self.end
63 }
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67#[serde(rename_all = "camelCase")]
68pub struct DeliveryManifest {
69 pub stream_id: String,
70 pub author_id: String,
71 pub ranges: Vec<SequenceRange>,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75#[serde(rename_all = "camelCase")]
76pub struct RepairTarget {
77 pub sequence: u64,
78 pub object_path: String,
79}
80
/// Snapshot of delivery progress for a single announced sequence, as computed
/// by `DeliveryTracker::state_for`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliveryState {
    /// Sequence number this snapshot describes.
    pub sequence: u64,
    /// Number of recipients whose highest receipt covers `sequence`.
    pub receipt_count: u32,
    /// Number of receipts the chosen delivery profile demands.
    pub required_receipts: u32,
    /// `true` when `receipt_count >= required_receipts`.
    pub is_delivered: bool,
}
88
/// In-memory bookkeeping of announcements and receipts for exactly one
/// stream/author pair.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliveryTracker {
    /// Stream every observed announcement and receipt must match.
    stream_id: String,
    /// Author every observed announcement and receipt must match.
    author_id: String,
    /// Known recipients; only the size of this set is consulted when
    /// computing how many receipts a profile requires.
    recipients: BTreeSet<String>,
    /// Sequence numbers that have been announced so far.
    announced_sequences: BTreeSet<u64>,
    /// Highest `delivered_through` value seen per recipient id.
    highest_receipt_by_recipient: BTreeMap<String, u64>,
}
97
98impl DeliveryTracker {
99 pub fn new(
100 stream_id: impl Into<String>,
101 author_id: impl Into<String>,
102 recipients: impl IntoIterator<Item = impl Into<String>>,
103 ) -> Self {
104 Self {
105 stream_id: stream_id.into(),
106 author_id: author_id.into(),
107 recipients: recipients.into_iter().map(Into::into).collect(),
108 announced_sequences: BTreeSet::new(),
109 highest_receipt_by_recipient: BTreeMap::new(),
110 }
111 }
112
113 pub fn observe_announcement(
114 &mut self,
115 announcement: &DeliveryAnnouncement,
116 ) -> Result<(), ProtocolError> {
117 validate_scope(
118 &self.stream_id,
119 &self.author_id,
120 &announcement.stream_id,
121 &announcement.author_id,
122 )?;
123 self.announced_sequences.insert(announcement.sequence);
124 Ok(())
125 }
126
127 pub fn observe_receipt(&mut self, receipt: &DeliveryReceipt) -> Result<(), ProtocolError> {
128 validate_scope(
129 &self.stream_id,
130 &self.author_id,
131 &receipt.stream_id,
132 &receipt.author_id,
133 )?;
134 let entry = self
135 .highest_receipt_by_recipient
136 .entry(receipt.recipient_id.clone())
137 .or_insert(0);
138 *entry = (*entry).max(receipt.delivered_through);
139 Ok(())
140 }
141
142 pub fn state_for(
143 &self,
144 sequence: u64,
145 profile: &DeliveryProfile,
146 quorum: Option<&QuorumPolicy>,
147 ) -> Option<DeliveryState> {
148 if !self.announced_sequences.contains(&sequence) {
149 return None;
150 }
151
152 let receipt_count = self
153 .highest_receipt_by_recipient
154 .values()
155 .filter(|observed| **observed >= sequence)
156 .count() as u32;
157 let required_receipts = required_receipts(profile, quorum, self.recipients.len() as u32);
158
159 Some(DeliveryState {
160 sequence,
161 receipt_count,
162 required_receipts,
163 is_delivered: receipt_count >= required_receipts,
164 })
165 }
166
167 pub fn highest_receipt_for(&self, recipient_id: &str) -> Option<u64> {
168 self.highest_receipt_by_recipient.get(recipient_id).copied()
169 }
170}
171
/// Builds the object path `{namespace}/stream/{stream_id}/obj/{author_id}/{sequence}`.
///
/// The components are inserted verbatim; no escaping or validation is done.
pub fn mesh_object_path(
    namespace: &str,
    stream_id: &str,
    author_id: &str,
    sequence: u64,
) -> String {
    let sequence_segment = sequence.to_string();
    let segments = [
        namespace,
        "stream",
        stream_id,
        "obj",
        author_id,
        sequence_segment.as_str(),
    ];
    segments.join("/")
}
180
181pub fn encode_announcement(announcement: &DeliveryAnnouncement) -> Result<Vec<u8>, ProtocolError> {
182 serde_json::to_vec(announcement).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
183}
184
185pub fn decode_announcement(bytes: &[u8]) -> Result<DeliveryAnnouncement, ProtocolError> {
186 serde_json::from_slice(bytes).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
187}
188
189pub fn encode_receipt(receipt: &DeliveryReceipt) -> Result<Vec<u8>, ProtocolError> {
190 serde_json::to_vec(receipt).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
191}
192
193pub fn decode_receipt(bytes: &[u8]) -> Result<DeliveryReceipt, ProtocolError> {
194 serde_json::from_slice(bytes).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
195}
196
197pub fn encode_manifest(manifest: &DeliveryManifest) -> Result<Vec<u8>, ProtocolError> {
198 serde_json::to_vec(manifest).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
199}
200
201pub fn decode_manifest(bytes: &[u8]) -> Result<DeliveryManifest, ProtocolError> {
202 serde_json::from_slice(bytes).map_err(|e| ProtocolError::InvalidEncoding(e.to_string()))
203}
204
205pub fn manifest_from_sequences(
206 stream_id: impl Into<String>,
207 author_id: impl Into<String>,
208 sequences: impl IntoIterator<Item = u64>,
209) -> DeliveryManifest {
210 let mut sequences = sequences.into_iter().collect::<Vec<_>>();
211 sequences.sort_unstable();
212 sequences.dedup();
213
214 let mut ranges = Vec::new();
215 let mut iter = sequences.into_iter();
216 if let Some(first) = iter.next() {
217 let mut start = first;
218 let mut end = first;
219 for sequence in iter {
220 if sequence == end + 1 {
221 end = sequence;
222 } else {
223 ranges.push(SequenceRange { start, end });
224 start = sequence;
225 end = sequence;
226 }
227 }
228 ranges.push(SequenceRange { start, end });
229 }
230
231 DeliveryManifest {
232 stream_id: stream_id.into(),
233 author_id: author_id.into(),
234 ranges,
235 }
236}
237
238pub fn diff_manifest(
239 local: &DeliveryManifest,
240 remote: &DeliveryManifest,
241) -> Result<Vec<SequenceRange>, ProtocolError> {
242 validate_scope(
243 &local.stream_id,
244 &local.author_id,
245 &remote.stream_id,
246 &remote.author_id,
247 )?;
248 let local_sequences = expand_ranges(&local.ranges);
249 let remote_sequences = expand_ranges(&remote.ranges);
250 let missing = local_sequences
251 .difference(&remote_sequences)
252 .copied()
253 .collect::<Vec<_>>();
254 Ok(manifest_from_sequences(local.stream_id.clone(), local.author_id.clone(), missing).ranges)
255}
256
257pub fn selective_repair_targets(
258 announcements: &[DeliveryAnnouncement],
259 remote: &DeliveryManifest,
260) -> Result<Vec<RepairTarget>, ProtocolError> {
261 let Some(first) = announcements.first() else {
262 return Ok(Vec::new());
263 };
264 validate_scope(
265 &first.stream_id,
266 &first.author_id,
267 &remote.stream_id,
268 &remote.author_id,
269 )?;
270 if announcements.iter().any(|announcement| {
271 announcement.stream_id != first.stream_id || announcement.author_id != first.author_id
272 }) {
273 return Err(ProtocolError::InvalidEnvelope(
274 "announcements must all share the same stream/author".into(),
275 ));
276 }
277
278 let remote_sequences = expand_ranges(&remote.ranges);
279 let mut targets = announcements
280 .iter()
281 .filter(|announcement| !remote_sequences.contains(&announcement.sequence))
282 .map(|announcement| RepairTarget {
283 sequence: announcement.sequence,
284 object_path: announcement.object_path.clone(),
285 })
286 .collect::<Vec<_>>();
287 targets.sort_by_key(|target| target.sequence);
288 Ok(targets)
289}
290
291fn required_receipts(
292 profile: &DeliveryProfile,
293 quorum: Option<&QuorumPolicy>,
294 recipient_count: u32,
295) -> u32 {
296 match profile {
297 DeliveryProfile::BestEffort => 0,
298 DeliveryProfile::Receipt => 1,
299 DeliveryProfile::Eventual => recipient_count.max(1),
300 DeliveryProfile::Quorum => quorum
301 .map(QuorumPolicy::required_receipts)
302 .unwrap_or_else(|| recipient_count.max(1)),
303 }
304}
305
306fn validate_scope(
307 expected_stream_id: &str,
308 expected_author_id: &str,
309 actual_stream_id: &str,
310 actual_author_id: &str,
311) -> Result<(), ProtocolError> {
312 if expected_stream_id != actual_stream_id || expected_author_id != actual_author_id {
313 return Err(ProtocolError::InvalidEnvelope(
314 "stream_id/author_id mismatch".into(),
315 ));
316 }
317 Ok(())
318}
319
320fn expand_ranges(ranges: &[SequenceRange]) -> BTreeSet<u64> {
321 let mut sequences = BTreeSet::new();
322 for range in ranges {
323 for sequence in range.start..=range.end {
324 sequences.insert(sequence);
325 }
326 }
327 sequences
328}