use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use crate::proof::{Batched, Proof};
use crate::registry::VkId;
use crate::traits::PairingEngine;
/// Tunable limits for a proof queue.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Capacity limit: enqueueing is rejected once the queue already holds
    /// this many proofs.
    pub max_size: usize,
    /// Number of queued proofs at which the queue reports that aggregation
    /// should start.
    pub batch_threshold: usize,
    /// Maximum wait in milliseconds.
    ///
    /// NOTE(review): not read anywhere in this file; presumably consumed by
    /// a caller that schedules aggregation — confirm.
    pub max_wait_ms: u64,
}
impl Default for QueueConfig {
fn default() -> Self {
Self {
max_size: 1024,
batch_threshold: 32,
max_wait_ms: 5000,
}
}
}
/// A bounded queue of batched proofs awaiting aggregation.
///
/// The mutable state lives behind an `Arc<Mutex<_>>`, so every method takes
/// `&self` and locks internally.
pub struct ProofQueue<E>
where
    E: PairingEngine,
{
    /// Lock-protected queue contents and per-key index.
    inner: Arc<Mutex<ProofQueueInner<E>>>,
    /// Limits applied when enqueueing and when deciding whether to aggregate.
    config: QueueConfig,
}
/// Lock-protected state of a `ProofQueue`.
struct ProofQueueInner<E>
where
    E: PairingEngine,
{
    /// Pending proofs in arrival order (new entries are pushed at the back).
    queue: VecDeque<QueuedProof<E>>,
    /// For each verification key, the `queue` positions recorded when its
    /// proofs were enqueued.
    by_vk: std::collections::HashMap<VkId, Vec<usize>>,
}
/// A proof together with the bookkeeping captured when it was enqueued.
#[derive(Debug)]
struct QueuedProof<E>
where
    E: PairingEngine,
{
    /// The queued proof itself.
    proof: Proof<E, Batched>,
    /// Caller-supplied priority; stored but not read by the code in this file.
    #[allow(dead_code)]
    priority: u8,
    /// Instant at which the proof was enqueued; stored but not read by the
    /// code in this file.
    #[allow(dead_code)]
    enqueue_time: std::time::Instant,
}
impl<E: PairingEngine> ProofQueue<E> {
    /// Creates an empty queue configured with [`QueueConfig::default`].
    pub fn new() -> Self {
        Self::with_config(QueueConfig::default())
    }

    /// Creates an empty queue governed by `config`.
    pub fn with_config(config: QueueConfig) -> Self {
        Self {
            inner: Arc::new(Mutex::new(ProofQueueInner {
                queue: VecDeque::new(),
                by_vk: std::collections::HashMap::new(),
            })),
            config,
        }
    }

    /// Enqueues `proof` with priority `0`.
    ///
    /// See [`ProofQueue::enqueue_with_priority`] for errors and panics.
    pub fn enqueue(&self, proof: Proof<E, Batched>) -> Result<(), String> {
        self.enqueue_with_priority(proof, 0)
    }

    /// Appends `proof` to the back of the queue and records its position
    /// under the proof's verification key.
    ///
    /// The `priority` value is stored alongside the proof; nothing in this
    /// file currently reads it back.
    ///
    /// # Errors
    ///
    /// Returns `Err("Queue is full")` when the queue already holds
    /// `config.max_size` proofs.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn enqueue_with_priority(
        &self,
        proof: Proof<E, Batched>,
        priority: u8,
    ) -> Result<(), String> {
        let mut inner = self.inner.lock().unwrap();
        if inner.queue.len() >= self.config.max_size {
            return Err("Queue is full".to_string());
        }
        let vk_id = proof.vk_id();
        // The new element lands at the current length, i.e. the back.
        let idx = inner.queue.len();
        inner.queue.push_back(QueuedProof {
            proof,
            priority,
            enqueue_time: std::time::Instant::now(),
        });
        inner.by_vk.entry(vk_id).or_default().push(idx);
        Ok(())
    }

    /// Returns the number of proofs currently queued.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().queue.len()
    }

    /// Returns `true` when no proofs are queued.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` once at least `config.batch_threshold` proofs are
    /// queued.
    ///
    /// Only the size threshold is consulted; `config.max_wait_ms` is not
    /// checked here.
    pub fn should_aggregate(&self) -> bool {
        self.len() >= self.config.batch_threshold
    }

    /// Removes and returns every queued proof in arrival order, leaving the
    /// queue and its per-key index empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn drain(&self) -> Vec<Proof<E, Batched>> {
        let mut inner = self.inner.lock().unwrap();
        let proofs: Vec<_> = inner.queue.drain(..).map(|qp| qp.proof).collect();
        inner.by_vk.clear();
        proofs
    }

    /// Removes and returns the proofs queued under `vk_id`, most recently
    /// enqueued first. Proofs for other keys stay queued in their original
    /// relative order.
    ///
    /// Returns an empty vector when nothing is queued under `vk_id`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn drain_by_vk(&self, vk_id: VkId) -> Vec<Proof<E, Batched>> {
        let mut inner = self.inner.lock().unwrap();
        let indices = match inner.by_vk.remove(&vk_id) {
            Some(indices) => indices,
            None => return Vec::new(),
        };

        let mut result = Vec::with_capacity(indices.len());
        // Walk from the highest position down so that each removal leaves
        // the lower positions still to be visited untouched.
        for &idx in indices.iter().rev() {
            if let Some(qp) = inner.queue.remove(idx) {
                result.push(qp.proof);
            }
        }

        // Removing from the middle of the deque shifts every later element
        // towards the front. Re-base the positions recorded for the other
        // keys, otherwise a later `drain_by_vk` would miss its proofs or
        // pull out proofs that belong to a different key.
        // `indices` is ascending (positions are appended in increasing order
        // and this re-basing is monotonic), so a binary search counts how
        // many removed slots preceded each surviving position.
        for positions in inner.by_vk.values_mut() {
            for pos in positions.iter_mut() {
                let current = *pos;
                let shift = indices.partition_point(|&removed| removed < current);
                *pos = current - shift;
            }
        }

        result
    }

    /// Returns, for each verification key present in the index, how many
    /// proofs are recorded under it.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn vk_counts(&self) -> std::collections::HashMap<VkId, usize> {
        let inner = self.inner.lock().unwrap();
        inner.by_vk.iter().map(|(k, v)| (*k, v.len())).collect()
    }
}
impl<E: PairingEngine> Default for ProofQueue<E> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::bn254::Bn254;
    use crate::proof::{Proof, Verified};

    /// Builds a batched proof with empty payloads under the given key.
    fn mock_batched_proof(vk_id: VkId) -> Proof<Bn254, Batched> {
        let verified = Proof::<Bn254, Verified>::new_verified(vec![], vec![], vk_id);
        verified.submit()
    }

    /// Enqueues one mock proof under `vk_id`, panicking if the queue rejects it.
    fn push_mock(queue: &ProofQueue<Bn254>, vk_id: VkId) {
        queue.enqueue(mock_batched_proof(vk_id)).unwrap();
    }

    #[test]
    fn queue_starts_empty() {
        let fresh = ProofQueue::<Bn254>::new();
        assert!(fresh.is_empty());
        assert_eq!(fresh.len(), 0);
    }

    #[test]
    fn queue_enqueues_proofs() {
        let q = ProofQueue::<Bn254>::new();
        push_mock(&q, VkId::new(1));
        push_mock(&q, VkId::new(1));
        assert_eq!(q.len(), 2);
    }

    #[test]
    fn queue_tracks_by_vk() {
        let q = ProofQueue::<Bn254>::new();
        push_mock(&q, VkId::new(1));
        push_mock(&q, VkId::new(2));
        push_mock(&q, VkId::new(1));

        let per_key = q.vk_counts();
        assert_eq!(per_key.get(&VkId::new(1)), Some(&2));
        assert_eq!(per_key.get(&VkId::new(2)), Some(&1));
    }

    #[test]
    fn queue_drains_all() {
        let q = ProofQueue::<Bn254>::new();
        push_mock(&q, VkId::new(1));
        push_mock(&q, VkId::new(2));

        let drained = q.drain();
        assert_eq!(drained.len(), 2);
        assert!(q.is_empty());
    }

    #[test]
    fn queue_respects_max_size() {
        let q = ProofQueue::<Bn254>::with_config(QueueConfig {
            max_size: 2,
            ..Default::default()
        });
        push_mock(&q, VkId::new(1));
        push_mock(&q, VkId::new(1));

        // A third proof exceeds the capacity of two and must be rejected.
        let overflow = q.enqueue(mock_batched_proof(VkId::new(1)));
        assert!(overflow.is_err());
    }

    #[test]
    fn queue_batch_threshold() {
        let q = ProofQueue::<Bn254>::with_config(QueueConfig {
            batch_threshold: 3,
            ..Default::default()
        });
        push_mock(&q, VkId::new(1));
        push_mock(&q, VkId::new(1));
        assert!(!q.should_aggregate());

        push_mock(&q, VkId::new(1));
        assert!(q.should_aggregate());
    }
}