samaharam 0.2.0

Scalable heterogeneous zero-knowledge proof aggregation for EVM chains
Documentation
//! Proof queue for managing pending proofs.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use crate::proof::{Batched, Proof};
use crate::registry::VkId;
use crate::traits::PairingEngine;

/// Configuration for proof queue.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Maximum queue size.
    pub max_size: usize,

    /// Trigger aggregation when queue reaches this size.
    pub batch_threshold: usize,

    /// Maximum wait time before forced aggregation (ms).
    pub max_wait_ms: u64,
}

impl Default for QueueConfig {
    fn default() -> Self {
        Self {
            max_size: 1024,
            batch_threshold: 32,
            max_wait_ms: 5000,
        }
    }
}

/// Thread-safe proof queue with priority ordering.
pub struct ProofQueue<E: PairingEngine> {
    inner: Arc<Mutex<ProofQueueInner<E>>>,
    config: QueueConfig,
}

struct ProofQueueInner<E: PairingEngine> {
    queue: VecDeque<QueuedProof<E>>,
    by_vk: std::collections::HashMap<VkId, Vec<usize>>,
}

/// A proof in the queue with metadata.
#[derive(Debug)]
struct QueuedProof<E: PairingEngine> {
    proof: Proof<E, Batched>,
    #[allow(dead_code)] // Used for priority queue sorting
    priority: u8,
    #[allow(dead_code)] // Used for timeout-based aggregation
    enqueue_time: std::time::Instant,
}

impl<E: PairingEngine> ProofQueue<E> {
    /// Create a new queue with default config.
    pub fn new() -> Self {
        Self::with_config(QueueConfig::default())
    }

    /// Create a new queue with custom config.
    pub fn with_config(config: QueueConfig) -> Self {
        Self {
            inner: Arc::new(Mutex::new(ProofQueueInner {
                queue: VecDeque::new(),
                by_vk: std::collections::HashMap::new(),
            })),
            config,
        }
    }

    /// Enqueue a proof with default priority.
    pub fn enqueue(&self, proof: Proof<E, Batched>) -> Result<(), String> {
        self.enqueue_with_priority(proof, 0)
    }

    /// Enqueue a proof with specific priority (higher = more urgent).
    pub fn enqueue_with_priority(
        &self,
        proof: Proof<E, Batched>,
        priority: u8,
    ) -> Result<(), String> {
        let mut inner = self.inner.lock().unwrap();

        if inner.queue.len() >= self.config.max_size {
            return Err("Queue is full".to_string());
        }

        let vk_id = proof.vk_id();
        let idx = inner.queue.len();

        inner.queue.push_back(QueuedProof {
            proof,
            priority,
            enqueue_time: std::time::Instant::now(),
        });

        inner.by_vk.entry(vk_id).or_default().push(idx);

        Ok(())
    }

    /// Get the current queue size.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().queue.len()
    }

    /// Check if queue is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Check if queue has reached batch threshold.
    pub fn should_aggregate(&self) -> bool {
        self.len() >= self.config.batch_threshold
    }

    /// Drain all proofs for aggregation.
    pub fn drain(&self) -> Vec<Proof<E, Batched>> {
        let mut inner = self.inner.lock().unwrap();
        let proofs: Vec<_> = inner.queue.drain(..).map(|qp| qp.proof).collect();
        inner.by_vk.clear();
        proofs
    }

    /// Drain proofs for a specific VK.
    pub fn drain_by_vk(&self, vk_id: VkId) -> Vec<Proof<E, Batched>> {
        let mut inner = self.inner.lock().unwrap();

        if let Some(indices) = inner.by_vk.remove(&vk_id) {
            // This is a simplified implementation - in production
            // we'd need proper index management
            let mut result = Vec::with_capacity(indices.len());
            for idx in indices.into_iter().rev() {
                if idx < inner.queue.len() {
                    if let Some(qp) = inner.queue.remove(idx) {
                        result.push(qp.proof);
                    }
                }
            }
            result
        } else {
            Vec::new()
        }
    }

    /// Get count of proofs per VK.
    pub fn vk_counts(&self) -> std::collections::HashMap<VkId, usize> {
        let inner = self.inner.lock().unwrap();
        inner.by_vk.iter().map(|(k, v)| (*k, v.len())).collect()
    }
}

impl<E: PairingEngine> Default for ProofQueue<E> {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::bn254::Bn254;
    use crate::proof::{Proof, Verified};

    fn mock_batched_proof(vk_id: VkId) -> Proof<Bn254, Batched> {
        Proof::<Bn254, Verified>::new_verified(vec![], vec![], vk_id).submit()
    }

    #[test]
    fn queue_starts_empty() {
        let queue = ProofQueue::<Bn254>::new();
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn queue_enqueues_proofs() {
        let queue = ProofQueue::<Bn254>::new();

        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();
        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();

        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn queue_tracks_by_vk() {
        let queue = ProofQueue::<Bn254>::new();

        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();
        queue.enqueue(mock_batched_proof(VkId::new(2))).unwrap();
        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();

        let counts = queue.vk_counts();
        assert_eq!(counts.get(&VkId::new(1)), Some(&2));
        assert_eq!(counts.get(&VkId::new(2)), Some(&1));
    }

    #[test]
    fn queue_drains_all() {
        let queue = ProofQueue::<Bn254>::new();

        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();
        queue.enqueue(mock_batched_proof(VkId::new(2))).unwrap();

        let proofs = queue.drain();
        assert_eq!(proofs.len(), 2);
        assert!(queue.is_empty());
    }

    #[test]
    fn queue_respects_max_size() {
        let config = QueueConfig {
            max_size: 2,
            ..Default::default()
        };
        let queue = ProofQueue::<Bn254>::with_config(config);

        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();
        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();

        let result = queue.enqueue(mock_batched_proof(VkId::new(1)));
        assert!(result.is_err());
    }

    #[test]
    fn queue_batch_threshold() {
        let config = QueueConfig {
            batch_threshold: 3,
            ..Default::default()
        };
        let queue = ProofQueue::<Bn254>::with_config(config);

        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();
        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();
        assert!(!queue.should_aggregate());

        queue.enqueue(mock_batched_proof(VkId::new(1))).unwrap();
        assert!(queue.should_aggregate());
    }
}