1use flume::{bounded, Receiver, Sender};
2use rustc_hash::FxHashMap;
3use serde::Deserialize;
4use serde::Serialize;
5use serde_yaml::Value;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fmt;
9use std::future::Future;
10use std::time::Instant;
11use sysinfo::System;
12use tokio::task::JoinSet;
13use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::MetricEntry;
17use std::str::FromStr;
18use std::sync::Once;
19
/// Age (seconds) after which an in-flight message-state entry is considered
/// stale and evicted by the cleanup sweep (default: 1 hour).
const STALE_MESSAGE_TIMEOUT_SECS: u64 = 3600;

/// How often (seconds) the stale-entry cleanup sweep runs.
const STALE_CLEANUP_INTERVAL_SECS: u64 = 300;

/// Bounded capacity for the internal message/state channels; provides
/// backpressure between pipeline stages.
const CHANNEL_CAPACITY: usize = 10_000;
29
/// Aggregated counters and latency statistics for the message handler,
/// periodically flushed to a metrics backend via [`MessageMetrics::record`].
#[derive(Debug, Default)]
pub struct MessageMetrics {
    /// Messages registered from the input side.
    pub total_received: u64,
    /// Non-stream messages that reached a successful terminal state.
    pub total_completed: u64,
    /// Processor errors observed.
    pub total_process_errors: u64,
    /// Output errors observed.
    pub total_output_errors: u64,
    /// Messages filtered/dropped by processors.
    pub total_filtered: u64,
    /// Streams that started.
    pub streams_started: u64,
    /// Streams that signalled completion.
    pub streams_completed: u64,
    /// Messages rejected for reusing an in-flight message ID.
    pub duplicates_rejected: u64,
    /// State entries evicted by the stale-cleanup sweep.
    pub stale_entries_removed: u64,
    /// Raw bytes received from inputs.
    pub input_bytes: u64,
    /// Bytes successfully written by outputs.
    pub output_bytes: u64,
    /// Set by `new()`; `None` for a plain `Default` instance.
    started_at: Option<Instant>,
    /// Running sum of latency samples in microseconds (u128 to avoid overflow).
    latency_sum_us: u128,
    /// Number of latency samples recorded.
    latency_count: u64,
    /// Smallest latency sample (µs); `None` until the first sample.
    latency_min_us: Option<u64>,
    /// Largest latency sample (µs); 0 until the first sample.
    latency_max_us: u64,
}
69
70impl MessageMetrics {
71 pub fn new() -> Self {
73 Self {
74 started_at: Some(Instant::now()),
75 ..Default::default()
76 }
77 }
78
79 pub fn elapsed(&self) -> std::time::Duration {
81 self.started_at.map(|t| t.elapsed()).unwrap_or_default()
82 }
83
84 pub fn throughput_per_sec(&self) -> f64 {
86 let elapsed = self.elapsed().as_secs_f64();
87 if elapsed > 0.0 {
88 self.total_completed as f64 / elapsed
89 } else {
90 0.0
91 }
92 }
93
94 pub fn bytes_per_sec(&self) -> f64 {
96 let elapsed = self.elapsed().as_secs_f64();
97 if elapsed > 0.0 {
98 self.output_bytes as f64 / elapsed
99 } else {
100 0.0
101 }
102 }
103
104 pub fn error_rate(&self) -> f64 {
106 let total = self.total_completed + self.total_process_errors + self.total_output_errors;
107 if total > 0 {
108 ((self.total_process_errors + self.total_output_errors) as f64 / total as f64) * 100.0
109 } else {
110 0.0
111 }
112 }
113
114 pub fn record_latency(&mut self, latency: std::time::Duration) {
120 let latency_us = latency.as_micros() as u64;
121 self.latency_sum_us += latency_us as u128;
122 self.latency_count += 1;
123
124 match self.latency_min_us {
126 Some(min) if latency_us < min => self.latency_min_us = Some(latency_us),
127 None => self.latency_min_us = Some(latency_us),
128 _ => {}
129 }
130
131 if latency_us > self.latency_max_us {
133 self.latency_max_us = latency_us;
134 }
135 }
136
137 pub fn latency_avg_ms(&self) -> f64 {
139 if self.latency_count > 0 {
140 (self.latency_sum_us as f64 / self.latency_count as f64) / 1000.0
141 } else {
142 0.0
143 }
144 }
145
146 pub fn latency_min_ms(&self) -> f64 {
148 self.latency_min_us
149 .map(|us| us as f64 / 1000.0)
150 .unwrap_or(0.0)
151 }
152
153 pub fn latency_max_ms(&self) -> f64 {
155 self.latency_max_us as f64 / 1000.0
156 }
157
158 pub fn record(
163 &self,
164 metrics_backend: &mut dyn Metrics,
165 in_flight: usize,
166 system: Option<&mut System>,
167 ) {
168 let (cpu_usage_percent, memory_used_bytes, memory_total_bytes) = if let Some(sys) = system {
170 sys.refresh_cpu_usage();
171 sys.refresh_memory();
172 let cpu = sys.global_cpu_usage();
173 let mem_used = sys.used_memory();
174 let mem_total = sys.total_memory();
175 (Some(cpu), Some(mem_used), Some(mem_total))
176 } else {
177 (None, None, None)
178 };
179
180 metrics_backend.record(MetricEntry {
181 total_received: self.total_received,
182 total_completed: self.total_completed,
183 total_process_errors: self.total_process_errors,
184 total_output_errors: self.total_output_errors,
185 total_filtered: self.total_filtered,
186 streams_started: self.streams_started,
187 streams_completed: self.streams_completed,
188 duplicates_rejected: self.duplicates_rejected,
189 stale_entries_removed: self.stale_entries_removed,
190 in_flight,
191 throughput_per_sec: self.throughput_per_sec(),
192 cpu_usage_percent,
193 memory_used_bytes,
194 memory_total_bytes,
195 input_bytes: self.input_bytes,
196 output_bytes: self.output_bytes,
197 bytes_per_sec: self.bytes_per_sec(),
198 latency_avg_ms: self.latency_avg_ms(),
199 latency_min_ms: self.latency_min_ms(),
200 latency_max_ms: self.latency_max_ms(),
201 });
202 }
203}
204
205fn spawn_task<F>(handles: &mut JoinSet<Result<(), Error>>, task: F)
208where
209 F: Future<Output = Result<(), Error>> + Send + 'static,
210{
211 handles.spawn(task);
212}
213
214use super::CallbackChan;
215use super::Error;
216use super::Message;
217use super::Metrics;
218use crate::config::parse_configuration_item;
219use crate::config::ExecutionType;
220use crate::config::{Config, ItemType, ParsedConfig, ParsedRegisteredItem};
221
222use crate::modules::metrics::create_metrics;
223use crate::modules::outputs;
224use crate::modules::processors;
225use crate::modules::register_plugins;
226use crate::Status;
227
228use once_cell::sync::Lazy;
229use std::sync::Mutex;
230
/// Guards one-time plugin registration across all `Runtime` instances.
static REGISTER: Once = Once::new();
/// Caches any plugin-registration error so every later `from_config` call
/// can surface it instead of silently proceeding.
static REGISTER_ERROR: Lazy<Mutex<Option<String>>> = Lazy::new(|| Mutex::new(None));
234
/// Pipeline runtime: owns the parsed configuration plus the shared channel
/// that worker tasks use to report per-message state transitions.
pub struct Runtime {
    config: ParsedConfig,
    /// Cloned into every processor/output task for status reporting.
    state_tx: Sender<InternalMessageState>,
    /// Consumed by the message-handler task spawned in `run`.
    state_rx: Receiver<InternalMessageState>,
    /// Optional wall-clock limit after which a kill signal is sent.
    timeout: Option<Duration>,
}
242
/// Lifecycle states a message reports back to the message handler.
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
pub(crate) enum MessageStatus {
    /// Message entered the pipeline (or joined a stream).
    #[default]
    New,
    /// A processor finished the message successfully.
    Processed,
    /// A processor failed; payload is the error text.
    ProcessError(String),
    /// An output wrote the message successfully.
    Output,
    /// An output failed; payload is the error text.
    OutputError(String),
    /// A processor filtered/dropped the message.
    Filtered,
    /// An output task shut down.
    Shutdown,
    /// A stream finished producing messages.
    StreamComplete,
}
255
256impl fmt::Display for MessageStatus {
257 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
258 match *self {
259 MessageStatus::New => write!(f, "New"),
260 MessageStatus::Processed => write!(f, "Processed"),
261 MessageStatus::ProcessError(_) => write!(f, "ProcessError"),
262 MessageStatus::Output => write!(f, "Output"),
263 MessageStatus::OutputError(_) => write!(f, "OutputError"),
264 MessageStatus::Filtered => write!(f, "Filtered"),
265 MessageStatus::Shutdown => write!(f, "Shutdown"),
266 MessageStatus::StreamComplete => write!(f, "StreamComplete"),
267 }
268 }
269}
270
/// Lightweight status event sent from processor/output tasks back to the
/// message handler; keyed by `message_id`.
#[derive(Clone, Serialize, Deserialize, Default)]
pub(crate) struct InternalMessageState {
    /// ID of the message this event refers to.
    pub message_id: String,
    /// Lifecycle stage the message just reached.
    pub status: MessageStatus,
    /// Parent stream ID, when the message belongs to a stream.
    pub stream_id: Option<String>,
    /// True when the event refers to a stream (or a member of one).
    pub is_stream: bool,
    /// Payload size associated with this event (0 for stream-only events).
    pub bytes: u64,
}
280
/// A message in flight through the pipeline, paired with its ID and
/// current lifecycle status.
#[derive(Clone, Serialize, Deserialize)]
pub(crate) struct InternalMessage {
    /// The payload being processed.
    pub message: Message,
    /// Unique ID used to correlate state events in the handler.
    pub message_id: String,
    /// Lifecycle stage at the time this copy was sent.
    pub status: MessageStatus,
}
287
/// Registration record for a newly received message, handed from the input
/// side to the message handler.
pub(crate) struct MessageHandle {
    /// Unique ID to register in the state map.
    pub message_id: String,
    /// Optional callback fired once the message reaches a terminal state.
    pub closure: Option<CallbackChan>,
    /// Parent stream ID, when the message belongs to a stream.
    pub stream_id: Option<String>,
    /// True when the handle represents a stream (or a member of one).
    pub is_stream: bool,
    /// Set when the stream this handle represents has finished producing.
    pub stream_complete: bool,
    /// Size of the raw input payload in bytes (feeds metrics).
    pub input_bytes: u64,
}
297
impl Runtime {
    /// Build a `Runtime` from a configuration string.
    ///
    /// Plugin registration runs exactly once per process (guarded by
    /// `REGISTER`); if it failed, the cached error is surfaced on this and
    /// every subsequent call. The config is then parsed and validated, and
    /// the shared state channel used by worker tasks is created.
    pub async fn from_config(config: &str) -> Result<Self, Error> {
        REGISTER.call_once(|| {
            if let Err(e) = register_plugins() {
                // Stash the error; call_once can't return it directly.
                if let Ok(mut err) = REGISTER_ERROR.lock() {
                    *err = Some(format!("{e}"));
                }
            }
        });

        // Surface a registration failure from this or any earlier call.
        if let Ok(err_lock) = REGISTER_ERROR.lock() {
            if let Some(ref e) = *err_lock {
                return Err(Error::ExecutionError(format!(
                    "Plugin registration failed: {e}"
                )));
            }
        }
        trace!("plugins registered");

        let conf: Config = Config::from_str(config)?;
        let parsed_conf = conf.validate().await?;

        let (state_tx, state_rx) = bounded(CHANNEL_CAPACITY);

        debug!("Runtime is ready");
        Ok(Runtime {
            config: parsed_conf,
            state_rx,
            state_tx,
            timeout: None,
        })
    }

    /// Set (or clear) the human-readable label attached to log output.
    pub fn set_label(&mut self, label: Option<String>) -> Result<(), Error> {
        self.config.label = label;
        Ok(())
    }

    /// Current label, if one was configured.
    pub fn get_label(&self) -> Option<String> {
        self.config.label.clone()
    }

    /// Replace the configured input with a newly parsed configuration item.
    pub async fn set_input(&mut self, input: &HashMap<String, Value>) -> Result<(), Error> {
        let parsed_item = parse_configuration_item(ItemType::Input, input).await?;
        self.config.input = parsed_item;
        Ok(())
    }

    /// Replace the configured output with a newly parsed configuration item.
    pub async fn set_output(&mut self, output: &HashMap<String, Value>) -> Result<(), Error> {
        let parsed_item = parse_configuration_item(ItemType::Output, output).await?;
        self.config.output = parsed_item;
        Ok(())
    }

    /// Set the number of worker tasks spawned per processor/output stage.
    pub fn set_threads(&mut self, count: usize) -> Result<(), Error> {
        self.config.num_threads = count;
        Ok(())
    }

    /// Set an optional wall-clock limit; when it elapses, `run` sends the
    /// kill signal to the input so the pipeline drains and shuts down.
    pub fn set_timeout(&mut self, timeout: Option<Duration>) -> Result<(), Error> {
        self.timeout = timeout;
        Ok(())
    }
    /// Run the pipeline to completion.
    ///
    /// Wires up, in order: the message-handler task, the output workers,
    /// the processor chain, and the input task; then joins all tasks.
    /// Shutdown is triggered by the optional timeout, Ctrl+C, or natural
    /// end of input. Returns the first task error encountered.
    pub async fn run(&self) -> Result<(), Error> {
        let mut handles = JoinSet::new();

        let metrics_backend = create_metrics(self.config.metrics.as_ref()).await?;

        // Defaults: flush every 300s, no host CPU/memory sampling.
        let metrics_interval = self
            .config
            .metrics
            .as_ref()
            .map(|m| m.interval)
            .unwrap_or(300);

        let collect_system_metrics = self
            .config
            .metrics
            .as_ref()
            .map(|m| m.collect_system_metrics)
            .unwrap_or(false);

        let (msg_tx, msg_rx) = bounded(CHANNEL_CAPACITY);
        let msg_state = message_handler(
            msg_rx,
            self.state_rx.clone(),
            self.config.num_threads,
            metrics_backend,
            metrics_interval,
            collect_system_metrics,
        );

        spawn_task(&mut handles, msg_state);

        // Build back-to-front: outputs first, then processors feeding them.
        let output = self
            .output(self.config.output.clone(), &mut handles)
            .await?;

        let processors = self.pipeline(output, &mut handles).await?;

        // Kill switch: capacity 1 is enough — a single signal stops the input.
        let (ks_send, ks_recv) = bounded(1);

        let input = input(self.config.input.clone(), processors, msg_tx, ks_recv);

        spawn_task(&mut handles, input);

        info!(label = self.config.label, "pipeline started");

        if let Some(d) = self.timeout {
            let timeout_ks_send = ks_send.clone();
            handles.spawn(async move {
                sleep(d).await;
                trace!("sending kill signal");
                if !timeout_ks_send.is_disconnected() {
                    if let Err(e) = timeout_ks_send.send(()) {
                        debug!(error = ?e, "Failed to send kill signal, receiver may have been dropped");
                    }
                }
                Ok(())
            });
        }

        // Join tasks until they all finish, racing against Ctrl+C.
        loop {
            tokio::select! {
                res = handles.join_next() => {
                    match res {
                        Some(Ok(Ok(()))) => {
                            // Task finished cleanly; keep draining the set.
                        }
                        Some(Ok(Err(e))) => {
                            return Err(e);
                        }
                        Some(Err(e)) => {
                            // JoinError (panic/cancellation) from the task itself.
                            return Err(Error::ProcessingError(format!("{e}")));
                        }
                        None => {
                            // JoinSet drained: every task completed.
                            break;
                        }
                    }
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received shutdown signal (Ctrl+C), initiating graceful shutdown");
                    if !ks_send.is_disconnected() {
                        if let Err(e) = ks_send.send(()) {
                            debug!(error = ?e, "Failed to send kill signal from signal handler");
                        }
                    }
                }
            }
        }

        info!("pipeline finished");
        Ok(())
    }

    /// Chain the configured processors together, spawning `num_threads`
    /// worker tasks per processor stage.
    ///
    /// Processors are wired in reverse so each stage sends into the one
    /// that follows it; the returned sender feeds the first processor
    /// (or the output directly when there are none).
    async fn pipeline(
        &self,
        input: Sender<InternalMessage>,
        handles: &mut JoinSet<Result<(), Error>>,
    ) -> Result<Sender<InternalMessage>, Error> {
        trace!("starting pipeline");

        let mut processors = self.config.processors.clone();
        processors.reverse();

        let mut next_tx = input;

        for (i, v) in processors.iter().enumerate() {
            let p = v.clone();

            let (tx, rx) = bounded(CHANNEL_CAPACITY);

            // All workers for this stage share one receiver (work stealing).
            for n in 0..self.config.num_threads {
                let proc = processors::run_processor(
                    p.clone(),
                    next_tx.clone(),
                    rx.clone(),
                    self.state_tx.clone(),
                );
                spawn_task(handles, proc);
            }

            next_tx = tx;
        }

        Ok(next_tx)
    }

    /// Spawn `num_threads` output workers reading from a shared channel and
    /// return the sender that feeds them.
    async fn output(
        &self,
        output: ParsedRegisteredItem,
        handles: &mut JoinSet<Result<(), Error>>,
    ) -> Result<Sender<InternalMessage>, Error> {
        trace!("started output");

        let (tx, rx) = bounded(CHANNEL_CAPACITY);

        for i in 0..self.config.num_threads {
            // Each worker gets its own plugin instance from the creator.
            let item = (output.creator)(output.config.clone()).await?;
            match item {
                ExecutionType::Output(o) => {
                    let state_tx = self.state_tx.clone();
                    let new_rx = rx.clone();
                    spawn_task(handles, outputs::run_output(new_rx, state_tx, o));
                }
                ExecutionType::OutputBatch(o) => {
                    let state_tx = self.state_tx.clone();
                    let new_rx = rx.clone();
                    spawn_task(handles, outputs::run_output_batch(new_rx, state_tx, o));
                }
                _ => {
                    error!("invalid execution type for output");
                    return Err(Error::Validation("invalid execution type".into()));
                }
            };
        }

        Ok(tx)
    }
}
689
/// Per-message bookkeeping held by the message handler until the message
/// reaches a terminal state and is removed from the map.
struct State {
    /// How many instances/copies of the message are in flight.
    instance_count: i64,
    /// Successful processor completions observed.
    processed_count: i64,
    /// Processor errors observed.
    processed_error_count: i64,
    /// Successful output completions observed.
    output_count: i64,
    /// Output errors observed.
    output_error_count: i64,
    /// Instances filtered/dropped by processors.
    filtered_count: i64,
    /// Callback fired exactly once (taken) on terminal success or error.
    closure: Option<CallbackChan>,
    /// Error texts accumulated from processors/outputs; delivered via `closure`.
    errors: Vec<String>,
    /// Parent stream this message belongs to, if any.
    stream_id: Option<String>,
    /// `Some(false)` while an owning stream is still open; `None` for
    /// non-stream messages (treated as closed).
    stream_closed: Option<bool>,
    /// Creation time; used for latency metrics and stale-entry cleanup.
    created_at: Instant,
}
704
/// Apply one status event to the in-memory message-state map.
///
/// Events cascade: when the affected message belongs to a stream, an
/// equivalent event for the stream's own entry is queued and handled in the
/// same call. Entries that reach a terminal state fire their callback once
/// and are removed afterwards.
///
/// Returns `Err(Error::EndOfInput)` once all `output_ct` outputs have
/// reported `Shutdown`, signalling the handler loop to exit cleanly.
fn process_state(
    handles: &mut FxHashMap<String, State>,
    output_ct: &usize,
    closed_outputs: &mut usize,
    initial_msg: InternalMessageState,
    metrics: &mut MessageMetrics,
) -> Result<(), Error> {
    // Work queue: starts with the incoming event and may grow with a
    // follow-up event targeting the parent stream's entry.
    let mut pending_messages = Vec::with_capacity(4);
    pending_messages.push(initial_msg);
    let mut entries_to_remove = Vec::with_capacity(2);

    while let Some(msg) = pending_messages.pop() {
        let mut remove_entry = false;
        let mut stream_id = None;
        let mut message_completed_successfully = false;
        let mut message_latency: Option<std::time::Duration> = None;

        match handles.get_mut(&msg.message_id) {
            None => {
                // Shutdown events may arrive for IDs that were never
                // registered; any other unknown ID is a bookkeeping bug.
                if let MessageStatus::Shutdown = &msg.status {
                    *closed_outputs += 1;
                    if closed_outputs == output_ct {
                        info!("exiting message handler");
                        return Err(Error::EndOfInput);
                    }
                } else {
                    return Err(Error::ExecutionError(format!(
                        "Message ID {} does not exist",
                        msg.message_id
                    )));
                };
            }
            Some(state) => {
                match &msg.status {
                    MessageStatus::New => {
                        state.instance_count += 1;
                        stream_id = state.stream_id.clone();
                    }
                    MessageStatus::Processed => {
                        state.processed_count += 1;
                        stream_id = state.stream_id.clone();
                    }
                    MessageStatus::ProcessError(e) => {
                        state.processed_error_count += 1;
                        state.errors.push(e.clone());
                        stream_id = state.stream_id.clone();
                        metrics.total_process_errors += 1;

                        // Non-stream messages have stream_closed == None,
                        // which is treated as "closed".
                        let stream_closed = state.stream_closed.unwrap_or(true);

                        // Terminal when every instance is accounted for.
                        if stream_closed
                            && (state.output_count
                                + state.output_error_count
                                + state.processed_error_count)
                                >= state.instance_count
                        {
                            remove_entry = true;
                            message_latency = Some(state.created_at.elapsed());
                            if let Some(chan) = state.closure.take() {
                                info!(message_id = msg.message_id, "calling closure");
                                let err = std::mem::take(&mut state.errors);
                                let _ = chan.send(Status::Errored(err));
                            }
                        }
                    }
                    MessageStatus::Output => {
                        state.output_count += 1;
                        stream_id = state.stream_id.clone();
                        metrics.output_bytes += msg.bytes;

                        debug!(
                            message_id = msg.message_id,
                            errors = state.processed_error_count,
                            "message fully processed"
                        );
                        let stream_closed = state.stream_closed.unwrap_or(true);

                        // Fully successful terminal state.
                        if stream_closed && state.output_count >= state.instance_count {
                            remove_entry = true;
                            message_completed_successfully = true;
                            message_latency = Some(state.created_at.elapsed());
                            if let Some(chan) = state.closure.take() {
                                info!(message_id = msg.message_id, "calling closure");
                                let _ = chan.send(Status::Processed);
                            }
                        // Terminal, but some instances errored along the way.
                        } else if stream_closed
                            && (state.output_count
                                + state.output_error_count
                                + state.processed_error_count)
                                >= state.instance_count
                        {
                            remove_entry = true;
                            message_latency = Some(state.created_at.elapsed());
                            if let Some(chan) = state.closure.take() {
                                info!(message_id = msg.message_id, "calling closure");
                                let err = std::mem::take(&mut state.errors);
                                let _ = chan.send(Status::Errored(err));
                            }
                        }
                    }
                    MessageStatus::OutputError(e) => {
                        state.output_error_count += 1;
                        state.errors.push(e.clone());
                        stream_id = state.stream_id.clone();
                        metrics.total_output_errors += 1;

                        if (state.output_count
                            + state.output_error_count
                            + state.processed_error_count)
                            >= state.instance_count
                        {
                            // Only terminal once the owning stream is closed.
                            remove_entry = state.stream_closed.unwrap_or(true);

                            if remove_entry {
                                message_latency = Some(state.created_at.elapsed());
                                if let Some(chan) = state.closure.take() {
                                    info!(message_id = msg.message_id, "calling closure");
                                    let err = std::mem::take(&mut state.errors);
                                    let _ = chan.send(Status::Errored(err));
                                }
                            }
                        }
                    }
                    MessageStatus::Filtered => {
                        state.filtered_count += 1;
                        stream_id = state.stream_id.clone();
                        metrics.total_filtered += 1;

                        debug!(
                            message_id = msg.message_id,
                            "message filtered/dropped by processor"
                        );

                        let stream_closed = state.stream_closed.unwrap_or(true);

                        if stream_closed {
                            // Filtering counts toward completion: a fully
                            // filtered message is a success, not an error.
                            if (state.filtered_count
                                + state.output_count
                                + state.output_error_count
                                + state.processed_error_count)
                                >= state.instance_count
                            {
                                remove_entry = true;
                                message_completed_successfully = true;
                                message_latency = Some(state.created_at.elapsed());
                                if let Some(chan) = state.closure.take() {
                                    info!(
                                        message_id = msg.message_id,
                                        "message filtered - calling closure"
                                    );
                                    let _ = chan.send(Status::Processed);
                                }
                            }
                        }
                    }
                    MessageStatus::Shutdown => {
                        *closed_outputs += 1;
                        if closed_outputs == output_ct {
                            debug!("exiting message handler");
                            return Err(Error::EndOfInput);
                        }
                    }
                    MessageStatus::StreamComplete => {
                        // The stream has finished producing; its own entry
                        // also counts one more "output" so the totals close.
                        state.stream_closed = Some(true);
                        state.output_count += 1;

                        stream_id = state.stream_id.clone();
                        if state.output_count >= state.instance_count {
                            remove_entry = true;
                            message_completed_successfully = true;
                            message_latency = Some(state.created_at.elapsed());
                            if let Some(chan) = state.closure.take() {
                                info!(message_id = msg.message_id, "calling closure");
                                let _ = chan.send(Status::Processed);
                            }
                        } else if (state.output_count
                            + state.output_error_count
                            + state.processed_error_count)
                            >= state.instance_count
                        {
                            remove_entry = true;
                            message_latency = Some(state.created_at.elapsed());
                            if let Some(chan) = state.closure.take() {
                                info!(message_id = msg.message_id, "calling closure");
                                let err = std::mem::take(&mut state.errors);
                                let _ = chan.send(Status::Errored(err));
                            }
                        };
                    }
                };

                trace!(
                    instance_count = state.instance_count,
                    processed_count = state.processed_count,
                    processed_error_count = state.processed_error_count,
                    output_count = state.output_count,
                    output_error_count = state.output_error_count,
                    stream_id = state.stream_id,
                    stream_closed = state.stream_closed,
                    state = msg.status.to_string(),
                    message_id = msg.message_id,
                    "Received message state"
                );
            }
        };

        // Propagate the same status to the parent stream's entry.
        if let Some(sid) = stream_id {
            match handles.get(&sid) {
                None => {
                    return Err(Error::ExecutionError(format!(
                        "StreamID {} does not exist (Message ID {})",
                        sid, msg.message_id
                    )))
                }
                Some(s) => {
                    pending_messages.push(InternalMessageState {
                        message_id: sid,
                        status: msg.status.clone(),
                        stream_id: s.stream_id.clone(),
                        is_stream: true,
                        bytes: msg.bytes,
                    });
                }
            }
        }

        if remove_entry {
            trace!(
                message_id = msg.message_id,
                "Marking message for removal from state"
            );
            entries_to_remove.push(msg.message_id);
            // Stream entries are bookkeeping only; count metrics for
            // individual messages, not the stream wrapper itself.
            if !msg.is_stream {
                if message_completed_successfully {
                    metrics.total_completed += 1;
                }
                if let Some(latency) = message_latency {
                    metrics.record_latency(latency);
                }
            }
        }
    }

    // Deferred removal keeps the map stable while events cascade above.
    for message_id in entries_to_remove {
        let _ = handles.remove(&message_id);
    }

    Ok(())
}
966
/// Central bookkeeping task: owns the message-state map and the metrics.
///
/// Drives four event sources in a biased `select!`:
/// 1. new message registrations (`new_msg`),
/// 2. status updates from processors/outputs (`msg_status`),
/// 3. a periodic metrics flush,
/// 4. a periodic stale-entry cleanup sweep.
///
/// Exits `Ok(())` after all `output_ct` outputs report `Shutdown`
/// (surfaced as `Error::EndOfInput` from `process_state`), logging a final
/// metrics snapshot on the way out.
async fn message_handler(
    new_msg: Receiver<MessageHandle>,
    msg_status: Receiver<InternalMessageState>,
    output_ct: usize,
    mut metrics_backend: Box<dyn Metrics>,
    metrics_interval_secs: u64,
    collect_system_metrics: bool,
) -> Result<(), Error> {
    let mut handles: FxHashMap<String, State> = FxHashMap::default();
    // Pre-size to avoid rehashing during normal operation.
    handles.reserve(1024);
    let mut closed_outputs = 0;
    let stale_timeout = Duration::from_secs(STALE_MESSAGE_TIMEOUT_SECS);
    let mut metrics = MessageMetrics::new();

    let mut system = if collect_system_metrics {
        let mut sys = System::new();
        // Prime the sampler; first CPU reading needs a baseline refresh.
        sys.refresh_cpu_usage();
        sys.refresh_memory();
        Some(sys)
    } else {
        None
    };

    let mut metrics_timer = interval(Duration::from_secs(metrics_interval_secs));
    metrics_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
    // Consume the immediate first tick so the timer fires after one interval.
    metrics_timer.tick().await;

    let mut cleanup_timer = interval(Duration::from_secs(STALE_CLEANUP_INTERVAL_SECS));
    cleanup_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
    cleanup_timer.tick().await;

    debug!(
        interval_secs = metrics_interval_secs,
        collect_system_metrics = collect_system_metrics,
        "Metrics recording interval configured"
    );

    loop {
        tokio::select! {
            // biased: prefer registrations over status updates so a message
            // is registered before its status events are processed.
            biased;
            Ok(msg) = new_msg.recv_async() => {
                trace!(message_id = msg.message_id, "Received new message");
                // A stream-complete handle closes an existing stream entry
                // rather than registering a new message.
                if msg.is_stream && msg.stream_complete {
                    metrics.streams_completed += 1;
                    if let Err(e) = process_state(&mut handles, &output_ct, &mut closed_outputs, InternalMessageState {
                        message_id: msg.message_id.clone(),
                        status: MessageStatus::StreamComplete,
                        stream_id: msg.stream_id.clone(),
                        is_stream: true,
                        bytes: 0,
                    }, &mut metrics) {
                        match e {
                            Error::EndOfInput => {
                                log_shutdown_metrics(&metrics, handles.len(), metrics_backend.as_mut(), system.as_mut());
                                return Ok(());
                            }
                            _ => return Err(e),
                        }
                    }
                    continue
                };

                metrics.total_received += 1;
                metrics.input_bytes += msg.input_bytes;
                if msg.is_stream {
                    metrics.streams_started += 1;
                }

                let closure = msg.closure;
                let stream_id = msg.stream_id;
                let is_stream = msg.is_stream;
                match handles.entry(msg.message_id) {
                    Entry::Vacant(entry) => {
                        entry.insert(State {
                            instance_count: 1,
                            processed_count: 0,
                            processed_error_count: 0,
                            output_count: 0,
                            output_error_count: 0,
                            filtered_count: 0,
                            closure,
                            errors: Vec::with_capacity(2), stream_id: stream_id.clone(),
                            stream_closed: if is_stream { Some(false) } else { None },
                            created_at: Instant::now(),
                        });
                    }
                    Entry::Occupied(entry) => {
                        // Duplicate IDs would corrupt the counts; fail hard.
                        metrics.duplicates_rejected += 1;
                        error!(message_id = entry.key(), "Received duplicate message");
                        return Err(Error::ExecutionError("Duplicate Message ID Error".into()));
                    }
                }

                // Bump the parent stream's instance count for this member.
                if let Some(s) = &stream_id {
                    if let Err(e) = process_state(&mut handles, &output_ct, &mut closed_outputs, InternalMessageState{
                        message_id: s.clone(),
                        status: MessageStatus::New,
                        stream_id: None,
                        is_stream: true,
                        bytes: 0,
                    }, &mut metrics) {
                        match e {
                            Error::EndOfInput => {
                                log_shutdown_metrics(&metrics, handles.len(), metrics_backend.as_mut(), system.as_mut());
                                return Ok(());
                            }
                            _ => return Err(e),
                        }
                    };
                }
            },
            Ok(msg) = msg_status.recv_async() => {
                if let Err(e) = process_state(&mut handles, &output_ct, &mut closed_outputs, msg, &mut metrics) {
                    match e {
                        Error::EndOfInput => {
                            log_shutdown_metrics(&metrics, handles.len(), metrics_backend.as_mut(), system.as_mut());
                            return Ok(());
                        }
                        _ => return Err(e),
                    };
                };
            },
            _ = metrics_timer.tick() => {
                metrics.record(metrics_backend.as_mut(), handles.len(), system.as_mut());
                trace!(
                    in_flight = handles.len(),
                    throughput = format!("{:.2}", metrics.throughput_per_sec()),
                    "Recorded periodic metrics"
                );
            },
            _ = cleanup_timer.tick() => {
                // Evict entries older than the stale timeout; these are
                // messages whose terminal events never arrived.
                let before_count = handles.len();
                handles.retain(|message_id, state| {
                    let is_stale = state.created_at.elapsed() >= stale_timeout;
                    if is_stale {
                        warn!(
                            message_id = message_id,
                            age_secs = state.created_at.elapsed().as_secs(),
                            "Removing stale message state entry"
                        );
                    }
                    !is_stale
                });
                let removed = before_count - handles.len();
                if removed > 0 {
                    metrics.stale_entries_removed += removed as u64;
                    info!(
                        removed_count = removed,
                        remaining_count = handles.len(),
                        "Cleaned up stale message state entries"
                    );
                }
            },
            // Both channels disconnected: nothing left to handle.
            else => break,
        }
    }

    log_shutdown_metrics(
        &metrics,
        handles.len(),
        metrics_backend.as_mut(),
        system.as_mut(),
    );
    Ok(())
}
1142
1143fn log_shutdown_metrics(
1145 metrics: &MessageMetrics,
1146 in_flight: usize,
1147 metrics_backend: &mut dyn Metrics,
1148 system: Option<&mut System>,
1149) {
1150 metrics.record(metrics_backend, in_flight, system);
1152
1153 info!(
1154 total_received = metrics.total_received,
1155 total_completed = metrics.total_completed,
1156 total_process_errors = metrics.total_process_errors,
1157 total_output_errors = metrics.total_output_errors,
1158 streams_started = metrics.streams_started,
1159 streams_completed = metrics.streams_completed,
1160 duplicates_rejected = metrics.duplicates_rejected,
1161 stale_entries_removed = metrics.stale_entries_removed,
1162 duration_secs = metrics.elapsed().as_secs(),
1163 throughput_per_sec = format!("{:.2}", metrics.throughput_per_sec()),
1164 error_rate_percent = format!("{:.2}", metrics.error_rate()),
1165 remaining_in_flight = in_flight,
1166 "Message handler shutdown complete"
1167 );
1168}
1169
1170async fn input(
1171 input: ParsedRegisteredItem,
1172 output: Sender<InternalMessage>,
1173 state_handle: Sender<MessageHandle>,
1174 kill_switch: Receiver<()>,
1175) -> Result<(), Error> {
1176 trace!("started input");
1177
1178 let item = (input.creator)(input.config.clone()).await?;
1179
1180 match item {
1181 ExecutionType::Input(i) => {
1182 crate::modules::inputs::run_input(i, output, state_handle, kill_switch).await
1183 }
1184 ExecutionType::InputBatch(i) => {
1185 crate::modules::inputs::run_input_batch(i, output, state_handle, kill_switch).await
1186 }
1187 _ => {
1188 error!("invalid execution type for input");
1189 Err(Error::Validation("invalid execution type".into()))
1190 }
1191 }
1192}
1193
#[cfg(test)]
mod tests {
    //! Unit tests covering the `MessageMetrics` tracker, the
    //! `process_state` state machine, and `MessageStatus` formatting.
    use super::*;

    #[test]
    fn test_message_metrics_new() {
        let metrics = MessageMetrics::new();
        assert_eq!(metrics.total_received, 0);
        assert_eq!(metrics.total_completed, 0);
        assert_eq!(metrics.total_process_errors, 0);
        assert_eq!(metrics.total_output_errors, 0);
        assert_eq!(metrics.streams_started, 0);
        assert_eq!(metrics.streams_completed, 0);
        assert_eq!(metrics.duplicates_rejected, 0);
        assert_eq!(metrics.stale_entries_removed, 0);
        assert!(metrics.started_at.is_some());
    }

    #[test]
    fn test_message_metrics_default() {
        // Default has no start timestamp, unlike `new()`.
        let metrics = MessageMetrics::default();
        assert_eq!(metrics.total_received, 0);
        assert!(metrics.started_at.is_none());
    }

    #[test]
    fn test_message_metrics_elapsed() {
        let metrics = MessageMetrics::new();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = metrics.elapsed();
        assert!(elapsed.as_millis() >= 10);
    }

    #[test]
    fn test_message_metrics_elapsed_without_start() {
        // Without a start timestamp, elapsed reports zero.
        let metrics = MessageMetrics::default();
        let elapsed = metrics.elapsed();
        assert_eq!(elapsed, std::time::Duration::default());
    }

    #[test]
    fn test_message_metrics_throughput_per_sec() {
        let mut metrics = MessageMetrics::new();
        metrics.total_completed = 100;
        let throughput = metrics.throughput_per_sec();
        assert!(throughput >= 0.0);
    }

    #[test]
    fn test_message_metrics_throughput_zero_elapsed() {
        // Zero elapsed time must not divide by zero.
        let mut metrics = MessageMetrics::default();
        metrics.total_completed = 100;
        let throughput = metrics.throughput_per_sec();
        assert_eq!(throughput, 0.0);
    }

    #[test]
    fn test_message_metrics_error_rate_no_messages() {
        let metrics = MessageMetrics::new();
        let error_rate = metrics.error_rate();
        assert_eq!(error_rate, 0.0);
    }

    #[test]
    fn test_message_metrics_error_rate_with_errors() {
        // 10 errors out of 100 terminal messages -> 10%.
        let mut metrics = MessageMetrics::new();
        metrics.total_completed = 90;
        metrics.total_process_errors = 5;
        metrics.total_output_errors = 5;
        let error_rate = metrics.error_rate();
        assert!((error_rate - 10.0).abs() < 0.01);
    }

    #[test]
    fn test_message_metrics_error_rate_all_errors() {
        let mut metrics = MessageMetrics::new();
        metrics.total_completed = 0;
        metrics.total_process_errors = 50;
        metrics.total_output_errors = 50;
        let error_rate = metrics.error_rate();
        assert!((error_rate - 100.0).abs() < 0.01);
    }

    #[test]
    fn test_message_metrics_record_with_noop_backend() {
        // Smoke test: record() should not panic with a no-op backend
        // and no system sampler.
        use crate::modules::metrics::NoOpMetrics;
        let metrics = MessageMetrics::new();
        let mut backend = NoOpMetrics::new();
        metrics.record(&mut backend, 10, None);
    }

    #[test]
    fn test_process_state_unknown_message_id() {
        // Non-Shutdown events for unregistered IDs are hard errors.
        let mut handles = FxHashMap::default();
        let mut closed_outputs = 0;
        let mut metrics = MessageMetrics::new();

        let result = process_state(
            &mut handles,
            &1,
            &mut closed_outputs,
            InternalMessageState {
                message_id: "unknown_id".to_string(),
                status: MessageStatus::Processed,
                stream_id: None,
                is_stream: false,
                bytes: 0,
            },
            &mut metrics,
        );

        assert!(result.is_err());
        match result {
            Err(Error::ExecutionError(msg)) => {
                assert!(msg.contains("does not exist"));
            }
            _ => panic!("Expected ExecutionError"),
        }
    }

    #[test]
    fn test_process_state_shutdown_signal() {
        // With a single output, one Shutdown event ends the handler
        // (signalled via Error::EndOfInput).
        let mut handles = FxHashMap::default();
        let mut closed_outputs = 0;
        let mut metrics = MessageMetrics::new();
        let output_ct = 1; let result = process_state(
            &mut handles,
            &output_ct,
            &mut closed_outputs,
            InternalMessageState {
                message_id: crate::SHUTDOWN_MESSAGE_ID.to_string(),
                status: MessageStatus::Shutdown,
                stream_id: None,
                is_stream: false,
                bytes: 0,
            },
            &mut metrics,
        );

        assert!(result.is_err());
        assert!(matches!(result, Err(Error::EndOfInput)));
        assert_eq!(closed_outputs, 1);
    }

    #[test]
    fn test_process_state_tracks_errors() {
        // A single-instance message with a closed stream state is terminal
        // after one ProcessError: counted and removed from the map.
        let mut handles = FxHashMap::default();
        let mut closed_outputs = 0;
        let mut metrics = MessageMetrics::new();
        let message_id = "test_msg".to_string();

        handles.insert(
            message_id.clone(),
            State {
                instance_count: 1,
                processed_count: 0,
                processed_error_count: 0,
                output_count: 0,
                output_error_count: 0,
                filtered_count: 0,
                closure: None,
                errors: Vec::new(),
                stream_id: None,
                stream_closed: Some(true),
                created_at: std::time::Instant::now(),
            },
        );

        let result = process_state(
            &mut handles,
            &1,
            &mut closed_outputs,
            InternalMessageState {
                message_id: message_id.clone(),
                status: MessageStatus::ProcessError("test error".to_string()),
                stream_id: None,
                is_stream: false,
                bytes: 0,
            },
            &mut metrics,
        );

        assert!(result.is_ok());
        assert_eq!(metrics.total_process_errors, 1);
        assert!(!handles.contains_key(&message_id));
    }

    #[test]
    fn test_message_status_display() {
        assert_eq!(format!("{}", MessageStatus::New), "New");
        assert_eq!(format!("{}", MessageStatus::Processed), "Processed");
        assert_eq!(
            format!("{}", MessageStatus::ProcessError("err".into())),
            "ProcessError"
        );
        assert_eq!(format!("{}", MessageStatus::Output), "Output");
        assert_eq!(
            format!("{}", MessageStatus::OutputError("err".into())),
            "OutputError"
        );
        assert_eq!(format!("{}", MessageStatus::Shutdown), "Shutdown");
        assert_eq!(
            format!("{}", MessageStatus::StreamComplete),
            "StreamComplete"
        );
    }
}