//! Runtime module for the fiddler data pipeline (`fiddler/runtime/mod.rs`).
1use flume::{bounded, Receiver, Sender};
2use rustc_hash::FxHashMap;
3use serde::Deserialize;
4use serde::Serialize;
5use serde_yaml::Value;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fmt;
9use std::future::Future;
10use std::time::Instant;
11use sysinfo::System;
12use tokio::task::JoinSet;
13use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::MetricEntry;
17use std::str::FromStr;
18use std::sync::Once;
19
/// Timeout for stale message entries in the state tracker (1 hour).
/// Entries older than this are counted in `stale_entries_removed`
/// (the cleanup loop itself lives in the message handler, outside this view).
const STALE_MESSAGE_TIMEOUT_SECS: u64 = 3600;

/// Interval for cleaning up stale entries (5 minutes).
const STALE_CLEANUP_INTERVAL_SECS: u64 = 300;

/// Default capacity for internal message channels.
/// Higher values allow more buffering and parallelism but use more memory.
const CHANNEL_CAPACITY: usize = 10_000;
29
30/// Tracks message processing statistics for observability.
31///
32/// This struct maintains counters for various message processing events
33/// to enable monitoring and debugging of the pipeline.
/// Tracks message processing statistics for observability.
///
/// This struct maintains counters for various message processing events
/// to enable monitoring and debugging of the pipeline.
///
/// Counters are plain integers with no interior synchronization; the struct
/// is intended to be owned and mutated by a single task.
#[derive(Debug, Default)]
pub struct MessageMetrics {
    /// Total messages received from input
    pub total_received: u64,
    /// Messages successfully processed through all outputs
    pub total_completed: u64,
    /// Messages that encountered processing errors
    pub total_process_errors: u64,
    /// Messages that encountered output errors
    pub total_output_errors: u64,
    /// Messages intentionally filtered/dropped
    pub total_filtered: u64,
    /// Streams started
    pub streams_started: u64,
    /// Streams completed
    pub streams_completed: u64,
    /// Duplicate messages rejected
    pub duplicates_rejected: u64,
    /// Stale entries cleaned up
    pub stale_entries_removed: u64,
    /// Total bytes received from input
    pub input_bytes: u64,
    /// Total bytes written to output
    pub output_bytes: u64,
    /// Timestamp when metrics collection started; `None` only for a
    /// `Default`-constructed instance (see `MessageMetrics::new`)
    started_at: Option<Instant>,
    /// Sum of all message latencies in microseconds (for average calculation)
    latency_sum_us: u128,
    /// Count of messages with recorded latency
    latency_count: u64,
    /// Minimum latency in microseconds; `None` until the first sample
    latency_min_us: Option<u64>,
    /// Maximum latency in microseconds
    latency_max_us: u64,
}
69
70impl MessageMetrics {
71    /// Creates a new MessageMetrics instance with the current timestamp.
72    pub fn new() -> Self {
73        Self {
74            started_at: Some(Instant::now()),
75            ..Default::default()
76        }
77    }
78
79    /// Returns the duration since metrics collection started.
80    pub fn elapsed(&self) -> std::time::Duration {
81        self.started_at.map(|t| t.elapsed()).unwrap_or_default()
82    }
83
84    /// Returns the throughput in messages per second.
85    pub fn throughput_per_sec(&self) -> f64 {
86        let elapsed = self.elapsed().as_secs_f64();
87        if elapsed > 0.0 {
88            self.total_completed as f64 / elapsed
89        } else {
90            0.0
91        }
92    }
93
94    /// Returns the throughput in bytes per second (based on output_bytes).
95    pub fn bytes_per_sec(&self) -> f64 {
96        let elapsed = self.elapsed().as_secs_f64();
97        if elapsed > 0.0 {
98            self.output_bytes as f64 / elapsed
99        } else {
100            0.0
101        }
102    }
103
104    /// Returns the error rate as a percentage.
105    pub fn error_rate(&self) -> f64 {
106        let total = self.total_completed + self.total_process_errors + self.total_output_errors;
107        if total > 0 {
108            ((self.total_process_errors + self.total_output_errors) as f64 / total as f64) * 100.0
109        } else {
110            0.0
111        }
112    }
113
114    /// Records a latency measurement for a completed message.
115    ///
116    /// # Arguments
117    ///
118    /// * `latency` - The duration from when the message entered processing until completion
119    pub fn record_latency(&mut self, latency: std::time::Duration) {
120        let latency_us = latency.as_micros() as u64;
121        self.latency_sum_us += latency_us as u128;
122        self.latency_count += 1;
123
124        // Update min latency
125        match self.latency_min_us {
126            Some(min) if latency_us < min => self.latency_min_us = Some(latency_us),
127            None => self.latency_min_us = Some(latency_us),
128            _ => {}
129        }
130
131        // Update max latency
132        if latency_us > self.latency_max_us {
133            self.latency_max_us = latency_us;
134        }
135    }
136
137    /// Returns the average latency in milliseconds.
138    pub fn latency_avg_ms(&self) -> f64 {
139        if self.latency_count > 0 {
140            (self.latency_sum_us as f64 / self.latency_count as f64) / 1000.0
141        } else {
142            0.0
143        }
144    }
145
146    /// Returns the minimum latency in milliseconds.
147    pub fn latency_min_ms(&self) -> f64 {
148        self.latency_min_us
149            .map(|us| us as f64 / 1000.0)
150            .unwrap_or(0.0)
151    }
152
153    /// Returns the maximum latency in milliseconds.
154    pub fn latency_max_ms(&self) -> f64 {
155        self.latency_max_us as f64 / 1000.0
156    }
157
158    /// Records metrics to the configured metrics backend.
159    ///
160    /// If no metrics backend is configured, this is a no-op.
161    /// If `system` is provided, CPU and memory metrics will be included.
162    pub fn record(
163        &self,
164        metrics_backend: &mut dyn Metrics,
165        in_flight: usize,
166        system: Option<&mut System>,
167    ) {
168        // Collect system metrics if enabled
169        let (cpu_usage_percent, memory_used_bytes, memory_total_bytes) = if let Some(sys) = system {
170            sys.refresh_cpu_usage();
171            sys.refresh_memory();
172            let cpu = sys.global_cpu_usage();
173            let mem_used = sys.used_memory();
174            let mem_total = sys.total_memory();
175            (Some(cpu), Some(mem_used), Some(mem_total))
176        } else {
177            (None, None, None)
178        };
179
180        metrics_backend.record(MetricEntry {
181            total_received: self.total_received,
182            total_completed: self.total_completed,
183            total_process_errors: self.total_process_errors,
184            total_output_errors: self.total_output_errors,
185            total_filtered: self.total_filtered,
186            streams_started: self.streams_started,
187            streams_completed: self.streams_completed,
188            duplicates_rejected: self.duplicates_rejected,
189            stale_entries_removed: self.stale_entries_removed,
190            in_flight,
191            throughput_per_sec: self.throughput_per_sec(),
192            cpu_usage_percent,
193            memory_used_bytes,
194            memory_total_bytes,
195            input_bytes: self.input_bytes,
196            output_bytes: self.output_bytes,
197            bytes_per_sec: self.bytes_per_sec(),
198            latency_avg_ms: self.latency_avg_ms(),
199            latency_min_ms: self.latency_min_ms(),
200            latency_max_ms: self.latency_max_ms(),
201        });
202    }
203}
204
205/// Spawns an async task on the shared tokio runtime.
206/// Using the shared runtime enables work-stealing across all tasks for better CPU utilization.
207fn spawn_task<F>(handles: &mut JoinSet<Result<(), Error>>, task: F)
208where
209    F: Future<Output = Result<(), Error>> + Send + 'static,
210{
211    handles.spawn(task);
212}
213
214use super::CallbackChan;
215use super::Error;
216use super::Message;
217use super::Metrics;
218use crate::config::parse_configuration_item;
219use crate::config::ExecutionType;
220use crate::config::{Config, ItemType, ParsedConfig, ParsedRegisteredItem};
221
222use crate::modules::metrics::create_metrics;
223use crate::modules::outputs;
224use crate::modules::processors;
225use crate::modules::register_plugins;
226use crate::Status;
227
228use once_cell::sync::Lazy;
229use std::sync::Mutex;
230
/// Ensures built-in plugin registration runs at most once per process.
static REGISTER: Once = Once::new();
/// Stores any error that occurred during plugin registration.
/// The `Once` closure cannot return a `Result`, so the error is stashed here
/// and surfaced by later `Runtime::from_config` calls.
static REGISTER_ERROR: Lazy<Mutex<Option<String>>> = Lazy::new(|| Mutex::new(None));
234
/// Represents a single data pipeline configuration Runtime to run
pub struct Runtime {
    /// Parsed and validated pipeline configuration (input, processors, output, metrics).
    config: ParsedConfig,
    /// Sender half of the message-state channel; cloned into processor and output tasks.
    state_tx: Sender<InternalMessageState>,
    /// Receiver half of the message-state channel; handed to the message handler in `run`.
    state_rx: Receiver<InternalMessageState>,
    /// Optional wall-clock duration after which `run` signals the pipeline to shut down.
    timeout: Option<Duration>,
}
242
/// Lifecycle states reported for a message as it moves through the pipeline.
///
/// Processor and output tasks send these over the internal state channel so
/// the message handler can track per-message completion and update metrics.
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
pub(crate) enum MessageStatus {
    /// A new message instance entered the pipeline (increments the instance count).
    #[default]
    New,
    /// A processor finished handling the message.
    Processed,
    /// A processor failed; the payload carries the error text.
    ProcessError(String),
    /// The message was successfully written to an output.
    Output,
    /// An output failed; the payload carries the error text.
    OutputError(String),
    /// The message was intentionally dropped by a processor.
    Filtered,
    /// An output task has shut down.
    Shutdown,
    /// Marks the end of a message stream.
    StreamComplete,
}
255
256impl fmt::Display for MessageStatus {
257    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
258        match *self {
259            MessageStatus::New => write!(f, "New"),
260            MessageStatus::Processed => write!(f, "Processed"),
261            MessageStatus::ProcessError(_) => write!(f, "ProcessError"),
262            MessageStatus::Output => write!(f, "Output"),
263            MessageStatus::OutputError(_) => write!(f, "OutputError"),
264            MessageStatus::Filtered => write!(f, "Filtered"),
265            MessageStatus::Shutdown => write!(f, "Shutdown"),
266            MessageStatus::StreamComplete => write!(f, "StreamComplete"),
267        }
268    }
269}
270
/// State-change notification sent from processor/output tasks to the message
/// handler over the internal state channel.
#[derive(Clone, Serialize, Deserialize, Default)]
pub(crate) struct InternalMessageState {
    /// Identifier of the message this update refers to.
    pub message_id: String,
    /// The lifecycle status being reported.
    pub status: MessageStatus,
    /// Identifier of the stream this message belongs to, if any.
    pub stream_id: Option<String>,
    /// Whether this message is part of a stream.
    pub is_stream: bool,
    /// Bytes associated with this state update (used for output bytes tracking)
    pub bytes: u64,
}
280
/// A message payload paired with its tracking id and current status, as it
/// flows between pipeline stages over the internal channels.
#[derive(Clone, Serialize, Deserialize)]
pub(crate) struct InternalMessage {
    /// The message payload being processed.
    pub message: Message,
    /// Unique identifier used to correlate state updates for this message.
    pub message_id: String,
    /// Current lifecycle status of the message.
    pub status: MessageStatus,
}
287
/// Bookkeeping handle for a message accepted from the input.
pub(crate) struct MessageHandle {
    /// Identifier of the tracked message.
    pub message_id: String,
    /// Optional callback channel used to acknowledge the input once the
    /// message reaches a terminal state.
    pub closure: Option<CallbackChan>,
    /// Identifier of the stream this message belongs to, if any.
    pub stream_id: Option<String>,
    /// Whether this message is part of a stream.
    pub is_stream: bool,
    /// Whether the owning stream has been marked complete.
    pub stream_complete: bool,
    /// Bytes received from input for this message
    pub input_bytes: u64,
}
297
298impl Runtime {
299    /// The function takes the raw configuration of the data pipeline and registers built-in
300    /// plugins, validates the configuration and returns the Runtime to run.
301    /// ```
302    /// use fiddler::Runtime;
303    ///
304    /// let conf_str = r#"input:
305    ///  stdin: {}
306    ///processors:
307    ///  - label: my_cool_mapping
308    ///    noop: {}
309    ///output:
310    ///  stdout: {}"#;
311    /// # tokio_test::block_on(async {
312    /// let env = Runtime::from_config(conf_str).await.unwrap();
313    /// # })
314    /// ```
315    pub async fn from_config(config: &str) -> Result<Self, Error> {
316        REGISTER.call_once(|| {
317            if let Err(e) = register_plugins() {
318                if let Ok(mut err) = REGISTER_ERROR.lock() {
319                    *err = Some(format!("{e}"));
320                }
321            }
322        });
323
324        // Check if registration failed
325        if let Ok(err_lock) = REGISTER_ERROR.lock() {
326            if let Some(ref e) = *err_lock {
327                return Err(Error::ExecutionError(format!(
328                    "Plugin registration failed: {e}"
329                )));
330            }
331        }
332        trace!("plugins registered");
333
334        let conf: Config = Config::from_str(config)?;
335        let parsed_conf = conf.validate().await?;
336
337        let (state_tx, state_rx) = bounded(CHANNEL_CAPACITY);
338
339        debug!("Runtime is ready");
340        Ok(Runtime {
341            config: parsed_conf,
342            state_rx,
343            state_tx,
344            timeout: None,
345        })
346    }
347
348    /// The function sets the data pipeline with a label.
349    /// ```
350    /// # use fiddler::Runtime;
351    /// # let conf_str = r#"input:
352    /// #   stdin: {}
353    /// # processors:
354    /// #  - label: my_cool_mapping
355    /// #    noop: {}
356    /// # output:
357    /// #   stdout: {}"#;
358    /// # tokio_test::block_on(async {
359    /// # let mut env = Runtime::from_config(conf_str).await.unwrap();
360    /// env.set_label(Some("MyPipeline".into())).unwrap();
361    /// # });
362    /// ```
363    /// or to remove a given label:
364    /// ```
365    /// # use fiddler::Runtime;
366    /// # let conf_str = r#"input:
367    /// #  stdin: {}
368    /// # processors:
369    /// #  - label: my_cool_mapping
370    /// #    noop: {}
371    /// # output:
372    /// #   stdout: {}"#;
373    /// # tokio_test::block_on(async {
374    /// # let mut env = Runtime::from_config(conf_str).await.unwrap();
375    /// env.set_label(None).unwrap();
376    /// # });
377    /// ```
378    pub fn set_label(&mut self, label: Option<String>) -> Result<(), Error> {
379        self.config.label = label;
380        Ok(())
381    }
382
383    /// The function returns the currect label assigned to the pipeline
384    /// ```
385    /// # use fiddler::Runtime;
386    /// # let conf_str = r#"input:
387    /// #   stdin: {}
388    /// # processors:
389    /// #  - label: my_cool_mapping
390    /// #    noop: {}
391    /// # output:
392    /// #   stdout: {}"#;
393    /// # tokio_test::block_on(async {
394    /// # let mut env = Runtime::from_config(conf_str).await.unwrap();
395    /// # env.set_label(Some("MyPipeline".into())).unwrap();
396    /// assert_eq!(env.get_label().unwrap(), "MyPipeline".to_string());
397    /// # });
398    /// ```
399    pub fn get_label(&self) -> Option<String> {
400        self.config.label.clone()
401    }
402
403    /// The function replaces the existing input configuration with the provided input.
404    /// ```
405    /// # use fiddler::config::{ConfigSpec, ItemType, ExecutionType};
406    /// # use std::collections::HashMap;
407    /// # use fiddler::Runtime;
408    /// # let conf_str = r#"input:
409    /// #   stdin: {}
410    /// # processors:
411    /// #  - label: my_cool_mapping
412    /// #    noop: {}
413    /// # output:
414    /// #   stdout: {}"#;
415    /// # tokio_test::block_on(async {
416    /// # let mut env = Runtime::from_config(conf_str).await.unwrap();
417    /// use serde_yaml::Value;
418    /// let conf_str = r#"file:
419    ///    filename: tests/data/input.txt
420    ///    codec: ToEnd"#;
421    /// let parsed_input: HashMap<String, Value> = serde_yaml::from_str(conf_str).unwrap();
422    ///
423    /// env.set_input(&parsed_input).await.unwrap()
424    /// # })
425    /// ```
426    pub async fn set_input(&mut self, input: &HashMap<String, Value>) -> Result<(), Error> {
427        let parsed_item = parse_configuration_item(ItemType::Input, input).await?;
428        self.config.input = parsed_item;
429        Ok(())
430    }
431
432    /// The function replaces the existing output configuration with the provided output.
433    /// ```
434    /// # use fiddler::config::{ConfigSpec, ItemType, ExecutionType};
435    /// # use std::collections::HashMap;
436    /// # use fiddler::Runtime;
437    /// # let conf_str = r#"input:
438    /// #   stdin: {}
439    /// # processors:
440    /// #  - label: my_cool_mapping
441    /// #    noop: {}
442    /// # output:
443    /// #   stdout: {}"#;
444    /// # tokio_test::block_on(async {
445    /// # let mut env = Runtime::from_config(conf_str).await.unwrap();
446    ///
447    /// use serde_yaml::Value;
448    /// let conf_str = r#"stdout: {}"#;
449    /// let parsed_output: HashMap<String, Value> = serde_yaml::from_str(conf_str).unwrap();
450    ///
451    /// env.set_output(&parsed_output).await.unwrap()
452    /// # });
453    /// ```
454    pub async fn set_output(&mut self, output: &HashMap<String, Value>) -> Result<(), Error> {
455        let parsed_item = parse_configuration_item(ItemType::Output, output).await?;
456        self.config.output = parsed_item;
457        Ok(())
458    }
459
460    /// The function sets the number of instances of processors and outputs to create.
461    /// ```
462    /// # use fiddler::config::{ConfigSpec, ItemType, ExecutionType};
463    /// # use std::collections::HashMap;
464    /// # use fiddler::Runtime;
465    /// # let conf_str = r#"input:
466    /// #   stdin: {}
467    /// # processors:
468    /// #  - label: my_cool_mapping
469    /// #    noop: {}
470    /// # output:
471    /// #   stdout: {}"#;
472    /// # tokio_test::block_on(async {
473    /// # let mut env = Runtime::from_config(conf_str).await.unwrap();
474    /// env.set_threads(1).unwrap()
475    /// # });
476    /// ```
477    pub fn set_threads(&mut self, count: usize) -> Result<(), Error> {
478        self.config.num_threads = count;
479        Ok(())
480    }
481
482    /// The function sets the timeout, or duration to run the pipeline
483    /// ```
484    /// # use fiddler::config::{ConfigSpec, ItemType, ExecutionType};
485    /// # use std::collections::HashMap;
486    /// # use std::time::Duration;
487    /// # use fiddler::Runtime;
488    /// # let conf_str = r#"input:
489    /// #   stdin: {}
490    /// # processors:
491    /// #  - label: my_cool_mapping
492    /// #    noop: {}
493    /// # output:
494    /// #   stdout: {}"#;
495    /// # tokio_test::block_on(async {
496    /// # let mut env = Runtime::from_config(conf_str).await.unwrap();
497    /// env.set_timeout(Some(Duration::from_secs(60)))
498    /// # });
499    /// ```
500    pub fn set_timeout(&mut self, timeout: Option<Duration>) -> Result<(), Error> {
501        self.timeout = timeout;
502        Ok(())
503    }
504    /// The function runs the existing data pipeline until receiving an Error::EndOfInput
505    /// ```no_run
506    /// # use fiddler::config::{ConfigSpec, ItemType, ExecutionType};
507    /// # use std::collections::HashMap;
508    /// # use fiddler::Runtime;
509    /// # let conf_str = r#"input:
510    /// #   stdin: {}
511    /// # processors:
512    /// #  - label: my_cool_mapping
513    /// #    noop: {}
514    /// # output:
515    /// #   stdout: {}"#;
516    /// # tokio_test::block_on(async {
517    /// # let mut env = Runtime::from_config(conf_str).await.unwrap();
518    /// env.run().await.unwrap();
519    /// # })
520    /// ```
521    pub async fn run(&self) -> Result<(), Error> {
522        let mut handles = JoinSet::new();
523
524        // Create metrics backend based on configuration
525        let metrics_backend = create_metrics(self.config.metrics.as_ref()).await?;
526
527        // Get metrics recording interval from config, or use default (300 seconds)
528        let metrics_interval = self
529            .config
530            .metrics
531            .as_ref()
532            .map(|m| m.interval)
533            .unwrap_or(300);
534
535        // Get collect_system_metrics flag from config, default to false
536        let collect_system_metrics = self
537            .config
538            .metrics
539            .as_ref()
540            .map(|m| m.collect_system_metrics)
541            .unwrap_or(false);
542
543        let (msg_tx, msg_rx) = bounded(CHANNEL_CAPACITY);
544        let msg_state = message_handler(
545            msg_rx,
546            self.state_rx.clone(),
547            self.config.num_threads,
548            metrics_backend,
549            metrics_interval,
550            collect_system_metrics,
551        );
552
553        spawn_task(&mut handles, msg_state);
554
555        let output = self
556            .output(self.config.output.clone(), &mut handles)
557            .await?;
558
559        let processors = self.pipeline(output, &mut handles).await?;
560
561        // Kill switch only needs capacity of 1 - it's a signal channel
562        let (ks_send, ks_recv) = bounded(1);
563
564        let input = input(self.config.input.clone(), processors, msg_tx, ks_recv);
565
566        spawn_task(&mut handles, input);
567
568        info!(label = self.config.label, "pipeline started");
569
570        if let Some(d) = self.timeout {
571            let timeout_ks_send = ks_send.clone();
572            handles.spawn(async move {
573                sleep(d).await;
574                trace!("sending kill signal");
575                if !timeout_ks_send.is_disconnected() {
576                    if let Err(e) = timeout_ks_send.send(()) {
577                        debug!(error = ?e, "Failed to send kill signal, receiver may have been dropped");
578                    }
579                }
580                Ok(())
581            });
582        }
583
584        // Main loop: wait for tasks to complete or Ctrl+C signal
585        loop {
586            tokio::select! {
587                // Handle task completion
588                res = handles.join_next() => {
589                    match res {
590                        Some(Ok(Ok(()))) => {
591                            // Task completed successfully, continue waiting for others
592                        }
593                        Some(Ok(Err(e))) => {
594                            // Task returned an error
595                            return Err(e);
596                        }
597                        Some(Err(e)) => {
598                            // Task panicked or was cancelled
599                            return Err(Error::ProcessingError(format!("{e}")));
600                        }
601                        None => {
602                            // All tasks completed
603                            break;
604                        }
605                    }
606                }
607                // Handle Ctrl+C signal for graceful shutdown
608                _ = tokio::signal::ctrl_c() => {
609                    info!("Received shutdown signal (Ctrl+C), initiating graceful shutdown");
610                    if !ks_send.is_disconnected() {
611                        if let Err(e) = ks_send.send(()) {
612                            debug!(error = ?e, "Failed to send kill signal from signal handler");
613                        }
614                    }
615                    // Continue the loop to let tasks shut down gracefully
616                }
617            }
618        }
619
620        info!("pipeline finished");
621        Ok(())
622    }
623
624    async fn pipeline(
625        &self,
626        input: Sender<InternalMessage>,
627        handles: &mut JoinSet<Result<(), Error>>,
628    ) -> Result<Sender<InternalMessage>, Error> {
629        trace!("starting pipeline");
630
631        let mut processors = self.config.processors.clone();
632        processors.reverse();
633
634        let mut next_tx = input;
635
636        for (i, v) in processors.iter().enumerate() {
637            let p = v.clone();
638
639            let (tx, rx) = bounded(CHANNEL_CAPACITY);
640
641            for n in 0..self.config.num_threads {
642                let proc = processors::run_processor(
643                    p.clone(),
644                    next_tx.clone(),
645                    rx.clone(),
646                    self.state_tx.clone(),
647                );
648                spawn_task(handles, proc);
649            }
650
651            next_tx = tx;
652        }
653
654        Ok(next_tx)
655    }
656
657    async fn output(
658        &self,
659        output: ParsedRegisteredItem,
660        handles: &mut JoinSet<Result<(), Error>>,
661    ) -> Result<Sender<InternalMessage>, Error> {
662        trace!("started output");
663
664        let (tx, rx) = bounded(CHANNEL_CAPACITY);
665
666        for i in 0..self.config.num_threads {
667            let item = (output.creator)(output.config.clone()).await?;
668            match item {
669                ExecutionType::Output(o) => {
670                    let state_tx = self.state_tx.clone();
671                    let new_rx = rx.clone();
672                    spawn_task(handles, outputs::run_output(new_rx, state_tx, o));
673                }
674                ExecutionType::OutputBatch(o) => {
675                    let state_tx = self.state_tx.clone();
676                    let new_rx = rx.clone();
677                    spawn_task(handles, outputs::run_output_batch(new_rx, state_tx, o));
678                }
679                _ => {
680                    error!("invalid execution type for output");
681                    return Err(Error::Validation("invalid execution type".into()));
682                }
683            };
684        }
685
686        Ok(tx)
687    }
688}
689
/// Per-message accounting kept by the message handler until the message
/// reaches a terminal state.
struct State {
    /// Number of instances of this message in flight (incremented on `New`).
    instance_count: i64,
    /// Instances reported as processed.
    processed_count: i64,
    /// Instances that failed during processing.
    processed_error_count: i64,
    /// Instances successfully written to an output.
    output_count: i64,
    /// Instances that failed at the output stage.
    output_error_count: i64,
    /// Instances intentionally filtered/dropped by a processor.
    filtered_count: i64,
    /// Callback invoked once the message reaches a terminal state.
    closure: Option<CallbackChan>,
    /// Error messages accumulated from processing/output failures.
    errors: Vec<String>,
    /// Stream this message belongs to, if any.
    stream_id: Option<String>,
    /// Whether the owning stream is closed; `None` is treated as closed.
    stream_closed: Option<bool>,
    /// Timestamp when this state was created, used for stale entry cleanup
    created_at: Instant,
}
704
705/// Process state updates iteratively to avoid stack overflow with deeply nested streams
706fn process_state(
707    handles: &mut FxHashMap<String, State>,
708    output_ct: &usize,
709    closed_outputs: &mut usize,
710    initial_msg: InternalMessageState,
711    metrics: &mut MessageMetrics,
712) -> Result<(), Error> {
713    // Use a stack for iterative processing instead of recursion
714    // Pre-allocate with realistic capacity to avoid reallocation
715    let mut pending_messages = Vec::with_capacity(4);
716    pending_messages.push(initial_msg);
717    let mut entries_to_remove = Vec::with_capacity(2);
718
719    while let Some(msg) = pending_messages.pop() {
720        let mut remove_entry = false;
721        let mut stream_id = None;
722        let mut message_completed_successfully = false;
723        let mut message_latency: Option<std::time::Duration> = None;
724
725        match handles.get_mut(&msg.message_id) {
726            None => {
727                if let MessageStatus::Shutdown = &msg.status {
728                    *closed_outputs += 1;
729                    if closed_outputs == output_ct {
730                        info!("exiting message handler");
731                        return Err(Error::EndOfInput);
732                    }
733                } else {
734                    return Err(Error::ExecutionError(format!(
735                        "Message ID {} does not exist",
736                        msg.message_id
737                    )));
738                };
739            }
740            Some(state) => {
741                match &msg.status {
742                    MessageStatus::New => {
743                        state.instance_count += 1;
744                        stream_id = state.stream_id.clone();
745                    }
746                    MessageStatus::Processed => {
747                        state.processed_count += 1;
748                        stream_id = state.stream_id.clone();
749                    }
750                    MessageStatus::ProcessError(e) => {
751                        state.processed_error_count += 1;
752                        state.errors.push(e.clone());
753                        stream_id = state.stream_id.clone();
754                        metrics.total_process_errors += 1;
755
756                        let stream_closed = state.stream_closed.unwrap_or(true);
757
758                        if stream_closed
759                            && (state.output_count
760                                + state.output_error_count
761                                + state.processed_error_count)
762                                >= state.instance_count
763                        {
764                            remove_entry = true;
765                            message_latency = Some(state.created_at.elapsed());
766                            if let Some(chan) = state.closure.take() {
767                                info!(message_id = msg.message_id, "calling closure");
768                                let err = std::mem::take(&mut state.errors);
769                                let _ = chan.send(Status::Errored(err));
770                            }
771                        }
772                    }
773                    MessageStatus::Output => {
774                        state.output_count += 1;
775                        stream_id = state.stream_id.clone();
776                        // Track output bytes
777                        metrics.output_bytes += msg.bytes;
778
779                        debug!(
780                            message_id = msg.message_id,
781                            errors = state.processed_error_count,
782                            "message fully processed"
783                        );
784                        let stream_closed = state.stream_closed.unwrap_or(true);
785
786                        if stream_closed && state.output_count >= state.instance_count {
787                            remove_entry = true;
788                            message_completed_successfully = true;
789                            message_latency = Some(state.created_at.elapsed());
790                            if let Some(chan) = state.closure.take() {
791                                info!(message_id = msg.message_id, "calling closure");
792                                let _ = chan.send(Status::Processed);
793                            }
794                        } else if stream_closed
795                            && (state.output_count
796                                + state.output_error_count
797                                + state.processed_error_count)
798                                >= state.instance_count
799                        {
800                            remove_entry = true;
801                            message_latency = Some(state.created_at.elapsed());
802                            if let Some(chan) = state.closure.take() {
803                                info!(message_id = msg.message_id, "calling closure");
804                                let err = std::mem::take(&mut state.errors);
805                                let _ = chan.send(Status::Errored(err));
806                            }
807                        }
808                    }
809                    MessageStatus::OutputError(e) => {
810                        state.output_error_count += 1;
811                        state.errors.push(e.clone());
812                        stream_id = state.stream_id.clone();
813                        metrics.total_output_errors += 1;
814
815                        if (state.output_count
816                            + state.output_error_count
817                            + state.processed_error_count)
818                            >= state.instance_count
819                        {
820                            remove_entry = state.stream_closed.unwrap_or(true);
821
822                            if remove_entry {
823                                message_latency = Some(state.created_at.elapsed());
824                                if let Some(chan) = state.closure.take() {
825                                    info!(message_id = msg.message_id, "calling closure");
826                                    let err = std::mem::take(&mut state.errors);
827                                    let _ = chan.send(Status::Errored(err));
828                                }
829                            }
830                        }
831                    }
832                    MessageStatus::Filtered => {
833                        // Filtered is treated as successful - message intentionally dropped
834                        state.filtered_count += 1;
835                        stream_id = state.stream_id.clone();
836                        metrics.total_filtered += 1;
837
838                        debug!(
839                            message_id = msg.message_id,
840                            "message filtered/dropped by processor"
841                        );
842
843                        let stream_closed = state.stream_closed.unwrap_or(true);
844
845                        // A filtered message counts as complete - decrement instance count
846                        // since no output will be sent for this message
847                        if stream_closed {
848                            // Check if all instances are accounted for (filtered + output + errors)
849                            if (state.filtered_count
850                                + state.output_count
851                                + state.output_error_count
852                                + state.processed_error_count)
853                                >= state.instance_count
854                            {
855                                remove_entry = true;
856                                message_completed_successfully = true;
857                                message_latency = Some(state.created_at.elapsed());
858                                if let Some(chan) = state.closure.take() {
859                                    info!(
860                                        message_id = msg.message_id,
861                                        "message filtered - calling closure"
862                                    );
863                                    let _ = chan.send(Status::Processed);
864                                }
865                            }
866                        }
867                    }
868                    MessageStatus::Shutdown => {
869                        *closed_outputs += 1;
870                        if closed_outputs == output_ct {
871                            debug!("exiting message handler");
872                            return Err(Error::EndOfInput);
873                        }
874                    }
875                    MessageStatus::StreamComplete => {
876                        state.stream_closed = Some(true);
877                        state.output_count += 1;
878
879                        stream_id = state.stream_id.clone();
880                        if state.output_count >= state.instance_count {
881                            remove_entry = true;
882                            message_completed_successfully = true;
883                            message_latency = Some(state.created_at.elapsed());
884                            if let Some(chan) = state.closure.take() {
885                                info!(message_id = msg.message_id, "calling closure");
886                                let _ = chan.send(Status::Processed);
887                            }
888                        } else if (state.output_count
889                            + state.output_error_count
890                            + state.processed_error_count)
891                            >= state.instance_count
892                        {
893                            remove_entry = true;
894                            message_latency = Some(state.created_at.elapsed());
895                            if let Some(chan) = state.closure.take() {
896                                info!(message_id = msg.message_id, "calling closure");
897                                let err = std::mem::take(&mut state.errors);
898                                let _ = chan.send(Status::Errored(err));
899                            }
900                        };
901                    }
902                };
903
904                trace!(
905                    instance_count = state.instance_count,
906                    processed_count = state.processed_count,
907                    processed_error_count = state.processed_error_count,
908                    output_count = state.output_count,
909                    output_error_count = state.output_error_count,
910                    stream_id = state.stream_id,
911                    stream_closed = state.stream_closed,
912                    state = msg.status.to_string(),
913                    message_id = msg.message_id,
914                    "Received message state"
915                );
916            }
917        };
918
919        // Queue stream state update instead of recursive call
920        if let Some(sid) = stream_id {
921            match handles.get(&sid) {
922                None => {
923                    return Err(Error::ExecutionError(format!(
924                        "StreamID {} does not exist (Message ID {})",
925                        sid, msg.message_id
926                    )))
927                }
928                Some(s) => {
929                    pending_messages.push(InternalMessageState {
930                        message_id: sid,
931                        status: msg.status.clone(),
932                        stream_id: s.stream_id.clone(),
933                        is_stream: true,
934                        bytes: msg.bytes,
935                    });
936                }
937            }
938        }
939
940        if remove_entry {
941            trace!(
942                message_id = msg.message_id,
943                "Marking message for removal from state"
944            );
945            entries_to_remove.push(msg.message_id);
946            // Track completed messages and latency (only count non-stream messages to avoid double counting)
947            if !msg.is_stream {
948                if message_completed_successfully {
949                    metrics.total_completed += 1;
950                }
951                // Record latency for all completed messages (success or failure)
952                if let Some(latency) = message_latency {
953                    metrics.record_latency(latency);
954                }
955            }
956        }
957    }
958
959    // Remove entries after processing to avoid borrow conflicts
960    for message_id in entries_to_remove {
961        let _ = handles.remove(&message_id);
962    }
963
964    Ok(())
965}
966
/// Central bookkeeping loop that tracks the lifecycle of every in-flight message.
///
/// Listens on two channels:
/// - `new_msg`: registers newly-ingested messages (and stream-completion markers)
///   into the `handles` state map.
/// - `msg_status`: applies per-message status updates (processed, output, errors,
///   filtered, shutdown) via `process_state`.
///
/// Also drives two periodic timers: one that records metrics to `metrics_backend`
/// every `metrics_interval_secs`, and one that evicts state entries older than
/// `STALE_MESSAGE_TIMEOUT_SECS` so abandoned messages cannot leak memory.
///
/// # Arguments
/// * `output_ct` - number of outputs; the loop exits once this many Shutdown
///   statuses have been observed (surfaced as `Error::EndOfInput` by `process_state`).
/// * `collect_system_metrics` - when true, a `sysinfo::System` is maintained so
///   CPU/memory readings accompany each metrics snapshot.
///
/// # Errors
/// Propagates any non-`EndOfInput` error from `process_state` (e.g. duplicate or
/// unknown message IDs). `EndOfInput` is converted into a clean `Ok(())` return
/// after logging shutdown metrics.
async fn message_handler(
    new_msg: Receiver<MessageHandle>,
    msg_status: Receiver<InternalMessageState>,
    output_ct: usize,
    mut metrics_backend: Box<dyn Metrics>,
    metrics_interval_secs: u64,
    collect_system_metrics: bool,
) -> Result<(), Error> {
    // Pre-allocate FxHashMap for expected concurrent messages (faster than SipHash)
    let mut handles: FxHashMap<String, State> = FxHashMap::default();
    handles.reserve(1024);
    // Counts Shutdown statuses seen so far; compared against output_ct by process_state.
    let mut closed_outputs = 0;
    let stale_timeout = Duration::from_secs(STALE_MESSAGE_TIMEOUT_SECS);
    let mut metrics = MessageMetrics::new();

    // Create System instance for collecting CPU/memory metrics if enabled
    let mut system = if collect_system_metrics {
        let mut sys = System::new();
        // Initial refresh to populate baseline data
        sys.refresh_cpu_usage();
        sys.refresh_memory();
        Some(sys)
    } else {
        None
    };

    // Use tokio interval timers instead of manual elapsed() checks
    let mut metrics_timer = interval(Duration::from_secs(metrics_interval_secs));
    metrics_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
    // Skip the first immediate tick
    metrics_timer.tick().await;

    let mut cleanup_timer = interval(Duration::from_secs(STALE_CLEANUP_INTERVAL_SECS));
    cleanup_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
    // Skip the first immediate tick
    cleanup_timer.tick().await;

    debug!(
        interval_secs = metrics_interval_secs,
        collect_system_metrics = collect_system_metrics,
        "Metrics recording interval configured"
    );

    loop {
        tokio::select! {
            // biased ensures new messages are registered before status updates are processed
            biased;
            Ok(msg) = new_msg.recv_async() => {
                trace!(message_id = msg.message_id, "Received new message");
                // A stream-complete marker carries no payload; convert it into a
                // StreamComplete status update for the stream's aggregate state entry.
                if msg.is_stream && msg.stream_complete {
                    metrics.streams_completed += 1;
                    if let Err(e) = process_state(&mut handles, &output_ct, &mut closed_outputs, InternalMessageState {
                        message_id: msg.message_id.clone(),
                        status: MessageStatus::StreamComplete,
                        stream_id: msg.stream_id.clone(),
                        is_stream: true,
                        bytes: 0,
                    }, &mut metrics) {
                        match e {
                            Error::EndOfInput => {
                                log_shutdown_metrics(&metrics, handles.len(), metrics_backend.as_mut(), system.as_mut());
                                return Ok(());
                            }
                            _ => return Err(e),
                        }
                    }
                    continue
                };

                // Track new message received and input bytes
                metrics.total_received += 1;
                metrics.input_bytes += msg.input_bytes;
                if msg.is_stream {
                    metrics.streams_started += 1;
                }

                let closure = msg.closure;
                let stream_id = msg.stream_id;
                let is_stream = msg.is_stream;
                // Consume message_id instead of cloning - Entry API takes ownership
                match handles.entry(msg.message_id) {
                    Entry::Vacant(entry) => {
                        entry.insert(State {
                            instance_count: 1,
                            processed_count: 0,
                            processed_error_count: 0,
                            output_count: 0,
                            output_error_count: 0,
                            filtered_count: 0,
                            closure,
                            errors: Vec::with_capacity(2), // Pre-allocate for common case
                            stream_id: stream_id.clone(),
                            stream_closed: if is_stream { Some(false) } else { None },
                            created_at: Instant::now(),
                        });
                    }
                    Entry::Occupied(entry) => {
                        // Duplicate IDs would corrupt per-message accounting, so the
                        // whole handler aborts rather than continuing with bad state.
                        metrics.duplicates_rejected += 1;
                        error!(message_id = entry.key(), "Received duplicate message");
                        return Err(Error::ExecutionError("Duplicate Message ID Error".into()));
                    }
                }

                // A message belonging to a stream also bumps the stream's own
                // aggregate state entry via a synthetic New status.
                if let Some(s) = &stream_id {
                    if let Err(e) = process_state(&mut handles, &output_ct, &mut closed_outputs, InternalMessageState{
                        message_id: s.clone(),
                        status: MessageStatus::New,
                        stream_id: None,
                        is_stream: true,
                        bytes: 0,
                    }, &mut metrics) {
                        match e {
                            Error::EndOfInput => {
                                log_shutdown_metrics(&metrics, handles.len(), metrics_backend.as_mut(), system.as_mut());
                                return Ok(());
                            }
                        _ => return Err(e),
                        }
                    };
                }
            },
            Ok(msg) = msg_status.recv_async() => {
                // Regular status update path; EndOfInput signals all outputs closed.
                if let Err(e) = process_state(&mut handles, &output_ct, &mut closed_outputs, msg, &mut metrics) {
                    match e {
                        Error::EndOfInput => {
                            log_shutdown_metrics(&metrics, handles.len(), metrics_backend.as_mut(), system.as_mut());
                            return Ok(());
                        }
                        _ => return Err(e),
                    };
                };
            },
            _ = metrics_timer.tick() => {
                // Periodic metrics snapshot; handles.len() is the in-flight gauge.
                metrics.record(metrics_backend.as_mut(), handles.len(), system.as_mut());
                trace!(
                    in_flight = handles.len(),
                    throughput = format!("{:.2}", metrics.throughput_per_sec()),
                    "Recorded periodic metrics"
                );
            },
            _ = cleanup_timer.tick() => {
                // Evict entries older than stale_timeout so lost messages don't leak.
                let before_count = handles.len();
                handles.retain(|message_id, state| {
                    let is_stale = state.created_at.elapsed() >= stale_timeout;
                    if is_stale {
                        warn!(
                            message_id = message_id,
                            age_secs = state.created_at.elapsed().as_secs(),
                            "Removing stale message state entry"
                        );
                    }
                    !is_stale
                });
                let removed = before_count - handles.len();
                if removed > 0 {
                    metrics.stale_entries_removed += removed as u64;
                    info!(
                        removed_count = removed,
                        remaining_count = handles.len(),
                        "Cleaned up stale message state entries"
                    );
                }
            },
            else => break,
        }
    }

    // Reached only when both channels are closed (else branch above).
    log_shutdown_metrics(
        &metrics,
        handles.len(),
        metrics_backend.as_mut(),
        system.as_mut(),
    );
    Ok(())
}
1142
1143/// Logs comprehensive shutdown metrics for observability.
1144fn log_shutdown_metrics(
1145    metrics: &MessageMetrics,
1146    in_flight: usize,
1147    metrics_backend: &mut dyn Metrics,
1148    system: Option<&mut System>,
1149) {
1150    // Record final metrics to configured backend
1151    metrics.record(metrics_backend, in_flight, system);
1152
1153    info!(
1154        total_received = metrics.total_received,
1155        total_completed = metrics.total_completed,
1156        total_process_errors = metrics.total_process_errors,
1157        total_output_errors = metrics.total_output_errors,
1158        streams_started = metrics.streams_started,
1159        streams_completed = metrics.streams_completed,
1160        duplicates_rejected = metrics.duplicates_rejected,
1161        stale_entries_removed = metrics.stale_entries_removed,
1162        duration_secs = metrics.elapsed().as_secs(),
1163        throughput_per_sec = format!("{:.2}", metrics.throughput_per_sec()),
1164        error_rate_percent = format!("{:.2}", metrics.error_rate()),
1165        remaining_in_flight = in_flight,
1166        "Message handler shutdown complete"
1167    );
1168}
1169
1170async fn input(
1171    input: ParsedRegisteredItem,
1172    output: Sender<InternalMessage>,
1173    state_handle: Sender<MessageHandle>,
1174    kill_switch: Receiver<()>,
1175) -> Result<(), Error> {
1176    trace!("started input");
1177
1178    let item = (input.creator)(input.config.clone()).await?;
1179
1180    match item {
1181        ExecutionType::Input(i) => {
1182            crate::modules::inputs::run_input(i, output, state_handle, kill_switch).await
1183        }
1184        ExecutionType::InputBatch(i) => {
1185            crate::modules::inputs::run_input_batch(i, output, state_handle, kill_switch).await
1186        }
1187        _ => {
1188            error!("invalid execution type for input");
1189            Err(Error::Validation("invalid execution type".into()))
1190        }
1191    }
1192}
1193
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly-constructed tracker starts with every counter at zero and a
    /// populated start timestamp. Also covers `total_filtered` and
    /// `input_bytes`, which the previous version left unasserted.
    #[test]
    fn test_message_metrics_new() {
        let metrics = MessageMetrics::new();
        assert_eq!(metrics.total_received, 0);
        assert_eq!(metrics.total_completed, 0);
        assert_eq!(metrics.total_process_errors, 0);
        assert_eq!(metrics.total_output_errors, 0);
        assert_eq!(metrics.total_filtered, 0);
        assert_eq!(metrics.streams_started, 0);
        assert_eq!(metrics.streams_completed, 0);
        assert_eq!(metrics.duplicates_rejected, 0);
        assert_eq!(metrics.stale_entries_removed, 0);
        assert_eq!(metrics.input_bytes, 0);
        assert!(metrics.started_at.is_some());
    }

    /// `Default` leaves counters at zero but, unlike `new()`, does not set a
    /// start timestamp.
    #[test]
    fn test_message_metrics_default() {
        let metrics = MessageMetrics::default();
        assert_eq!(metrics.total_received, 0);
        assert_eq!(metrics.total_filtered, 0);
        assert_eq!(metrics.input_bytes, 0);
        assert!(metrics.started_at.is_none());
    }

    /// `elapsed()` reflects wall-clock time since construction.
    #[test]
    fn test_message_metrics_elapsed() {
        let metrics = MessageMetrics::new();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = metrics.elapsed();
        assert!(elapsed.as_millis() >= 10);
    }

    /// Without a start timestamp, `elapsed()` degrades to a zero duration.
    #[test]
    fn test_message_metrics_elapsed_without_start() {
        let metrics = MessageMetrics::default();
        let elapsed = metrics.elapsed();
        assert_eq!(elapsed, std::time::Duration::default());
    }

    /// Throughput is derived from elapsed time, so only non-negativity can be
    /// asserted deterministically here.
    #[test]
    fn test_message_metrics_throughput_per_sec() {
        let mut metrics = MessageMetrics::new();
        metrics.total_completed = 100;
        let throughput = metrics.throughput_per_sec();
        assert!(throughput >= 0.0);
    }

    /// With no start time the elapsed duration is zero; throughput must not
    /// divide by zero and should report 0.
    #[test]
    fn test_message_metrics_throughput_zero_elapsed() {
        let mut metrics = MessageMetrics::default();
        metrics.total_completed = 100;
        let throughput = metrics.throughput_per_sec();
        assert_eq!(throughput, 0.0);
    }

    /// No messages processed means a 0% error rate (no divide-by-zero).
    #[test]
    fn test_message_metrics_error_rate_no_messages() {
        let metrics = MessageMetrics::new();
        let error_rate = metrics.error_rate();
        assert_eq!(error_rate, 0.0);
    }

    /// Error rate = errors / (completed + errors), expressed as a percentage.
    #[test]
    fn test_message_metrics_error_rate_with_errors() {
        let mut metrics = MessageMetrics::new();
        metrics.total_completed = 90;
        metrics.total_process_errors = 5;
        metrics.total_output_errors = 5;
        let error_rate = metrics.error_rate();
        // 10 errors out of 100 total = 10%
        assert!((error_rate - 10.0).abs() < 0.01);
    }

    /// All-error runs report a 100% error rate.
    #[test]
    fn test_message_metrics_error_rate_all_errors() {
        let mut metrics = MessageMetrics::new();
        metrics.total_completed = 0;
        metrics.total_process_errors = 50;
        metrics.total_output_errors = 50;
        let error_rate = metrics.error_rate();
        // 100 errors out of 100 total = 100%
        assert!((error_rate - 100.0).abs() < 0.01);
    }

    /// Recording to the no-op backend must never panic.
    #[test]
    fn test_message_metrics_record_with_noop_backend() {
        use crate::modules::metrics::NoOpMetrics;
        let metrics = MessageMetrics::new();
        let mut backend = NoOpMetrics::new();
        metrics.record(&mut backend, 10, None);
        // No assertion needed - just verify it doesn't panic
    }

    /// A status update for an ID that was never registered is an execution error.
    #[test]
    fn test_process_state_unknown_message_id() {
        let mut handles = FxHashMap::default();
        let mut closed_outputs = 0;
        let mut metrics = MessageMetrics::new();

        let result = process_state(
            &mut handles,
            &1,
            &mut closed_outputs,
            InternalMessageState {
                message_id: "unknown_id".to_string(),
                status: MessageStatus::Processed,
                stream_id: None,
                is_stream: false,
                bytes: 0,
            },
            &mut metrics,
        );

        // Should return an error for unknown message ID
        assert!(result.is_err());
        match result {
            Err(Error::ExecutionError(msg)) => {
                assert!(msg.contains("does not exist"));
            }
            _ => panic!("Expected ExecutionError"),
        }
    }

    /// With a single output, one Shutdown status ends the handler via EndOfInput.
    #[test]
    fn test_process_state_shutdown_signal() {
        let mut handles = FxHashMap::default();
        let mut closed_outputs = 0;
        let mut metrics = MessageMetrics::new();
        let output_ct = 1; // Single output

        let result = process_state(
            &mut handles,
            &output_ct,
            &mut closed_outputs,
            InternalMessageState {
                message_id: crate::SHUTDOWN_MESSAGE_ID.to_string(),
                status: MessageStatus::Shutdown,
                stream_id: None,
                is_stream: false,
                bytes: 0,
            },
            &mut metrics,
        );

        // Should return EndOfInput when all outputs have shut down
        assert!(result.is_err());
        assert!(matches!(result, Err(Error::EndOfInput)));
        assert_eq!(closed_outputs, 1);
    }

    /// Builds a single-instance `State` entry suitable for terminal-path tests.
    fn single_instance_state() -> State {
        State {
            instance_count: 1,
            processed_count: 0,
            processed_error_count: 0,
            output_count: 0,
            output_error_count: 0,
            filtered_count: 0,
            closure: None,
            errors: Vec::new(),
            stream_id: None,
            stream_closed: Some(true),
            created_at: std::time::Instant::now(),
        }
    }

    /// A ProcessError on the sole instance counts the error and removes the entry.
    #[test]
    fn test_process_state_tracks_errors() {
        let mut handles = FxHashMap::default();
        let mut closed_outputs = 0;
        let mut metrics = MessageMetrics::new();
        let message_id = "test_msg".to_string();

        handles.insert(message_id.clone(), single_instance_state());

        let result = process_state(
            &mut handles,
            &1,
            &mut closed_outputs,
            InternalMessageState {
                message_id: message_id.clone(),
                status: MessageStatus::ProcessError("test error".to_string()),
                stream_id: None,
                is_stream: false,
                bytes: 0,
            },
            &mut metrics,
        );

        assert!(result.is_ok());
        assert_eq!(metrics.total_process_errors, 1);
        // Message should be removed since stream_closed is true and counts match
        assert!(!handles.contains_key(&message_id));
    }

    /// A Filtered status on the sole instance counts as a successful
    /// completion and removes the entry. This terminal path previously had no
    /// test coverage.
    #[test]
    fn test_process_state_filtered_completes() {
        let mut handles = FxHashMap::default();
        let mut closed_outputs = 0;
        let mut metrics = MessageMetrics::new();
        let message_id = "filtered_msg".to_string();

        handles.insert(message_id.clone(), single_instance_state());

        let result = process_state(
            &mut handles,
            &1,
            &mut closed_outputs,
            InternalMessageState {
                message_id: message_id.clone(),
                status: MessageStatus::Filtered,
                stream_id: None,
                is_stream: false,
                bytes: 0,
            },
            &mut metrics,
        );

        assert!(result.is_ok());
        assert_eq!(metrics.total_filtered, 1);
        // Filtered completes the message successfully and removes its state.
        assert_eq!(metrics.total_completed, 1);
        assert!(!handles.contains_key(&message_id));
    }

    /// Display renders each status variant as its bare name (errors omit payload).
    #[test]
    fn test_message_status_display() {
        assert_eq!(format!("{}", MessageStatus::New), "New");
        assert_eq!(format!("{}", MessageStatus::Processed), "Processed");
        assert_eq!(
            format!("{}", MessageStatus::ProcessError("err".into())),
            "ProcessError"
        );
        assert_eq!(format!("{}", MessageStatus::Output), "Output");
        assert_eq!(
            format!("{}", MessageStatus::OutputError("err".into())),
            "OutputError"
        );
        assert_eq!(format!("{}", MessageStatus::Shutdown), "Shutdown");
        assert_eq!(
            format!("{}", MessageStatus::StreamComplete),
            "StreamComplete"
        );
    }
}
1415}