kiteticker_async_manager/manager/
message_processor.rs1use crate::manager::ChannelId;
2use crate::models::TickerMessage;
3use std::sync::Arc;
4use std::time::Instant;
5use tokio::sync::{broadcast, mpsc, RwLock};
6use tokio::task::JoinHandle;
7
8#[derive(Debug)]
10pub struct MessageProcessor {
11 pub channel_id: ChannelId,
12 pub input_receiver: Option<mpsc::UnboundedReceiver<TickerMessage>>,
13 pub output_sender: broadcast::Sender<TickerMessage>,
14 pub stats: Arc<RwLock<ProcessorStats>>,
15 pub task_handle: Option<JoinHandle<()>>,
16}
17
18#[derive(Debug, Clone, Default)]
19pub struct ProcessorStats {
20 pub messages_processed: u64,
21 pub messages_per_second: f64,
22 pub processing_latency_avg: std::time::Duration,
23 pub last_processed_time: Option<Instant>,
24 pub queue_size: usize,
25 pub errors_count: u64,
26}
27
28impl MessageProcessor {
29 pub fn new(
31 channel_id: ChannelId,
32 input_receiver: mpsc::UnboundedReceiver<TickerMessage>,
33 buffer_size: usize,
34 ) -> (Self, broadcast::Receiver<TickerMessage>) {
35 let (output_sender, output_receiver) = broadcast::channel(buffer_size);
36 let stats = Arc::new(RwLock::new(ProcessorStats::default()));
37
38 let processor = Self {
39 channel_id,
40 input_receiver: Some(input_receiver),
41 output_sender,
42 stats,
43 task_handle: None,
44 };
45
46 (processor, output_receiver)
47 }
48
49 pub fn start(&mut self) {
51 if self.task_handle.is_some() {
53 log::warn!(
54 "Message processor for channel {:?} already started",
55 self.channel_id
56 );
57 return;
58 }
59
60 let input_receiver = self
62 .input_receiver
63 .take()
64 .expect("Receiver already taken - processor can only be started once");
65
66 let channel_id = self.channel_id;
67 let output_sender = self.output_sender.clone();
68 let stats = Arc::clone(&self.stats);
69
70 let handle = tokio::spawn(async move {
71 Self::processing_loop(channel_id, input_receiver, output_sender, stats)
72 .await;
73 });
74
75 self.task_handle = Some(handle);
76 log::info!(
77 "Started message processor task for channel {:?}",
78 channel_id
79 );
80 }
81
82 async fn processing_loop(
84 channel_id: ChannelId,
85 mut input_receiver: mpsc::UnboundedReceiver<TickerMessage>,
86 output_sender: broadcast::Sender<TickerMessage>,
87 stats: Arc<RwLock<ProcessorStats>>,
88 ) {
89 let mut last_stats_update = Instant::now();
90 let mut messages_since_last_update = 0u64;
91
92 log::info!("Started message processor for channel {:?}", channel_id);
93
94 while let Some(message) = input_receiver.recv().await {
95 let processing_start = Instant::now();
96
97 let processed_message = Self::process_message(message, channel_id);
99
100 match output_sender.send(processed_message) {
102 Ok(receiver_count) => {
103 if receiver_count == 0 {
105 log::debug!("Channel {:?}: No active receivers", channel_id);
106 }
107 }
108 Err(_) => {
109 log::warn!("Channel {:?}: Failed to send message", channel_id);
111
112 let mut stats = stats.write().await;
113 stats.errors_count += 1;
114 continue;
115 }
116 }
117
118 let processing_time = processing_start.elapsed();
119 messages_since_last_update += 1;
120
121 if last_stats_update.elapsed() >= std::time::Duration::from_secs(1) {
123 let mut stats_guard = stats.write().await;
124 stats_guard.messages_processed += messages_since_last_update;
125 stats_guard.last_processed_time = Some(Instant::now());
126 stats_guard.queue_size = input_receiver.len();
127
128 let elapsed = last_stats_update.elapsed();
130 stats_guard.messages_per_second =
131 messages_since_last_update as f64 / elapsed.as_secs_f64();
132
133 let current_avg = stats_guard.processing_latency_avg;
135 stats_guard.processing_latency_avg = if current_avg.is_zero() {
136 processing_time
137 } else {
138 Duration::from_nanos(
140 (current_avg.as_nanos() as f64 * 0.9
141 + processing_time.as_nanos() as f64 * 0.1) as u64,
142 )
143 };
144
145 drop(stats_guard);
146
147 last_stats_update = Instant::now();
149 messages_since_last_update = 0;
150 }
151 }
152
153 log::info!("Message processor for channel {:?} stopped", channel_id);
154 }
155
156 fn process_message(
158 message: TickerMessage,
159 channel_id: ChannelId,
160 ) -> TickerMessage {
161 match &message {
169 TickerMessage::Ticks(ticks) => {
170 log::debug!(
171 "Channel {:?}: Processed {} ticks",
172 channel_id,
173 ticks.len()
174 );
175 }
176 TickerMessage::Error(error) => {
177 log::warn!("Channel {:?}: Error message: {}", channel_id, error);
178 }
179 _ => {
180 log::debug!(
181 "Channel {:?}: Processed message: {:?}",
182 channel_id,
183 message
184 );
185 }
186 }
187
188 message
189 }
190
191 pub async fn get_stats(&self) -> ProcessorStats {
193 self.stats.read().await.clone()
194 }
195
196 pub fn queue_size(&self) -> usize {
198 0
201 }
202
203 pub async fn stop(&mut self) {
205 if let Some(handle) = self.task_handle.take() {
206 handle.abort();
207 let _ = handle.await;
208 }
209 }
210}
211
212use std::time::Duration;