kiteticker_async_manager/manager/message_processor.rs

use crate::manager::ChannelId;
use crate::models::TickerMessage;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::task::JoinHandle;

/// High-performance message processor with dedicated parsing task
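///
/// A minimal usage sketch (a `ChannelId` value is assumed to come from the
/// surrounding manager; the example is marked `ignore` and not compiled):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// let (mut processor, mut out_rx) = MessageProcessor::new(channel_id, rx, 1024);
/// processor.start();
/// // tx.send(message) feeds raw TickerMessages in; out_rx.recv().await
/// // yields the processed messages fanned out on the broadcast channel.
/// processor.stop().await;
/// ```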
#[derive(Debug)]
pub struct MessageProcessor {
  pub channel_id: ChannelId,
  pub input_receiver: Option<mpsc::UnboundedReceiver<TickerMessage>>,
  pub output_sender: broadcast::Sender<TickerMessage>,
  pub stats: Arc<RwLock<ProcessorStats>>,
  pub task_handle: Option<JoinHandle<()>>,
}

#[derive(Debug, Clone, Default)]
pub struct ProcessorStats {
  pub messages_processed: u64,
  pub messages_per_second: f64,
  pub processing_latency_avg: std::time::Duration,
  pub last_processed_time: Option<Instant>,
  pub queue_size: usize,
  pub errors_count: u64,
}

impl MessageProcessor {
  /// Create a new message processor
  pub fn new(
    channel_id: ChannelId,
    input_receiver: mpsc::UnboundedReceiver<TickerMessage>,
    buffer_size: usize,
  ) -> (Self, broadcast::Receiver<TickerMessage>) {
    let (output_sender, output_receiver) = broadcast::channel(buffer_size);
    let stats = Arc::new(RwLock::new(ProcessorStats::default()));

    let processor = Self {
      channel_id,
      input_receiver: Some(input_receiver),
      output_sender,
      stats,
      task_handle: None,
    };

    (processor, output_receiver)
  }

  /// Start the dedicated processing task
  pub fn start(&mut self) {
    // Only start if not already running
    if self.task_handle.is_some() {
      log::warn!(
        "Message processor for channel {:?} already started",
        self.channel_id
      );
      return;
    }

    // Take the receiver so its ownership can move into the spawned task;
    // this is why a processor can only be started once.
    let input_receiver = self
      .input_receiver
      .take()
      .expect("Receiver already taken - processor can only be started once");

    let channel_id = self.channel_id;
    let output_sender = self.output_sender.clone();
    let stats = Arc::clone(&self.stats);

    let handle = tokio::spawn(async move {
      Self::processing_loop(channel_id, input_receiver, output_sender, stats)
        .await;
    });

    self.task_handle = Some(handle);
    log::info!(
      "Started message processor task for channel {:?}",
      channel_id
    );
  }

  /// High-performance message processing loop
  async fn processing_loop(
    channel_id: ChannelId,
    mut input_receiver: mpsc::UnboundedReceiver<TickerMessage>,
    output_sender: broadcast::Sender<TickerMessage>,
    stats: Arc<RwLock<ProcessorStats>>,
  ) {
    let mut last_stats_update = Instant::now();
    let mut messages_since_last_update = 0u64;

    log::info!("Started message processor for channel {:?}", channel_id);

    while let Some(message) = input_receiver.recv().await {
      let processing_start = Instant::now();

      // Process the message (currently just forwarding, but can add logic here)
      let processed_message = Self::process_message(message, channel_id);

      // Send to output channel (non-blocking). A broadcast send only fails
      // when there are no active receivers; slow receivers lag rather than
      // block the sender.
      if output_sender.send(processed_message).is_err() {
        log::warn!(
          "Channel {:?}: Failed to send message (no active receivers)",
          channel_id
        );

        let mut stats = stats.write().await;
        stats.errors_count += 1;
        continue;
      }

      let processing_time = processing_start.elapsed();
      messages_since_last_update += 1;

      // Update stats periodically to avoid lock contention
      if last_stats_update.elapsed() >= Duration::from_secs(1) {
        let mut stats_guard = stats.write().await;
        stats_guard.messages_processed += messages_since_last_update;
        stats_guard.last_processed_time = Some(Instant::now());
        stats_guard.queue_size = input_receiver.len();

        // Calculate messages per second
        let elapsed = last_stats_update.elapsed();
        stats_guard.messages_per_second =
          messages_since_last_update as f64 / elapsed.as_secs_f64();

        // Update average processing latency (exponential moving average)
        let current_avg = stats_guard.processing_latency_avg;
        stats_guard.processing_latency_avg = if current_avg.is_zero() {
          processing_time
        } else {
          // Weighted average: 90% old + 10% new
          Duration::from_nanos(
            (current_avg.as_nanos() as f64 * 0.9
              + processing_time.as_nanos() as f64 * 0.1) as u64,
          )
        };

        drop(stats_guard);

        // Reset counters
        last_stats_update = Instant::now();
        messages_since_last_update = 0;
      }
    }

    log::info!("Message processor for channel {:?} stopped", channel_id);
  }

  /// Process individual message (can be extended for custom logic)
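  ///
  /// For example, a filtering variant could drop empty tick batches before
  /// they are broadcast. A sketch of that idea (not the current behaviour,
  /// which always forwards the message unchanged; marked `ignore`):
  ///
  /// ```ignore
  /// fn process_message(
  ///   message: TickerMessage,
  ///   _channel_id: ChannelId,
  /// ) -> Option<TickerMessage> {
  ///   match &message {
  ///     TickerMessage::Ticks(ticks) if ticks.is_empty() => None, // filtered out
  ///     _ => Some(message),
  ///   }
  /// }
  /// ```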
  fn process_message(
    message: TickerMessage,
    channel_id: ChannelId,
  ) -> TickerMessage {
    // Currently just passes through, but you can add:
    // - Message validation
    // - Data enrichment
    // - Format conversion
    // - Filtering logic
    // - Latency tagging

    match &message {
      TickerMessage::Ticks(ticks) => {
        log::debug!(
          "Channel {:?}: Processed {} ticks",
          channel_id,
          ticks.len()
        );
      }
      TickerMessage::Error(error) => {
        log::warn!("Channel {:?}: Error message: {}", channel_id, error);
      }
      _ => {
        log::debug!(
          "Channel {:?}: Processed message: {:?}",
          channel_id,
          message
        );
      }
    }

    message
  }

  /// Get current processor statistics
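  ///
  /// A polling sketch (field names as defined on `ProcessorStats`; the
  /// example is marked `ignore` and not compiled):
  ///
  /// ```ignore
  /// let stats = processor.get_stats().await;
  /// log::info!(
  ///   "{:.0} msg/s, {} errors",
  ///   stats.messages_per_second,
  ///   stats.errors_count
  /// );
  /// ```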
  pub async fn get_stats(&self) -> ProcessorStats {
    self.stats.read().await.clone()
  }

  /// Get current queue size (non-blocking)
  pub fn queue_size(&self) -> usize {
    // The input receiver is moved into the processing task by `start()`, so
    // its length cannot be read here. The processing loop publishes an
    // up-to-date value in `ProcessorStats::queue_size` roughly once per
    // second; use `get_stats()` for the real figure.
    0
  }

  /// Stop the processor
  pub async fn stop(&mut self) {
    if let Some(handle) = self.task_handle.take() {
      handle.abort();
      let _ = handle.await;
    }
  }
}