Skip to main content

tap_node/message/
processor_pool.rs

1//! Processor pool for concurrent message processing.
2//!
3//! This module provides a processor pool for handling concurrent message processing.
4
5use tap_msg::didcomm::PlainMessage;
6use tokio::sync::mpsc::{channel, Sender};
7use tokio::time::Duration;
8use tracing::error;
9
10use crate::error::{Error, Result};
11use crate::message::processor::PlainMessageProcessor;
12use crate::message::{CompositePlainMessageProcessor, PlainMessageProcessorType};
13
/// Tuning parameters for a [`ProcessorPool`].
///
/// The pool constructor reads all three fields: `workers` decides how many
/// worker tasks are spawned, `channel_capacity` sizes the channels, and
/// `worker_timeout` bounds each individual processing call.
#[derive(Clone, Debug)]
pub struct ProcessorPoolConfig {
    /// How many worker tasks the pool spawns.
    pub workers: usize,
    /// Buffer size used for the submission channel and for every per-worker channel.
    pub channel_capacity: usize,
    /// Upper bound on how long a worker may spend processing one message
    /// before the attempt is abandoned and logged as timed out.
    pub worker_timeout: Duration,
}
24
25impl Default for ProcessorPoolConfig {
26    fn default() -> Self {
27        Self {
28            workers: 4,
29            channel_capacity: 100,
30            worker_timeout: Duration::from_secs(30),
31        }
32    }
33}
34
35/// Processor pool for concurrent message processing
36#[derive(Clone)]
37pub struct ProcessorPool {
38    /// The message processor to use
39    processor: CompositePlainMessageProcessor,
40    /// Channel for submitting messages for processing
41    tx: Sender<PlainMessage>,
42}
43
44impl ProcessorPool {
45    /// Create a new processor pool
46    pub fn new(config: ProcessorPoolConfig) -> Self {
47        let (tx, mut rx) = channel::<PlainMessage>(config.channel_capacity);
48        let processors: Vec<PlainMessageProcessorType> = Vec::new();
49        let processor = CompositePlainMessageProcessor::new(processors);
50        let processor_for_workers = processor.clone();
51
52        // Spawn a single task to distribute messages to workers
53        tokio::spawn(async move {
54            // Create worker channels
55            let mut worker_channels = Vec::with_capacity(config.workers);
56            for _ in 0..config.workers {
57                let (worker_tx, mut worker_rx) = channel::<PlainMessage>(config.channel_capacity);
58                worker_channels.push(worker_tx);
59
60                let worker_processor = processor_for_workers.clone();
61                let worker_timeout = config.worker_timeout;
62
63                // Spawn a worker to process messages from its channel
64                tokio::spawn(async move {
65                    while let Some(message) = worker_rx.recv().await {
66                        match tokio::time::timeout(
67                            worker_timeout,
68                            worker_processor.process_incoming(message),
69                        )
70                        .await
71                        {
72                            Ok(result) => {
73                                if let Err(e) = result {
74                                    error!("Error processing message: {}", e);
75                                }
76                            }
77                            Err(_) => {
78                                error!(
79                                    "PlainMessage processing timed out after {:?}",
80                                    worker_timeout
81                                );
82                            }
83                        }
84                    }
85                });
86            }
87
88            // Round-robin distribute messages to workers
89            let mut current_worker = 0;
90            while let Some(message) = rx.recv().await {
91                if worker_channels.is_empty() {
92                    break;
93                }
94
95                // Try to send to the current worker, or move to the next one if fails
96                let mut attempts = 0;
97                while attempts < worker_channels.len() {
98                    match worker_channels[current_worker].send(message.clone()).await {
99                        Ok(_) => break,
100                        Err(_) => {
101                            current_worker = (current_worker + 1) % worker_channels.len();
102                            attempts += 1;
103                        }
104                    }
105                }
106
107                // Advance to next worker
108                current_worker = (current_worker + 1) % worker_channels.len();
109            }
110        });
111
112        Self { processor, tx }
113    }
114
115    /// Submit a message for processing
116    pub async fn submit(&self, message: PlainMessage) -> Result<()> {
117        self.tx.send(message).await.map_err(|e| {
118            Error::Processing(format!("Failed to submit message to processor pool: {}", e))
119        })
120    }
121
122    /// Add a processor to the pool
123    pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
124        self.processor.add_processor(processor);
125    }
126}