1use std::sync::Arc;
24
25use auths_verifier::keri::Prefix;
26use tokio::time::{Duration, timeout};
27
28use super::async_provider::AsyncWitnessProvider;
29use super::error::{DuplicityEvidence, WitnessError};
30use super::receipt::Receipt;
31
32#[derive(Debug, thiserror::Error)]
34pub enum CollectionError {
35 #[error("duplicity detected: {0}")]
37 Duplicity(DuplicityEvidence),
38
39 #[error("threshold not met: got {got} receipts, need {required}")]
41 ThresholdNotMet {
42 got: usize,
44 required: usize,
46 errors: Vec<(String, WitnessError)>,
48 },
49
50 #[error("all witnesses failed")]
52 AllFailed {
53 errors: Vec<(String, WitnessError)>,
55 },
56
57 #[error("no witnesses configured")]
59 NoWitnesses,
60}
61
62pub struct ReceiptCollector {
87 witnesses: Vec<Arc<dyn AsyncWitnessProvider>>,
89 threshold: usize,
91 timeout_ms: u64,
93}
94
95impl ReceiptCollector {
96 pub fn new(
104 witnesses: Vec<Arc<dyn AsyncWitnessProvider>>,
105 threshold: usize,
106 timeout_ms: u64,
107 ) -> Self {
108 Self {
109 witnesses,
110 threshold,
111 timeout_ms,
112 }
113 }
114
115 pub fn from_boxed(
117 witnesses: Vec<Box<dyn AsyncWitnessProvider>>,
118 threshold: usize,
119 timeout_ms: u64,
120 ) -> Self {
121 let witnesses: Vec<Arc<dyn AsyncWitnessProvider>> =
122 witnesses.into_iter().map(Arc::from).collect();
123 Self::new(witnesses, threshold, timeout_ms)
124 }
125
126 pub fn witness_count(&self) -> usize {
128 self.witnesses.len()
129 }
130
131 pub fn threshold(&self) -> usize {
133 self.threshold
134 }
135
136 pub async fn collect(
152 &self,
153 prefix: &Prefix,
154 event_json: &[u8],
155 ) -> Result<Vec<Receipt>, CollectionError> {
156 if self.witnesses.is_empty() {
157 return Err(CollectionError::NoWitnesses);
158 }
159
160 let mut handles = Vec::with_capacity(self.witnesses.len());
162 let timeout_duration = Duration::from_millis(self.timeout_ms);
163
164 for (idx, witness) in self.witnesses.iter().enumerate() {
165 let witness = Arc::clone(witness);
166 let prefix = prefix.clone();
167 let event_json = event_json.to_vec();
168
169 let handle = tokio::spawn(async move {
170 let result =
171 timeout(timeout_duration, witness.submit_event(&prefix, &event_json)).await;
172
173 match result {
174 Ok(Ok(receipt)) => Ok((idx, receipt)),
175 Ok(Err(e)) => Err((idx, e)),
176 Err(_) => Err((
177 idx,
178 WitnessError::Timeout(timeout_duration.as_millis() as u64),
179 )),
180 }
181 });
182
183 handles.push(handle);
184 }
185
186 let mut receipts = Vec::new();
188 let mut errors: Vec<(String, WitnessError)> = Vec::new();
189
190 for handle in handles {
191 match handle.await {
192 Ok(Ok((_idx, receipt))) => {
193 if let Some(evidence) = self.check_receipt_consistency(&receipts, &receipt) {
195 return Err(CollectionError::Duplicity(evidence));
196 }
197 receipts.push(receipt);
198
199 if receipts.len() >= self.threshold {
201 }
204 }
205 Ok(Err((idx, e))) => {
206 if let WitnessError::Duplicity(evidence) = e {
208 return Err(CollectionError::Duplicity(evidence));
209 }
210 errors.push((format!("witness_{}", idx), e));
211 }
212 Err(join_err) => {
213 errors.push((
214 "unknown".to_string(),
215 WitnessError::Network(format!("task join error: {}", join_err)),
216 ));
217 }
218 }
219 }
220
221 if receipts.len() >= self.threshold {
223 Ok(receipts)
224 } else if receipts.is_empty() && !errors.is_empty() {
225 Err(CollectionError::AllFailed { errors })
226 } else {
227 Err(CollectionError::ThresholdNotMet {
228 got: receipts.len(),
229 required: self.threshold,
230 errors,
231 })
232 }
233 }
234
235 fn check_receipt_consistency(
237 &self,
238 existing: &[Receipt],
239 new: &Receipt,
240 ) -> Option<DuplicityEvidence> {
241 if existing.is_empty() {
242 return None;
243 }
244
245 let expected_said = &existing[0].a;
246 if new.a != *expected_said {
247 Some(DuplicityEvidence {
248 prefix: Prefix::default(),
249 sequence: new.s,
250 event_a_said: expected_said.clone(),
251 event_b_said: new.a.clone(),
252 witness_reports: vec![],
253 })
254 } else {
255 None
256 }
257 }
258}
259
260#[derive(Default)]
262pub struct ReceiptCollectorBuilder {
263 witnesses: Vec<Arc<dyn AsyncWitnessProvider>>,
264 threshold: Option<usize>,
265 timeout_ms: Option<u64>,
266}
267
268impl std::fmt::Debug for ReceiptCollectorBuilder {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 f.debug_struct("ReceiptCollectorBuilder")
271 .field("witnesses_count", &self.witnesses.len())
272 .field("threshold", &self.threshold)
273 .field("timeout_ms", &self.timeout_ms)
274 .finish()
275 }
276}
277
278impl ReceiptCollectorBuilder {
279 pub fn new() -> Self {
281 Self::default()
282 }
283
284 pub fn witness(mut self, witness: Arc<dyn AsyncWitnessProvider>) -> Self {
286 self.witnesses.push(witness);
287 self
288 }
289
290 pub fn witnesses(
292 mut self,
293 witnesses: impl IntoIterator<Item = Arc<dyn AsyncWitnessProvider>>,
294 ) -> Self {
295 self.witnesses.extend(witnesses);
296 self
297 }
298
299 pub fn threshold(mut self, threshold: usize) -> Self {
301 self.threshold = Some(threshold);
302 self
303 }
304
305 pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
307 self.timeout_ms = Some(timeout_ms);
308 self
309 }
310
311 pub fn build(self) -> ReceiptCollector {
316 ReceiptCollector {
317 witnesses: self.witnesses,
318 threshold: self.threshold.unwrap_or(1),
319 timeout_ms: self.timeout_ms.unwrap_or(5000),
320 }
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use crate::witness::NoOpAsyncWitness;
328
329 #[tokio::test]
330 async fn collect_from_noop_witnesses() {
331 let witnesses: Vec<Arc<dyn AsyncWitnessProvider>> = vec![
332 Arc::new(NoOpAsyncWitness),
333 Arc::new(NoOpAsyncWitness),
334 Arc::new(NoOpAsyncWitness),
335 ];
336
337 let collector = ReceiptCollector::new(witnesses, 2, 5000);
338 let prefix = Prefix::new_unchecked("EPrefix".into());
339 let result = collector.collect(&prefix, b"{}").await;
340
341 assert!(result.is_ok());
342 let receipts = result.unwrap();
343 assert!(receipts.len() >= 2);
344 }
345
346 #[tokio::test]
347 async fn threshold_1_of_3() {
348 let witnesses: Vec<Arc<dyn AsyncWitnessProvider>> = vec![
349 Arc::new(NoOpAsyncWitness),
350 Arc::new(NoOpAsyncWitness),
351 Arc::new(NoOpAsyncWitness),
352 ];
353
354 let collector = ReceiptCollector::new(witnesses, 1, 5000);
355 let prefix = Prefix::new_unchecked("EPrefix".into());
356 let result = collector.collect(&prefix, b"{}").await;
357
358 assert!(result.is_ok());
359 }
360
361 #[tokio::test]
362 async fn no_witnesses_error() {
363 let collector = ReceiptCollector::new(vec![], 1, 5000);
364 let prefix = Prefix::new_unchecked("EPrefix".into());
365 let result = collector.collect(&prefix, b"{}").await;
366
367 assert!(matches!(result, Err(CollectionError::NoWitnesses)));
368 }
369
370 #[tokio::test]
371 async fn builder_pattern() {
372 let collector = ReceiptCollectorBuilder::new()
373 .witness(Arc::new(NoOpAsyncWitness))
374 .witness(Arc::new(NoOpAsyncWitness))
375 .threshold(2)
376 .timeout_ms(1000)
377 .build();
378
379 assert_eq!(collector.witness_count(), 2);
380 assert_eq!(collector.threshold(), 2);
381 }
382
383 #[test]
384 fn collection_error_display() {
385 let err = CollectionError::ThresholdNotMet {
386 got: 1,
387 required: 2,
388 errors: vec![],
389 };
390 assert!(format!("{}", err).contains("threshold not met"));
391
392 let err = CollectionError::NoWitnesses;
393 assert!(format!("{}", err).contains("no witnesses"));
394 }
395}