remote/tracelog/
mod.rs

1// Re-export RcpdType from common to avoid duplication
2use chrono::TimeZone;
3pub use common::RcpdType;
4
5lazy_static::lazy_static! {
6    // static storage for the latest progress from each rcpd process
7    static ref PROGRESS_MAP: std::sync::Mutex<enum_map::EnumMap<RcpdType, common::SerializableProgress>> =
8        std::sync::Mutex::new(enum_map::EnumMap::default());
9}
10
11/// Get the latest progress snapshot from all rcpd processes
12pub fn get_latest_progress_snapshot() -> enum_map::EnumMap<RcpdType, common::SerializableProgress> {
13    PROGRESS_MAP.lock().unwrap().clone()
14}
15
16pub async fn run_sender(
17    mut receiver: tokio::sync::mpsc::UnboundedReceiver<common::remote_tracing::TracingMessage>,
18    mut send_stream: crate::streams::SendStream,
19    cancellation_token: tokio_util::sync::CancellationToken,
20) -> anyhow::Result<()> {
21    while let Some(msg) = tokio::select! {
22        msg = receiver.recv() => msg,
23        _ = cancellation_token.cancelled() => {
24            println!("Remote tracing sender done");
25            return Ok(());
26        }
27    } {
28        if let Err(e) = send_stream.send_batch_message(&msg).await {
29            eprintln!("Failed to send tracing message: {e}");
30        }
31    }
32    println!("Remote tracing sender done, no more messages to send");
33    Ok(())
34}
35
36pub async fn run_receiver(
37    mut recv_stream: crate::streams::RecvStream,
38    rcpd_type: RcpdType,
39) -> anyhow::Result<()> {
40    while let Some(tracing_message) = recv_stream
41        .recv_object::<common::remote_tracing::TracingMessage>()
42        .await?
43    {
44        match tracing_message {
45            common::remote_tracing::TracingMessage::Log {
46                timestamp,
47                level,
48                target,
49                message,
50            } => {
51                let log_level = match level.as_str() {
52                    "ERROR" => tracing::Level::ERROR,
53                    "WARN" => tracing::Level::WARN,
54                    "INFO" => tracing::Level::INFO,
55                    "DEBUG" => tracing::Level::DEBUG,
56                    "TRACE" => tracing::Level::TRACE,
57                    _ => tracing::Level::INFO,
58                };
59                let remote_target = format!("remote::{rcpd_type}::{target}");
60                let timestamp_str = match timestamp.duration_since(std::time::UNIX_EPOCH) {
61                    Ok(duration) => {
62                        let datetime = chrono::Local
63                            .timestamp_opt(duration.as_secs() as i64, duration.subsec_nanos());
64                        match datetime.single() {
65                            Some(dt) => dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
66                            None => format!("{timestamp:?}"),
67                        }
68                    }
69                    Err(_) => format!("{timestamp:?}"),
70                };
71                match log_level {
72                    tracing::Level::ERROR => {
73                        tracing::error!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
74                    }
75                    tracing::Level::WARN => {
76                        tracing::warn!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
77                    }
78                    tracing::Level::INFO => {
79                        tracing::info!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
80                    }
81                    tracing::Level::DEBUG => {
82                        tracing::debug!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
83                    }
84                    tracing::Level::TRACE => {
85                        tracing::trace!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
86                    }
87                }
88            }
89            common::remote_tracing::TracingMessage::Progress(progress) => {
90                tracing::debug!(target: "remote", "Received progress update from {} rcpd: {:?}", rcpd_type, progress);
91                PROGRESS_MAP.lock().unwrap()[rcpd_type] = progress;
92            }
93        }
94    }
95    Ok(())
96}
97
98#[cfg(test)]
99mod tests {
100    use super::*;
101
102    #[test]
103    fn test_rcpd_type_display() {
104        assert_eq!(format!("{}", RcpdType::Source), "source");
105        assert_eq!(format!("{}", RcpdType::Destination), "destination");
106    }
107
108    #[test]
109    fn test_rcpd_type_debug() {
110        assert_eq!(format!("{:?}", RcpdType::Source), "Source");
111        assert_eq!(format!("{:?}", RcpdType::Destination), "Destination");
112    }
113}