// tap_node/message/processor_pool.rs
use tap_msg::didcomm::PlainMessage;
6use tokio::sync::mpsc::{channel, Sender};
7use tokio::time::Duration;
8
9use crate::error::{Error, Result};
10use crate::message::processor::PlainMessageProcessor;
11use crate::message::{CompositePlainMessageProcessor, PlainMessageProcessorType};
12
/// Tuning parameters for a `ProcessorPool`.
#[derive(Clone, Debug)]
pub struct ProcessorPoolConfig {
    /// Number of worker tasks the pool spawns.
    pub workers: usize,

    /// Capacity of the pool's bounded intake channel and of each worker's queue.
    pub channel_capacity: usize,

    /// Upper bound on how long a worker spends processing a single message.
    pub worker_timeout: Duration,
}
23
24impl Default for ProcessorPoolConfig {
25 fn default() -> Self {
26 Self {
27 workers: 4,
28 channel_capacity: 100,
29 worker_timeout: Duration::from_secs(30),
30 }
31 }
32}
33
34#[derive(Clone)]
36pub struct ProcessorPool {
37 processor: CompositePlainMessageProcessor,
39 tx: Sender<PlainMessage>,
41}
42
43impl ProcessorPool {
44 pub fn new(config: ProcessorPoolConfig) -> Self {
46 let (tx, mut rx) = channel::<PlainMessage>(config.channel_capacity);
47 let processors: Vec<PlainMessageProcessorType> = Vec::new();
48 let processor = CompositePlainMessageProcessor::new(processors);
49 let processor_for_workers = processor.clone();
50
51 tokio::spawn(async move {
53 let mut worker_channels = Vec::with_capacity(config.workers);
55 for _ in 0..config.workers {
56 let (worker_tx, mut worker_rx) = channel::<PlainMessage>(config.channel_capacity);
57 worker_channels.push(worker_tx);
58
59 let worker_processor = processor_for_workers.clone();
60 let worker_timeout = config.worker_timeout;
61
62 tokio::spawn(async move {
64 while let Some(message) = worker_rx.recv().await {
65 match tokio::time::timeout(
66 worker_timeout,
67 worker_processor.process_incoming(message),
68 )
69 .await
70 {
71 Ok(result) => {
72 if let Err(e) = result {
73 eprintln!("Error processing message: {}", e);
74 }
75 }
76 Err(_) => {
77 eprintln!(
78 "PlainMessage processing timed out after {:?}",
79 worker_timeout
80 );
81 }
82 }
83 }
84 });
85 }
86
87 let mut current_worker = 0;
89 while let Some(message) = rx.recv().await {
90 if worker_channels.is_empty() {
91 break;
92 }
93
94 let mut attempts = 0;
96 while attempts < worker_channels.len() {
97 match worker_channels[current_worker].send(message.clone()).await {
98 Ok(_) => break,
99 Err(_) => {
100 current_worker = (current_worker + 1) % worker_channels.len();
101 attempts += 1;
102 }
103 }
104 }
105
106 current_worker = (current_worker + 1) % worker_channels.len();
108 }
109 });
110
111 Self { processor, tx }
112 }
113
114 pub async fn submit(&self, message: PlainMessage) -> Result<()> {
116 self.tx.send(message).await.map_err(|e| {
117 Error::Processing(format!("Failed to submit message to processor pool: {}", e))
118 })
119 }
120
121 pub fn add_processor(&mut self, processor: PlainMessageProcessorType) {
123 self.processor.add_processor(processor);
124 }
125}