tap_node/message/
processor_pool.rs1use tap_msg::didcomm::PlainMessage;
6use tokio::sync::mpsc::{channel, Sender};
7use tokio::time::Duration;
8use tracing::error;
9
10use crate::error::{Error, Result};
11use crate::message::processor::PlainMessageProcessor;
12use crate::message::{CompositePlainMessageProcessor, PlainMessageProcessorType};
13
/// Tuning knobs for a [`ProcessorPool`].
#[derive(Clone, Debug)]
pub struct ProcessorPoolConfig {
    /// Number of worker tasks the pool spawns to process messages.
    pub workers: usize,
    /// Buffer size of the pool's intake channel; the same size is used for
    /// every per-worker channel.
    pub channel_capacity: usize,
    /// Upper bound on the time a worker spends processing one message before
    /// the attempt is abandoned and logged as a timeout.
    pub worker_timeout: Duration,
}
24
25impl Default for ProcessorPoolConfig {
26 fn default() -> Self {
27 Self {
28 workers: 4,
29 channel_capacity: 100,
30 worker_timeout: Duration::from_secs(30),
31 }
32 }
33}
34
35#[derive(Clone)]
37pub struct ProcessorPool {
38 processor: CompositePlainMessageProcessor,
40 tx: Sender<PlainMessage>,
42}
43
44impl ProcessorPool {
45 pub fn new(config: ProcessorPoolConfig) -> Self {
47 let (tx, mut rx) = channel::<PlainMessage>(config.channel_capacity);
48 let processors: Vec<PlainMessageProcessorType> = Vec::new();
49 let processor = CompositePlainMessageProcessor::new(processors);
50 let processor_for_workers = processor.clone();
51
52 tokio::spawn(async move {
54 let mut worker_channels = Vec::with_capacity(config.workers);
56 for _ in 0..config.workers {
57 let (worker_tx, mut worker_rx) = channel::<PlainMessage>(config.channel_capacity);
58 worker_channels.push(worker_tx);
59
60 let worker_processor = processor_for_workers.clone();
61 let worker_timeout = config.worker_timeout;
62
63 tokio::spawn(async move {
65 while let Some(message) = worker_rx.recv().await {
66 match tokio::time::timeout(
67 worker_timeout,
68 worker_processor.process_incoming(message),
69 )
70 .await
71 {
72 Ok(result) => {
73 if let Err(e) = result {
74 error!("Error processing message: {}", e);
75 }
76 }
77 Err(_) => {
78 error!(
79 "PlainMessage processing timed out after {:?}",
80 worker_timeout
81 );
82 }
83 }
84 }
85 });
86 }
87
88 let mut current_worker = 0;
90 while let Some(message) = rx.recv().await {
91 if worker_channels.is_empty() {
92 break;
93 }
94
95 let mut attempts = 0;
97 while attempts < worker_channels.len() {
98 match worker_channels[current_worker].send(message.clone()).await {
99 Ok(_) => break,
100 Err(_) => {
101 current_worker = (current_worker + 1) % worker_channels.len();
102 attempts += 1;
103 }
104 }
105 }
106
107 current_worker = (current_worker + 1) % worker_channels.len();
109 }
110 });
111
112 Self { processor, tx }
113 }
114
115 pub async fn submit(&self, message: PlainMessage) -> Result<()> {
117 self.tx.send(message).await.map_err(|e| {
118 Error::Processing(format!("Failed to submit message to processor pool: {}", e))
119 })
120 }
121
122 pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
124 self.processor.add_processor(processor);
125 }
126}