use crate::agents::{AgentPool, VerificationTask};
use crate::crypto::Ed25519Signature;
use crate::error::{Error, Result};
use crate::workflows::{WorkflowContext, WorkflowResult};
use ed25519_dalek::VerifyingKey;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use std::time::Instant;
use tokio::time::{timeout, Duration};
use tracing::{debug, info};
/// A batch of Ed25519 verification items stored as three parallel vectors.
///
/// Entry `i` of `messages`, `signatures` and `public_keys` together describes
/// one item to verify. [`BatchVerificationRequest::add`] always pushes to all
/// three vectors, keeping them the same length; the fields are public, so
/// direct mutation can break that invariant.
#[derive(Debug, Clone)]
pub struct BatchVerificationRequest {
/// Raw message bytes, one entry per item.
pub messages: Vec<Vec<u8>>,
/// Signature to check for the message at the same index.
pub signatures: Vec<Ed25519Signature>,
/// Verifying key to check the signature at the same index against.
pub public_keys: Vec<VerifyingKey>,
}
impl BatchVerificationRequest {
    /// Creates a request that contains no items.
    pub fn new() -> Self {
        Self {
            messages: vec![],
            signatures: vec![],
            public_keys: vec![],
        }
    }

    /// Appends one item to the batch.
    ///
    /// The message, its signature and the verifying key are pushed onto their
    /// respective vectors, so all three end up stored at the same index.
    pub fn add(
        &mut self,
        message: Vec<u8>,
        signature: Ed25519Signature,
        public_key: VerifyingKey,
    ) {
        let Self {
            messages,
            signatures,
            public_keys,
        } = self;
        messages.push(message);
        signatures.push(signature);
        public_keys.push(public_key);
    }

    /// Returns the number of items, measured as the number of stored messages.
    pub fn len(&self) -> usize {
        let Self { messages, .. } = self;
        messages.len()
    }

    /// Returns `true` when no message has been stored.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl Default for BatchVerificationRequest {
fn default() -> Self {
Self::new()
}
}
/// Summary produced by [`AutonomousBatchWorkflow::execute`] for one batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchVerificationResult {
/// Number of items in the submitted request.
pub total: usize,
/// Number of items whose verification reported a valid signature.
pub successful: usize,
/// Number of items reported as failed; always equals `failed_indices.len()`.
pub failed: usize,
/// Zero-based positions, relative to the submitted request, of the items
/// counted in `failed`.
pub failed_indices: Vec<usize>,
/// Items per second: `total` divided by the wall-clock execution time.
pub throughput: f64,
}
/// Workflow that splits a [`BatchVerificationRequest`] into chunks and has
/// agents from an [`AgentPool`] verify every item.
pub struct AutonomousBatchWorkflow {
/// Pool the verifying agents are taken from; cloned once per chunk.
pool: AgentPool,
/// Timeout, in milliseconds, applied to each individual verification call.
timeout_ms: u64,
/// Maximum number of items placed in one chunk.
chunk_size: usize,
}
impl AutonomousBatchWorkflow {
pub fn new(pool: AgentPool, timeout_ms: u64, chunk_size: usize) -> Self {
Self {
pool,
timeout_ms,
chunk_size,
}
}
pub async fn execute(
&self,
request: BatchVerificationRequest,
context: WorkflowContext,
) -> Result<WorkflowResult<BatchVerificationResult>> {
let start = Instant::now();
if request.is_empty() {
return Ok(WorkflowResult::success(
context,
BatchVerificationResult {
total: 0,
successful: 0,
failed: 0,
failed_indices: Vec::new(),
throughput: 0.0,
},
0,
));
}
info!(
"Starting batch verification workflow {} with {} items",
context.id,
request.len()
);
if request.messages.len() != request.signatures.len()
|| request.messages.len() != request.public_keys.len()
{
return Err(Error::InvalidInput(
"Batch verification: mismatched array lengths".to_string(),
));
}
let total_items = request.len();
let mut tasks = Vec::new();
for i in (0..total_items).step_by(self.chunk_size) {
let end = (i + self.chunk_size).min(total_items);
let chunk_messages: Vec<_> = request.messages[i..end].to_vec();
let chunk_signatures: Vec<_> = request.signatures[i..end].to_vec();
let chunk_keys: Vec<_> = request.public_keys[i..end].to_vec();
tasks.push((i, chunk_messages, chunk_signatures, chunk_keys));
}
debug!("Split into {} chunks of size {}", tasks.len(), self.chunk_size);
let futures = tasks.into_iter().map(|(offset, messages, sigs, keys)| {
let pool = self.pool.clone();
let timeout_duration = Duration::from_millis(self.timeout_ms);
async move {
Self::process_chunk(pool, messages, sigs, keys, offset, timeout_duration).await
}
});
let chunk_results: Vec<Vec<(usize, bool)>> = join_all(futures)
.await
.into_iter()
.filter_map(Result::ok)
.collect();
let mut failed_indices = Vec::new();
let mut successful = 0;
for chunk_result in chunk_results {
for (idx, is_valid) in chunk_result {
if is_valid {
successful += 1;
} else {
failed_indices.push(idx);
}
}
}
let failed = failed_indices.len();
let execution_time = start.elapsed();
let throughput = total_items as f64 / execution_time.as_secs_f64();
info!(
"Batch verification completed: {}/{} successful ({:.1} verifications/sec)",
successful, total_items, throughput
);
let result = BatchVerificationResult {
total: total_items,
successful,
failed,
failed_indices,
throughput,
};
Ok(WorkflowResult::success(
context,
result,
execution_time.as_millis() as u64,
))
}
async fn process_chunk(
pool: AgentPool,
messages: Vec<Vec<u8>>,
signatures: Vec<Ed25519Signature>,
public_keys: Vec<VerifyingKey>,
offset: usize,
timeout_duration: Duration,
) -> Result<Vec<(usize, bool)>> {
let agent_id = pool.get_healthy_agent()?;
let mut agent = pool.agents.get_mut(&agent_id).ok_or(Error::PoolExhausted)?;
let mut results = Vec::new();
for (i, ((message, signature), key)) in messages
.into_iter()
.zip(signatures)
.zip(public_keys)
.enumerate()
{
let task = VerificationTask::new(message, signature, key);
let result = match timeout(timeout_duration, agent.verify(task)).await {
Ok(r) => r.is_valid,
Err(_) => false,
};
results.push((offset + i, result));
}
Ok(results)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::AgentIdentity;

    /// Signs twenty distinct messages with freshly generated identities and
    /// expects the workflow to report every one of them as verified.
    #[tokio::test]
    async fn test_batch_verification() {
        const ITEM_COUNT: usize = 20;

        let workflow = AutonomousBatchWorkflow::new(AgentPool::new(5), 1000, 10);

        let mut request = BatchVerificationRequest::new();
        for n in 0..ITEM_COUNT {
            let signer = AgentIdentity::generate().unwrap();
            let payload = format!("message {}", n).into_bytes();
            let signature = signer.sign(&payload).unwrap();
            request.add(payload, signature, signer.verifying_key());
        }

        let outcome = workflow
            .execute(request, WorkflowContext::default())
            .await
            .unwrap();

        assert!(outcome.success);
        let summary = &outcome.data;
        assert_eq!(summary.total, ITEM_COUNT);
        assert_eq!(summary.successful, ITEM_COUNT);
        assert!(summary.throughput > 0.0);
    }
}