arcium-core-utils 0.5.0

Arcium core utils
Documentation
//! A helper trait abstracting over sources of different preprocessing data. Types that implement it
//! should be able to provide objects that resolve to either a triple or a singlet as requested,
//! and in the order they are requested. This allows us to start online phases and generate
//! preprocessing data on the fly instead of needing it all upfront.
use futures::future::{BoxFuture, Shared};
#[cfg(any(test, feature = "dev"))]
use futures::FutureExt;
#[cfg(any(test, feature = "dev"))]
use primitives::types::Positive;
use primitives::{
    algebra::{field::FieldExtension, BoxedUint},
    correlated_randomness::{dabits::DaBit, pow::PowPair, singlets::Singlet, triples::Triple},
};

#[cfg(any(test, feature = "dev"))]
use crate::preprocessing::dealer::{global::GlobalDealer, mock::MockDealer, TrustedGenerator};
use crate::{
    errors::AbortError,
    preprocessing::{
        iterator::{
            NextBatch,
            NextDaBit,
            NextElement,
            NextPowPair,
            NextSinglet,
            NextSingletBatch,
            NextTriple,
            NextTripleBatch,
        },
        Preprocessing,
    },
};

/// Retrieves preprocessing elements for the local peer upon request.
pub trait PreprocessingSource<P: Preprocessing + 'static>: 'static + Send {
    /// Dispatches the request **synchronously** and returns a lazy owned [`NextBatch`].
    ///
    /// `&mut self` is released immediately — callers may issue several batch requests
    /// before driving any of them, enabling concurrent overlap between generation and
    /// consumption.
    fn request_n_elements_batch(
        &mut self,
        n_elements: usize,
        associated_data: <P as Preprocessing>::AssociatedData,
    ) -> NextBatch<P>;

    /// Decomposes the batch into a vector of per-element lazy futures.
    ///
    /// The default implementation drives the batch into an
    /// `Arc<Mutex<Vec<Option<P>>>>` shared across all per-element futures.
    /// Each future **takes** its slot by index exactly once — no cloning of `P`.
    /// Cloning the shared handle is a cheap `Arc` pointer copy; the mutex
    /// serialises concurrent access if futures are polled from different threads.
    fn request_n_elements(
        &mut self,
        n_elements: usize,
        associated_data: <P as Preprocessing>::AssociatedData,
    ) -> Vec<NextElement<P>> {
        use std::sync::{Arc, Mutex};

        use futures::FutureExt;

        // Dispatch synchronously — &mut self is free after this line.
        let batch = self.request_n_elements_batch(n_elements, associated_data);

        // Wrap the resolved Vec in Arc<Mutex<Vec<Option<P>>>> so each per-element
        // future can take its slot exactly once without cloning P.
        #[allow(clippy::type_complexity)]
        let slots: Shared<
            BoxFuture<'static, Result<Arc<Mutex<Vec<Option<P>>>>, AbortError>>,
        > = async move {
            let vec = batch.await?;
            Ok(Arc::new(Mutex::new(
                vec.into_iter().map(Some).collect::<Vec<_>>(),
            )))
        }
        .boxed()
        .shared();

        let futures = (0..n_elements)
            .map(|i| {
                let s = slots.clone();
                Box::pin(async move {
                    s.await?
                        .lock()
                        .map_err(|_| {
                            AbortError::internal_error("preprocessing slots mutex poisoned")
                        })?
                        .get_mut(i)
                        .and_then(Option::take)
                        .ok_or_else(|| {
                            AbortError::internal_error(&format!(
                                "Preprocessing batch too small: index {i} out of bounds"
                            ))
                        })
                }) as NextElement<P>
            })
            .collect::<Vec<_>>();
        futures
    }
}

/// Retrieves singlets for the local peer upon request.
pub trait SingletSource<F: FieldExtension>: PreprocessingSource<Singlet<F>> {
    /// Per-element variant: yields a vector of individual lazy futures.
    fn request_n_singlets(&mut self, n_singlets: usize) -> Vec<NextSinglet<F>> {
        self.request_n_elements(n_singlets, ())
    }

    /// Synchronously dispatches the request and returns a lazy batch future.
    fn request_n_singlets_batch(&mut self, n_singlets: usize) -> NextSingletBatch<F> {
        self.request_n_elements_batch(n_singlets, ())
    }
}
impl<F: FieldExtension, T: PreprocessingSource<Singlet<F>>> SingletSource<F> for T {}

/// Retrieves triples for the local peer upon request.
pub trait TripleSource<F: FieldExtension>: PreprocessingSource<Triple<F>> {
    /// Per-element variant: yields a vector of individual lazy futures.
    fn request_n_triples(&mut self, n_triples: usize) -> Vec<NextTriple<F>> {
        self.request_n_elements(n_triples, ())
    }

    /// Synchronously dispatches the request and returns a lazy batch future.
    fn request_n_triples_batch(&mut self, n_triples: usize) -> NextTripleBatch<F> {
        self.request_n_elements_batch(n_triples, ())
    }
}
impl<F: FieldExtension, T: PreprocessingSource<Triple<F>>> TripleSource<F> for T {}

/// Retrieves DaBits for the local peer upon request.
pub trait DaBitSource<F: FieldExtension>: PreprocessingSource<DaBit<F>> {
    /// Per-element variant: yields a vector of individual lazy futures.
    fn request_n_dabits(&mut self, n_dabits: usize) -> Vec<NextDaBit<F>> {
        self.request_n_elements(n_dabits, ())
    }

    /// Synchronously dispatches the request and returns a lazy batch future.
    fn request_n_dabits_batch(&mut self, n_dabits: usize) -> NextBatch<DaBit<F>> {
        self.request_n_elements_batch(n_dabits, ())
    }
}

impl<F: FieldExtension, T: PreprocessingSource<DaBit<F>>> DaBitSource<F> for T {}

/// Retrieves PowPairs for the local peer upon request.
pub trait PowPairSource<F: FieldExtension>: PreprocessingSource<PowPair<F>> {
    /// Per-element variant: yields a vector of individual lazy futures.
    fn request_n_pow_pairs(
        &mut self,
        n_pow_pairs: usize,
        exponent: BoxedUint,
    ) -> Vec<NextPowPair<F>> {
        self.request_n_elements(n_pow_pairs, exponent)
    }

    /// Synchronously dispatches the request and returns a lazy batch future.
    fn request_n_pow_pairs_batch(
        &mut self,
        n_pow_pairs: usize,
        exponent: BoxedUint,
    ) -> NextBatch<PowPair<F>> {
        self.request_n_elements_batch(n_pow_pairs, exponent)
    }
}

impl<F: FieldExtension, T: PreprocessingSource<PowPair<F>>> PowPairSource<F> for T {}

#[cfg(any(test, feature = "dev"))]
impl<P: Preprocessing + 'static, M: Positive> PreprocessingSource<P> for MockDealer<M>
where
    GlobalDealer<M>: TrustedGenerator<P>,
{
    fn request_n_elements_batch(
        &mut self,
        n_elements: usize,
        associated_data: <P as Preprocessing>::AssociatedData,
    ) -> NextBatch<P> {
        let local_elements = self.request_n(n_elements, associated_data);
        futures::future::try_join_all(local_elements)
            .boxed()
            .shared()
    }
}