Skip to main content

auths_core/witness/
collector.rs

1//! Receipt collection from multiple witnesses.
2//!
3//! This module provides the `ReceiptCollector` which coordinates receipt
4//! collection from multiple witnesses in parallel, enforcing threshold
5//! requirements for security.
6//!
7//! # Threshold Semantics
8//!
9//! KERI uses k-of-n witness thresholds:
10//! - n = total number of witnesses
11//! - k = minimum receipts required (threshold)
12//!
13//! For example, with 3 witnesses and threshold 2:
14//! - Need at least 2 receipts to succeed
15//! - Can tolerate 1 witness being unavailable
16//!
17//! # Parallel Collection
18//!
19//! Receipts are collected in parallel for efficiency. The collector
20//! returns as soon as the threshold is met, or waits for all witnesses
21//! if the threshold cannot be met.
22
23use std::sync::Arc;
24
25use auths_verifier::keri::Prefix;
26use tokio::time::{Duration, timeout};
27
28use super::async_provider::AsyncWitnessProvider;
29use super::error::{DuplicityEvidence, WitnessError};
30use super::receipt::Receipt;
31
32/// Error during receipt collection.
33#[derive(Debug, thiserror::Error)]
34pub enum CollectionError {
35    /// Duplicity detected during collection.
36    #[error("duplicity detected: {0}")]
37    Duplicity(DuplicityEvidence),
38
39    /// Threshold not met - insufficient receipts collected.
40    #[error("threshold not met: got {got} receipts, need {required}")]
41    ThresholdNotMet {
42        /// Number of receipts successfully collected
43        got: usize,
44        /// Number of receipts required
45        required: usize,
46        /// Errors from failed witness requests
47        errors: Vec<(String, WitnessError)>,
48    },
49
50    /// All witnesses failed.
51    #[error("all witnesses failed")]
52    AllFailed {
53        /// Errors from each witness
54        errors: Vec<(String, WitnessError)>,
55    },
56
57    /// No witnesses configured.
58    #[error("no witnesses configured")]
59    NoWitnesses,
60}
61
62/// Collects receipts from multiple witnesses.
63///
64/// The collector queries witnesses in parallel and returns when either:
65/// - Threshold receipts have been collected (success)
66/// - Duplicity is detected (error)
67/// - Not enough witnesses respond successfully (error)
68///
69/// # Example
70///
71/// ```rust,ignore
72/// use auths_core::witness::{ReceiptCollector, NoOpAsyncWitness};
73///
74/// let witnesses: Vec<Arc<dyn AsyncWitnessProvider>> = vec![
75///     Arc::new(NoOpAsyncWitness),
76///     Arc::new(NoOpAsyncWitness),
77///     Arc::new(NoOpAsyncWitness),
78/// ];
79///
80/// let collector = ReceiptCollector::new(witnesses, 2, 5000);
81/// let prefix = Prefix::new_unchecked("EPrefix".into());
82///
83/// let receipts = collector.collect(&prefix, b"{}").await?;
84/// assert!(receipts.len() >= 2);
85/// ```
86pub struct ReceiptCollector {
87    /// List of witnesses to query
88    witnesses: Vec<Arc<dyn AsyncWitnessProvider>>,
89    /// Minimum receipts required
90    threshold: usize,
91    /// Timeout per witness in milliseconds
92    timeout_ms: u64,
93}
94
95impl ReceiptCollector {
96    /// Create a new receipt collector.
97    ///
98    /// # Arguments
99    ///
100    /// * `witnesses` - List of witnesses to query
101    /// * `threshold` - Minimum number of receipts required
102    /// * `timeout_ms` - Timeout per witness operation in milliseconds
103    pub fn new(
104        witnesses: Vec<Arc<dyn AsyncWitnessProvider>>,
105        threshold: usize,
106        timeout_ms: u64,
107    ) -> Self {
108        Self {
109            witnesses,
110            threshold,
111            timeout_ms,
112        }
113    }
114
115    /// Create a collector from boxed witnesses.
116    pub fn from_boxed(
117        witnesses: Vec<Box<dyn AsyncWitnessProvider>>,
118        threshold: usize,
119        timeout_ms: u64,
120    ) -> Self {
121        let witnesses: Vec<Arc<dyn AsyncWitnessProvider>> =
122            witnesses.into_iter().map(Arc::from).collect();
123        Self::new(witnesses, threshold, timeout_ms)
124    }
125
126    /// Get the number of witnesses.
127    pub fn witness_count(&self) -> usize {
128        self.witnesses.len()
129    }
130
131    /// Get the threshold.
132    pub fn threshold(&self) -> usize {
133        self.threshold
134    }
135
136    /// Collect receipts from witnesses.
137    ///
138    /// Queries all witnesses in parallel and returns when threshold is met
139    /// or all witnesses have responded.
140    ///
141    /// # Arguments
142    ///
143    /// * `prefix` - The KERI prefix of the identity
144    /// * `event_json` - The canonicalized JSON bytes of the event
145    ///
146    /// # Returns
147    ///
148    /// * `Ok(receipts)` - At least `threshold` receipts collected
149    /// * `Err(CollectionError::Duplicity(_))` - Duplicity detected
150    /// * `Err(CollectionError::ThresholdNotMet { .. })` - Not enough receipts
151    pub async fn collect(
152        &self,
153        prefix: &Prefix,
154        event_json: &[u8],
155    ) -> Result<Vec<Receipt>, CollectionError> {
156        if self.witnesses.is_empty() {
157            return Err(CollectionError::NoWitnesses);
158        }
159
160        // Spawn tasks for all witnesses
161        let mut handles = Vec::with_capacity(self.witnesses.len());
162        let timeout_duration = Duration::from_millis(self.timeout_ms);
163
164        for (idx, witness) in self.witnesses.iter().enumerate() {
165            let witness = Arc::clone(witness);
166            let prefix = prefix.clone();
167            let event_json = event_json.to_vec();
168
169            let handle = tokio::spawn(async move {
170                let result =
171                    timeout(timeout_duration, witness.submit_event(&prefix, &event_json)).await;
172
173                match result {
174                    Ok(Ok(receipt)) => Ok((idx, receipt)),
175                    Ok(Err(e)) => Err((idx, e)),
176                    Err(_) => Err((
177                        idx,
178                        WitnessError::Timeout(timeout_duration.as_millis() as u64),
179                    )),
180                }
181            });
182
183            handles.push(handle);
184        }
185
186        // Collect results
187        let mut receipts = Vec::new();
188        let mut errors: Vec<(String, WitnessError)> = Vec::new();
189
190        for handle in handles {
191            match handle.await {
192                Ok(Ok((_idx, receipt))) => {
193                    // Check for duplicity against existing receipts
194                    if let Some(evidence) = self.check_receipt_consistency(&receipts, &receipt) {
195                        return Err(CollectionError::Duplicity(evidence));
196                    }
197                    receipts.push(receipt);
198
199                    // Early return if threshold met
200                    if receipts.len() >= self.threshold {
201                        // Continue collecting remaining for better coverage,
202                        // but we have enough to succeed
203                    }
204                }
205                Ok(Err((idx, e))) => {
206                    // Check for duplicity error specifically
207                    if let WitnessError::Duplicity(evidence) = e {
208                        return Err(CollectionError::Duplicity(evidence));
209                    }
210                    errors.push((format!("witness_{}", idx), e));
211                }
212                Err(join_err) => {
213                    errors.push((
214                        "unknown".to_string(),
215                        WitnessError::Network(format!("task join error: {}", join_err)),
216                    ));
217                }
218            }
219        }
220
221        // Check if we met the threshold
222        if receipts.len() >= self.threshold {
223            Ok(receipts)
224        } else if receipts.is_empty() && !errors.is_empty() {
225            Err(CollectionError::AllFailed { errors })
226        } else {
227            Err(CollectionError::ThresholdNotMet {
228                got: receipts.len(),
229                required: self.threshold,
230                errors,
231            })
232        }
233    }
234
235    /// Check if a new receipt is consistent with existing receipts.
236    fn check_receipt_consistency(
237        &self,
238        existing: &[Receipt],
239        new: &Receipt,
240    ) -> Option<DuplicityEvidence> {
241        if existing.is_empty() {
242            return None;
243        }
244
245        let expected_said = &existing[0].a;
246        if new.a != *expected_said {
247            Some(DuplicityEvidence {
248                prefix: Prefix::default(),
249                sequence: new.s,
250                event_a_said: expected_said.clone(),
251                event_b_said: new.a.clone(),
252                witness_reports: vec![],
253            })
254        } else {
255            None
256        }
257    }
258}
259
260/// Builder for ReceiptCollector.
261#[derive(Default)]
262pub struct ReceiptCollectorBuilder {
263    witnesses: Vec<Arc<dyn AsyncWitnessProvider>>,
264    threshold: Option<usize>,
265    timeout_ms: Option<u64>,
266}
267
268impl std::fmt::Debug for ReceiptCollectorBuilder {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        f.debug_struct("ReceiptCollectorBuilder")
271            .field("witnesses_count", &self.witnesses.len())
272            .field("threshold", &self.threshold)
273            .field("timeout_ms", &self.timeout_ms)
274            .finish()
275    }
276}
277
278impl ReceiptCollectorBuilder {
279    /// Create a new builder.
280    pub fn new() -> Self {
281        Self::default()
282    }
283
284    /// Add a witness.
285    pub fn witness(mut self, witness: Arc<dyn AsyncWitnessProvider>) -> Self {
286        self.witnesses.push(witness);
287        self
288    }
289
290    /// Add multiple witnesses.
291    pub fn witnesses(
292        mut self,
293        witnesses: impl IntoIterator<Item = Arc<dyn AsyncWitnessProvider>>,
294    ) -> Self {
295        self.witnesses.extend(witnesses);
296        self
297    }
298
299    /// Set the threshold.
300    pub fn threshold(mut self, threshold: usize) -> Self {
301        self.threshold = Some(threshold);
302        self
303    }
304
305    /// Set the timeout in milliseconds.
306    pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
307        self.timeout_ms = Some(timeout_ms);
308        self
309    }
310
311    /// Build the collector.
312    ///
313    /// Uses default timeout of 5000ms if not specified.
314    /// Threshold defaults to 1 if not specified.
315    pub fn build(self) -> ReceiptCollector {
316        ReceiptCollector {
317            witnesses: self.witnesses,
318            threshold: self.threshold.unwrap_or(1),
319            timeout_ms: self.timeout_ms.unwrap_or(5000),
320        }
321    }
322}
323
324#[cfg(test)]
325mod tests {
326    use super::*;
327    use crate::witness::NoOpAsyncWitness;
328
329    #[tokio::test]
330    async fn collect_from_noop_witnesses() {
331        let witnesses: Vec<Arc<dyn AsyncWitnessProvider>> = vec![
332            Arc::new(NoOpAsyncWitness),
333            Arc::new(NoOpAsyncWitness),
334            Arc::new(NoOpAsyncWitness),
335        ];
336
337        let collector = ReceiptCollector::new(witnesses, 2, 5000);
338        let prefix = Prefix::new_unchecked("EPrefix".into());
339        let result = collector.collect(&prefix, b"{}").await;
340
341        assert!(result.is_ok());
342        let receipts = result.unwrap();
343        assert!(receipts.len() >= 2);
344    }
345
346    #[tokio::test]
347    async fn threshold_1_of_3() {
348        let witnesses: Vec<Arc<dyn AsyncWitnessProvider>> = vec![
349            Arc::new(NoOpAsyncWitness),
350            Arc::new(NoOpAsyncWitness),
351            Arc::new(NoOpAsyncWitness),
352        ];
353
354        let collector = ReceiptCollector::new(witnesses, 1, 5000);
355        let prefix = Prefix::new_unchecked("EPrefix".into());
356        let result = collector.collect(&prefix, b"{}").await;
357
358        assert!(result.is_ok());
359    }
360
361    #[tokio::test]
362    async fn no_witnesses_error() {
363        let collector = ReceiptCollector::new(vec![], 1, 5000);
364        let prefix = Prefix::new_unchecked("EPrefix".into());
365        let result = collector.collect(&prefix, b"{}").await;
366
367        assert!(matches!(result, Err(CollectionError::NoWitnesses)));
368    }
369
370    #[tokio::test]
371    async fn builder_pattern() {
372        let collector = ReceiptCollectorBuilder::new()
373            .witness(Arc::new(NoOpAsyncWitness))
374            .witness(Arc::new(NoOpAsyncWitness))
375            .threshold(2)
376            .timeout_ms(1000)
377            .build();
378
379        assert_eq!(collector.witness_count(), 2);
380        assert_eq!(collector.threshold(), 2);
381    }
382
383    #[test]
384    fn collection_error_display() {
385        let err = CollectionError::ThresholdNotMet {
386            got: 1,
387            required: 2,
388            errors: vec![],
389        };
390        assert!(format!("{}", err).contains("threshold not met"));
391
392        let err = CollectionError::NoWitnesses;
393        assert!(format!("{}", err).contains("no witnesses"));
394    }
395}