tap_node/message/
processor_pool.rs

1//! Processor pool for concurrent message processing.
2//!
//! This module provides [`ProcessorPool`], which hands submitted messages to a fixed
//! set of worker tasks over bounded channels, and [`ProcessorPoolConfig`] for sizing it.
4
5use tap_msg::didcomm::Message;
6use tokio::sync::mpsc::{channel, Sender};
7use tokio::time::Duration;
8
9use crate::error::{Error, Result};
10use crate::message::processor::MessageProcessor;
11use crate::message::{CompositeMessageProcessor, MessageProcessorType};
12
/// Configuration for the processor pool
///
/// Controls how many worker tasks the pool runs, how many messages its
/// channels may buffer, and how long a single message may take to process.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessorPoolConfig {
    /// The number of worker tasks to create
    pub workers: usize,
    /// The capacity of the pool's submission channel; the same value is used
    /// for each worker's own channel
    pub channel_capacity: usize,
    /// The maximum duration to wait for a worker to process a single message
    pub worker_timeout: Duration,
}
23
24impl Default for ProcessorPoolConfig {
25    fn default() -> Self {
26        Self {
27            workers: 4,
28            channel_capacity: 100,
29            worker_timeout: Duration::from_secs(30),
30        }
31    }
32}
33
34/// Processor pool for concurrent message processing
35#[derive(Clone)]
36pub struct ProcessorPool {
37    /// The message processor to use
38    processor: CompositeMessageProcessor,
39    /// Channel for submitting messages for processing
40    tx: Sender<Message>,
41}
42
43impl ProcessorPool {
44    /// Create a new processor pool
45    pub fn new(config: ProcessorPoolConfig) -> Self {
46        let (tx, mut rx) = channel::<Message>(config.channel_capacity);
47        let processors: Vec<MessageProcessorType> = Vec::new();
48        let processor = CompositeMessageProcessor::new(processors);
49        let processor_for_workers = processor.clone();
50
51        // Spawn a single task to distribute messages to workers
52        tokio::spawn(async move {
53            // Create worker channels
54            let mut worker_channels = Vec::with_capacity(config.workers);
55            for _ in 0..config.workers {
56                let (worker_tx, mut worker_rx) = channel::<Message>(config.channel_capacity);
57                worker_channels.push(worker_tx);
58
59                let worker_processor = processor_for_workers.clone();
60                let worker_timeout = config.worker_timeout;
61
62                // Spawn a worker to process messages from its channel
63                tokio::spawn(async move {
64                    while let Some(message) = worker_rx.recv().await {
65                        match tokio::time::timeout(
66                            worker_timeout,
67                            worker_processor.process_incoming(message),
68                        )
69                        .await
70                        {
71                            Ok(result) => {
72                                if let Err(e) = result {
73                                    eprintln!("Error processing message: {}", e);
74                                }
75                            }
76                            Err(_) => {
77                                eprintln!(
78                                    "Message processing timed out after {:?}",
79                                    worker_timeout
80                                );
81                            }
82                        }
83                    }
84                });
85            }
86
87            // Round-robin distribute messages to workers
88            let mut current_worker = 0;
89            while let Some(message) = rx.recv().await {
90                if worker_channels.is_empty() {
91                    break;
92                }
93
94                // Try to send to the current worker, or move to the next one if fails
95                let mut attempts = 0;
96                while attempts < worker_channels.len() {
97                    match worker_channels[current_worker].send(message.clone()).await {
98                        Ok(_) => break,
99                        Err(_) => {
100                            current_worker = (current_worker + 1) % worker_channels.len();
101                            attempts += 1;
102                        }
103                    }
104                }
105
106                // Advance to next worker
107                current_worker = (current_worker + 1) % worker_channels.len();
108            }
109        });
110
111        Self { processor, tx }
112    }
113
114    /// Submit a message for processing
115    pub async fn submit(&self, message: Message) -> Result<()> {
116        self.tx.send(message).await.map_err(|e| {
117            Error::Processing(format!("Failed to submit message to processor pool: {}", e))
118        })
119    }
120
121    /// Add a processor to the pool
122    pub fn add_processor(&mut self, processor: MessageProcessorType) {
123        self.processor.add_processor(processor);
124    }
125}