Skip to main content

core_utils/preprocessing/
source.rs

//! Helper traits abstracting over sources of different kinds of preprocessing data. Types that
//! implement them should be able to provide objects that resolve to the requested elements
//! (singlets, triples, DaBits or PowPairs), in the order they are requested. This allows us to
//! start online phases and generate preprocessing data on the fly instead of needing it all
//! upfront.
5use futures::future::{BoxFuture, Shared};
6#[cfg(any(test, feature = "dev"))]
7use futures::FutureExt;
8#[cfg(any(test, feature = "dev"))]
9use primitives::types::Positive;
10use primitives::{
11    algebra::{field::FieldExtension, BoxedUint},
12    correlated_randomness::{dabits::DaBit, pow::PowPair, singlets::Singlet, triples::Triple},
13};
14
15#[cfg(any(test, feature = "dev"))]
16use crate::preprocessing::dealer::{global::GlobalDealer, mock::MockDealer, TrustedGenerator};
17use crate::{
18    errors::AbortError,
19    preprocessing::{
20        iterator::{
21            NextBatch,
22            NextDaBit,
23            NextElement,
24            NextPowPair,
25            NextSinglet,
26            NextSingletBatch,
27            NextTriple,
28            NextTripleBatch,
29        },
30        Preprocessing,
31    },
32};
33
34/// Retrieves preprocessing elements for the local peer upon request.
35pub trait PreprocessingSource<P: Preprocessing + 'static>: 'static + Send {
36    /// Dispatches the request **synchronously** and returns a lazy owned [`NextBatch`].
37    ///
38    /// `&mut self` is released immediately — callers may issue several batch requests
39    /// before driving any of them, enabling concurrent overlap between generation and
40    /// consumption.
41    fn request_n_elements_batch(
42        &mut self,
43        n_elements: usize,
44        associated_data: <P as Preprocessing>::AssociatedData,
45    ) -> NextBatch<P>;
46
47    /// Decomposes the batch into a vector of per-element lazy futures.
48    ///
49    /// The default implementation drives the batch into an
50    /// `Arc<Mutex<Vec<Option<P>>>>` shared across all per-element futures.
51    /// Each future **takes** its slot by index exactly once — no cloning of `P`.
52    /// Cloning the shared handle is a cheap `Arc` pointer copy; the mutex
53    /// serialises concurrent access if futures are polled from different threads.
54    fn request_n_elements(
55        &mut self,
56        n_elements: usize,
57        associated_data: <P as Preprocessing>::AssociatedData,
58    ) -> Vec<NextElement<P>> {
59        use std::sync::{Arc, Mutex};
60
61        use futures::FutureExt;
62
63        // Dispatch synchronously — &mut self is free after this line.
64        let batch = self.request_n_elements_batch(n_elements, associated_data);
65
66        // Wrap the resolved Vec in Arc<Mutex<Vec<Option<P>>>> so each per-element
67        // future can take its slot exactly once without cloning P.
68        #[allow(clippy::type_complexity)]
69        let slots: Shared<
70            BoxFuture<'static, Result<Arc<Mutex<Vec<Option<P>>>>, AbortError>>,
71        > = async move {
72            let vec = batch.await?;
73            Ok(Arc::new(Mutex::new(
74                vec.into_iter().map(Some).collect::<Vec<_>>(),
75            )))
76        }
77        .boxed()
78        .shared();
79
80        let futures = (0..n_elements)
81            .map(|i| {
82                let s = slots.clone();
83                Box::pin(async move {
84                    s.await?
85                        .lock()
86                        .map_err(|_| {
87                            AbortError::internal_error("preprocessing slots mutex poisoned")
88                        })?
89                        .get_mut(i)
90                        .and_then(Option::take)
91                        .ok_or_else(|| {
92                            AbortError::internal_error(&format!(
93                                "Preprocessing batch too small: index {i} out of bounds"
94                            ))
95                        })
96                }) as NextElement<P>
97            })
98            .collect::<Vec<_>>();
99        futures
100    }
101}
102
103/// Retrieves singlets for the local peer upon request.
104pub trait SingletSource<F: FieldExtension>: PreprocessingSource<Singlet<F>> {
105    /// Per-element variant: yields a vector of individual lazy futures.
106    fn request_n_singlets(&mut self, n_singlets: usize) -> Vec<NextSinglet<F>> {
107        self.request_n_elements(n_singlets, ())
108    }
109
110    /// Synchronously dispatches the request and returns a lazy batch future.
111    fn request_n_singlets_batch(&mut self, n_singlets: usize) -> NextSingletBatch<F> {
112        self.request_n_elements_batch(n_singlets, ())
113    }
114}
115impl<F: FieldExtension, T: PreprocessingSource<Singlet<F>>> SingletSource<F> for T {}
116
117/// Retrieves triples for the local peer upon request.
118pub trait TripleSource<F: FieldExtension>: PreprocessingSource<Triple<F>> {
119    /// Per-element variant: yields a vector of individual lazy futures.
120    fn request_n_triples(&mut self, n_triples: usize) -> Vec<NextTriple<F>> {
121        self.request_n_elements(n_triples, ())
122    }
123
124    /// Synchronously dispatches the request and returns a lazy batch future.
125    fn request_n_triples_batch(&mut self, n_triples: usize) -> NextTripleBatch<F> {
126        self.request_n_elements_batch(n_triples, ())
127    }
128}
129impl<F: FieldExtension, T: PreprocessingSource<Triple<F>>> TripleSource<F> for T {}
130
131/// Retrieves DaBits for the local peer upon request.
132pub trait DaBitSource<F: FieldExtension>: PreprocessingSource<DaBit<F>> {
133    /// Per-element variant: yields a vector of individual lazy futures.
134    fn request_n_dabits(&mut self, n_dabits: usize) -> Vec<NextDaBit<F>> {
135        self.request_n_elements(n_dabits, ())
136    }
137
138    /// Synchronously dispatches the request and returns a lazy batch future.
139    fn request_n_dabits_batch(&mut self, n_dabits: usize) -> NextBatch<DaBit<F>> {
140        self.request_n_elements_batch(n_dabits, ())
141    }
142}
143
144impl<F: FieldExtension, T: PreprocessingSource<DaBit<F>>> DaBitSource<F> for T {}
145
146/// Retrieves PowPairs for the local peer upon request.
147pub trait PowPairSource<F: FieldExtension>: PreprocessingSource<PowPair<F>> {
148    /// Per-element variant: yields a vector of individual lazy futures.
149    fn request_n_pow_pairs(
150        &mut self,
151        n_pow_pairs: usize,
152        exponent: BoxedUint,
153    ) -> Vec<NextPowPair<F>> {
154        self.request_n_elements(n_pow_pairs, exponent)
155    }
156
157    /// Synchronously dispatches the request and returns a lazy batch future.
158    fn request_n_pow_pairs_batch(
159        &mut self,
160        n_pow_pairs: usize,
161        exponent: BoxedUint,
162    ) -> NextBatch<PowPair<F>> {
163        self.request_n_elements_batch(n_pow_pairs, exponent)
164    }
165}
166
167impl<F: FieldExtension, T: PreprocessingSource<PowPair<F>>> PowPairSource<F> for T {}
168
// Test/dev-only: lets a `MockDealer` serve as a `PreprocessingSource` for every element
// kind `P` that the corresponding `GlobalDealer` can generate.
#[cfg(any(test, feature = "dev"))]
impl<P, M> PreprocessingSource<P> for MockDealer<M>
where
    P: Preprocessing + 'static,
    M: Positive,
    GlobalDealer<M>: TrustedGenerator<P>,
{
    /// Requests `n_elements` elements from the dealer and joins the resulting per-element
    /// futures into one shared batch future, which fails if any of them fails.
    fn request_n_elements_batch(
        &mut self,
        n_elements: usize,
        associated_data: <P as Preprocessing>::AssociatedData,
    ) -> NextBatch<P> {
        // Dispatch synchronously; `&mut self` is not needed once the handles are returned.
        let pending = self.request_n(n_elements, associated_data);
        let joined = futures::future::try_join_all(pending);
        joined.boxed().shared()
    }
}
184}