kiteticker_async_manager/manager/
message_processor.rs

1use crate::models::TickerMessage;
2use crate::manager::ChannelId;
3use std::sync::Arc;
4use std::time::Instant;
5use tokio::sync::{mpsc, broadcast, RwLock};
6use tokio::task::JoinHandle;
7
8/// High-performance message processor with dedicated parsing task
9#[derive(Debug)]
10pub struct MessageProcessor {
11    pub channel_id: ChannelId,
12    pub input_receiver: Option<mpsc::UnboundedReceiver<TickerMessage>>,
13    pub output_sender: broadcast::Sender<TickerMessage>,
14    pub stats: Arc<RwLock<ProcessorStats>>,
15    pub task_handle: Option<JoinHandle<()>>,
16}
17
/// Snapshot of a message processor's running counters.
///
/// All fields start at their zero/`None` defaults and are refreshed in
/// batches by the processing loop rather than on every message.
#[derive(Clone, Debug, Default)]
pub struct ProcessorStats {
    /// Running total of messages the processing loop has counted as forwarded.
    pub messages_processed: u64,
    /// Throughput measured over the most recently published stats window.
    pub messages_per_second: f64,
    /// Smoothed processing latency (new samples are blended into the
    /// previous average rather than replacing it).
    pub processing_latency_avg: Duration,
    /// When the processing loop last refreshed these stats; `None` until the
    /// first refresh.
    pub last_processed_time: Option<Instant>,
    /// Length of the input queue as sampled at the last refresh.
    pub queue_size: usize,
    /// Number of messages that could not be published to the output channel.
    pub errors_count: u64,
}
27
28impl MessageProcessor {
29    /// Create a new message processor
30    pub fn new(
31        channel_id: ChannelId,
32        input_receiver: mpsc::UnboundedReceiver<TickerMessage>,
33        buffer_size: usize,
34    ) -> (Self, broadcast::Receiver<TickerMessage>) {
35        let (output_sender, output_receiver) = broadcast::channel(buffer_size);
36        let stats = Arc::new(RwLock::new(ProcessorStats::default()));
37        
38        let processor = Self {
39            channel_id,
40            input_receiver: Some(input_receiver),
41            output_sender,
42            stats,
43            task_handle: None,
44        };
45        
46        (processor, output_receiver)
47    }
48    
49    /// Start the dedicated processing task
50    pub fn start(&mut self) {
51        // Only start if not already running
52        if self.task_handle.is_some() {
53            log::warn!("Message processor for channel {:?} already started", self.channel_id);
54            return;
55        }
56        
57        // Take the receiver to move it to the task (this fixes the bug!)
58        let input_receiver = self.input_receiver.take()
59            .expect("Receiver already taken - processor can only be started once");
60        
61        let channel_id = self.channel_id;
62        let output_sender = self.output_sender.clone();
63        let stats = Arc::clone(&self.stats);
64        
65        let handle = tokio::spawn(async move {
66            Self::processing_loop(
67                channel_id,
68                input_receiver,
69                output_sender,
70                stats,
71            ).await;
72        });
73        
74        self.task_handle = Some(handle);
75        log::info!("Started message processor task for channel {:?}", channel_id);
76    }
77    
78    /// High-performance message processing loop
79    async fn processing_loop(
80        channel_id: ChannelId,
81        mut input_receiver: mpsc::UnboundedReceiver<TickerMessage>,
82        output_sender: broadcast::Sender<TickerMessage>,
83        stats: Arc<RwLock<ProcessorStats>>,
84    ) {
85        let mut last_stats_update = Instant::now();
86        let mut messages_since_last_update = 0u64;
87        
88        log::info!("Started message processor for channel {:?}", channel_id);
89        
90        while let Some(message) = input_receiver.recv().await {
91            let processing_start = Instant::now();
92            
93            // Process the message (currently just forwarding, but can add logic here)
94            let processed_message = Self::process_message(message, channel_id);
95            
96            // Send to output channel (non-blocking)
97            match output_sender.send(processed_message) {
98                Ok(receiver_count) => {
99                    // Successfully sent to all receivers
100                    if receiver_count == 0 {
101                        log::debug!("Channel {:?}: No active receivers", channel_id);
102                    }
103                }
104                Err(_) => {
105                    // This shouldn't happen with broadcast channels unless buffer is full
106                    log::warn!("Channel {:?}: Failed to send message", channel_id);
107                    
108                    let mut stats = stats.write().await;
109                    stats.errors_count += 1;
110                    continue;
111                }
112            }
113            
114            let processing_time = processing_start.elapsed();
115            messages_since_last_update += 1;
116            
117            // Update stats periodically to avoid lock contention
118            if last_stats_update.elapsed() >= std::time::Duration::from_secs(1) {
119                let mut stats_guard = stats.write().await;
120                stats_guard.messages_processed += messages_since_last_update;
121                stats_guard.last_processed_time = Some(Instant::now());
122                stats_guard.queue_size = input_receiver.len();
123                
124                // Calculate messages per second
125                let elapsed = last_stats_update.elapsed();
126                stats_guard.messages_per_second = 
127                    messages_since_last_update as f64 / elapsed.as_secs_f64();
128                
129                // Update average processing latency (simple moving average)
130                let current_avg = stats_guard.processing_latency_avg;
131                stats_guard.processing_latency_avg = if current_avg.is_zero() {
132                    processing_time
133                } else {
134                    // Weighted average: 90% old + 10% new
135                    Duration::from_nanos(
136                        (current_avg.as_nanos() as f64 * 0.9 + 
137                         processing_time.as_nanos() as f64 * 0.1) as u64
138                    )
139                };
140                
141                drop(stats_guard);
142                
143                // Reset counters
144                last_stats_update = Instant::now();
145                messages_since_last_update = 0;
146            }
147        }
148        
149        log::info!("Message processor for channel {:?} stopped", channel_id);
150    }
151    
152    /// Process individual message (can be extended for custom logic)
153    fn process_message(message: TickerMessage, channel_id: ChannelId) -> TickerMessage {
154        // Currently just passes through, but you can add:
155        // - Message validation
156        // - Data enrichment
157        // - Format conversion
158        // - Filtering logic
159        // - Latency tagging
160        
161        match &message {
162            TickerMessage::Ticks(ticks) => {
163                log::debug!("Channel {:?}: Processed {} ticks", channel_id, ticks.len());
164            }
165            TickerMessage::Error(error) => {
166                log::warn!("Channel {:?}: Error message: {}", channel_id, error);
167            }
168            _ => {
169                log::debug!("Channel {:?}: Processed message: {:?}", channel_id, message);
170            }
171        }
172        
173        message
174    }
175    
176    /// Get current processor statistics
177    pub async fn get_stats(&self) -> ProcessorStats {
178        self.stats.read().await.clone()
179    }
180    
181    /// Get current queue size (non-blocking)
182    pub fn queue_size(&self) -> usize {
183        // Note: This is approximate since we can't access the receiver's len() from here
184        // In a real implementation, you might want to track this differently
185        0
186    }
187    
188    /// Stop the processor
189    pub async fn stop(&mut self) {
190        if let Some(handle) = self.task_handle.take() {
191            handle.abort();
192            let _ = handle.await;
193        }
194    }
195}
196
197use std::time::Duration;