arcium-primitives 0.6.0

Arcium primitives
Documentation
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use tokio::sync::oneshot::{Receiver as OneshotReceiver, Sender as OneshotSender};

use crate::correlated_randomness::{stream::CorrelatedStreamError, CorrelatedBatch};

/// A handle for a prefetch operation. Fire-and-forget by default; await for completion
/// confirmation. The prefetch proceeds in the background whether or not this is awaited.
pub struct PrefetchHandle<E> {
    receiver: OneshotReceiver<Result<(), E>>,
}

impl<E> From<OneshotReceiver<Result<(), E>>> for PrefetchHandle<E> {
    fn from(receiver: OneshotReceiver<Result<(), E>>) -> Self {
        Self { receiver }
    }
}

impl<E: From<CorrelatedStreamError>> Future for PrefetchHandle<E> {
    type Output = Result<(), E>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.receiver)
            .poll(cx)
            .map(|r| r.unwrap_or_else(|_| Err(CorrelatedStreamError::StreamClosed.into())))
    }
}

/// Commands sent from `PreprocessingStream` to the dispatcher task.
pub enum Command<PB: CorrelatedBatch, E> {
    /// Request N items as a single batch; the dispatcher assembles the Vec before sending.
    RequestN {
        n_elements: usize,
        completion: OneshotSender<Result<Vec<PB::Item>, E>>,
    },
    /// Proactively generate N items into the buffer.
    Prefetch {
        n_elements: usize,
        completion: OneshotSender<Result<(), E>>,
    },
}