kiteticker_async_manager/manager/
message_processor.rs1use crate::models::TickerMessage;
2use crate::manager::ChannelId;
3use std::sync::Arc;
4use std::time::Instant;
5use tokio::sync::{mpsc, broadcast, RwLock};
6use tokio::task::JoinHandle;
7
8#[derive(Debug)]
10pub struct MessageProcessor {
11 pub channel_id: ChannelId,
12 pub input_receiver: Option<mpsc::UnboundedReceiver<TickerMessage>>,
13 pub output_sender: broadcast::Sender<TickerMessage>,
14 pub stats: Arc<RwLock<ProcessorStats>>,
15 pub task_handle: Option<JoinHandle<()>>,
16}
17
/// Snapshot of the counters maintained by a message processor.
///
/// `Default` yields an all-zero snapshot with no last-processed timestamp.
#[derive(Debug, Clone, Default)]
pub struct ProcessorStats {
    /// Total number of messages accounted for so far.
    pub messages_processed: u64,
    /// Throughput measured over the most recent statistics interval.
    pub messages_per_second: f64,
    /// Smoothed per-message processing latency.
    pub processing_latency_avg: std::time::Duration,
    /// Instant recorded at the most recent statistics update, if any.
    pub last_processed_time: Option<Instant>,
    /// Inbound queue length sampled at the most recent statistics update.
    pub queue_size: usize,
    /// Number of messages whose publication failed.
    pub errors_count: u64,
}
27
28impl MessageProcessor {
29 pub fn new(
31 channel_id: ChannelId,
32 input_receiver: mpsc::UnboundedReceiver<TickerMessage>,
33 buffer_size: usize,
34 ) -> (Self, broadcast::Receiver<TickerMessage>) {
35 let (output_sender, output_receiver) = broadcast::channel(buffer_size);
36 let stats = Arc::new(RwLock::new(ProcessorStats::default()));
37
38 let processor = Self {
39 channel_id,
40 input_receiver: Some(input_receiver),
41 output_sender,
42 stats,
43 task_handle: None,
44 };
45
46 (processor, output_receiver)
47 }
48
49 pub fn start(&mut self) {
51 if self.task_handle.is_some() {
53 log::warn!("Message processor for channel {:?} already started", self.channel_id);
54 return;
55 }
56
57 let input_receiver = self.input_receiver.take()
59 .expect("Receiver already taken - processor can only be started once");
60
61 let channel_id = self.channel_id;
62 let output_sender = self.output_sender.clone();
63 let stats = Arc::clone(&self.stats);
64
65 let handle = tokio::spawn(async move {
66 Self::processing_loop(
67 channel_id,
68 input_receiver,
69 output_sender,
70 stats,
71 ).await;
72 });
73
74 self.task_handle = Some(handle);
75 log::info!("Started message processor task for channel {:?}", channel_id);
76 }
77
78 async fn processing_loop(
80 channel_id: ChannelId,
81 mut input_receiver: mpsc::UnboundedReceiver<TickerMessage>,
82 output_sender: broadcast::Sender<TickerMessage>,
83 stats: Arc<RwLock<ProcessorStats>>,
84 ) {
85 let mut last_stats_update = Instant::now();
86 let mut messages_since_last_update = 0u64;
87
88 log::info!("Started message processor for channel {:?}", channel_id);
89
90 while let Some(message) = input_receiver.recv().await {
91 let processing_start = Instant::now();
92
93 let processed_message = Self::process_message(message, channel_id);
95
96 match output_sender.send(processed_message) {
98 Ok(receiver_count) => {
99 if receiver_count == 0 {
101 log::debug!("Channel {:?}: No active receivers", channel_id);
102 }
103 }
104 Err(_) => {
105 log::warn!("Channel {:?}: Failed to send message", channel_id);
107
108 let mut stats = stats.write().await;
109 stats.errors_count += 1;
110 continue;
111 }
112 }
113
114 let processing_time = processing_start.elapsed();
115 messages_since_last_update += 1;
116
117 if last_stats_update.elapsed() >= std::time::Duration::from_secs(1) {
119 let mut stats_guard = stats.write().await;
120 stats_guard.messages_processed += messages_since_last_update;
121 stats_guard.last_processed_time = Some(Instant::now());
122 stats_guard.queue_size = input_receiver.len();
123
124 let elapsed = last_stats_update.elapsed();
126 stats_guard.messages_per_second =
127 messages_since_last_update as f64 / elapsed.as_secs_f64();
128
129 let current_avg = stats_guard.processing_latency_avg;
131 stats_guard.processing_latency_avg = if current_avg.is_zero() {
132 processing_time
133 } else {
134 Duration::from_nanos(
136 (current_avg.as_nanos() as f64 * 0.9 +
137 processing_time.as_nanos() as f64 * 0.1) as u64
138 )
139 };
140
141 drop(stats_guard);
142
143 last_stats_update = Instant::now();
145 messages_since_last_update = 0;
146 }
147 }
148
149 log::info!("Message processor for channel {:?} stopped", channel_id);
150 }
151
152 fn process_message(message: TickerMessage, channel_id: ChannelId) -> TickerMessage {
154 match &message {
162 TickerMessage::Ticks(ticks) => {
163 log::debug!("Channel {:?}: Processed {} ticks", channel_id, ticks.len());
164 }
165 TickerMessage::Error(error) => {
166 log::warn!("Channel {:?}: Error message: {}", channel_id, error);
167 }
168 _ => {
169 log::debug!("Channel {:?}: Processed message: {:?}", channel_id, message);
170 }
171 }
172
173 message
174 }
175
176 pub async fn get_stats(&self) -> ProcessorStats {
178 self.stats.read().await.clone()
179 }
180
181 pub fn queue_size(&self) -> usize {
183 0
186 }
187
188 pub async fn stop(&mut self) {
190 if let Some(handle) = self.task_handle.take() {
191 handle.abort();
192 let _ = handle.await;
193 }
194 }
195}
196
197use std::time::Duration;