use tap_msg::didcomm::PlainMessage;
use tokio::sync::mpsc::{channel, Sender};
use tokio::time::Duration;
use tracing::error;
use crate::error::{Error, Result};
use crate::message::processor::PlainMessageProcessor;
use crate::message::{CompositePlainMessageProcessor, PlainMessageProcessorType};
/// Configuration for a [`ProcessorPool`].
///
/// The values are read once by `ProcessorPool::new` when it sets up its
/// channels and worker tasks.
#[derive(Debug, Clone)]
pub struct ProcessorPoolConfig {
/// Number of worker tasks spawned by the pool.
pub workers: usize,
/// Buffer capacity used for the submission channel and for each
/// per-worker channel.
pub channel_capacity: usize,
/// Maximum time a worker waits for a single message to finish processing
/// before logging a timeout and moving on to the next message.
pub worker_timeout: Duration,
}
impl Default for ProcessorPoolConfig {
fn default() -> Self {
Self {
workers: 4,
channel_capacity: 100,
worker_timeout: Duration::from_secs(30),
}
}
}
/// Handle to a pool of background tasks that process [`PlainMessage`]s.
///
/// Messages passed to `submit` are sent over `tx` to a dispatcher task, which
/// forwards them to worker tasks in round-robin order.
#[derive(Clone)]
pub struct ProcessorPool {
// Composite processor built in `new`; `add_processor` appends to it.
// NOTE(review): the workers run on clones of this value taken in `new` —
// confirm whether clones of `CompositePlainMessageProcessor` share state.
processor: CompositePlainMessageProcessor,
// Sending half of the submission channel consumed by the dispatcher task.
tx: Sender<PlainMessage>,
}
impl ProcessorPool {
pub fn new(config: ProcessorPoolConfig) -> Self {
let (tx, mut rx) = channel::<PlainMessage>(config.channel_capacity);
let processors: Vec<PlainMessageProcessorType> = Vec::new();
let processor = CompositePlainMessageProcessor::new(processors);
let processor_for_workers = processor.clone();
tokio::spawn(async move {
let mut worker_channels = Vec::with_capacity(config.workers);
for _ in 0..config.workers {
let (worker_tx, mut worker_rx) = channel::<PlainMessage>(config.channel_capacity);
worker_channels.push(worker_tx);
let worker_processor = processor_for_workers.clone();
let worker_timeout = config.worker_timeout;
tokio::spawn(async move {
while let Some(message) = worker_rx.recv().await {
match tokio::time::timeout(
worker_timeout,
worker_processor.process_incoming(message),
)
.await
{
Ok(result) => {
if let Err(e) = result {
error!("Error processing message: {}", e);
}
}
Err(_) => {
error!(
"PlainMessage processing timed out after {:?}",
worker_timeout
);
}
}
}
});
}
let mut current_worker = 0;
while let Some(message) = rx.recv().await {
if worker_channels.is_empty() {
break;
}
let mut attempts = 0;
while attempts < worker_channels.len() {
match worker_channels[current_worker].send(message.clone()).await {
Ok(_) => break,
Err(_) => {
current_worker = (current_worker + 1) % worker_channels.len();
attempts += 1;
}
}
}
current_worker = (current_worker + 1) % worker_channels.len();
}
});
Self { processor, tx }
}
pub async fn submit(&self, message: PlainMessage) -> Result<()> {
self.tx.send(message).await.map_err(|e| {
Error::Processing(format!("Failed to submit message to processor pool: {}", e))
})
}
pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
self.processor.add_processor(processor);
}
}