remote/tracelog/
mod.rs

1// Re-export RcpdType from common to avoid duplication
2pub use common::RcpdType;
3
4lazy_static::lazy_static! {
5    // static storage for the latest progress from each rcpd process
6    static ref PROGRESS_MAP: std::sync::Mutex<enum_map::EnumMap<RcpdType, common::SerializableProgress>> =
7        std::sync::Mutex::new(enum_map::EnumMap::default());
8}
9
10/// Get the latest progress snapshot from all rcpd processes
11pub fn get_latest_progress_snapshot() -> enum_map::EnumMap<RcpdType, common::SerializableProgress> {
12    PROGRESS_MAP.lock().unwrap().clone()
13}
14
15pub async fn run_sender(
16    mut receiver: tokio::sync::mpsc::UnboundedReceiver<common::remote_tracing::TracingMessage>,
17    mut send_stream: crate::streams::SendStream,
18    cancellation_token: tokio_util::sync::CancellationToken,
19) -> anyhow::Result<()> {
20    while let Some(msg) = tokio::select! {
21        msg = receiver.recv() => msg,
22        _ = cancellation_token.cancelled() => {
23            println!("Remote tracing sender done");
24            return Ok(());
25        }
26    } {
27        if let Err(e) = send_stream.send_batch_message(&msg).await {
28            eprintln!("Failed to send tracing message: {e}");
29        }
30    }
31    println!("Remote tracing sender done, no more messages to send");
32    Ok(())
33}
34
35pub async fn run_receiver(
36    mut recv_stream: crate::streams::RecvStream,
37    rcpd_type: RcpdType,
38) -> anyhow::Result<()> {
39    while let Some(tracing_message) = recv_stream
40        .recv_object::<common::remote_tracing::TracingMessage>()
41        .await?
42    {
43        match tracing_message {
44            common::remote_tracing::TracingMessage::Log {
45                timestamp,
46                level,
47                target,
48                message,
49            } => {
50                let log_level = match level.as_str() {
51                    "ERROR" => tracing::Level::ERROR,
52                    "WARN" => tracing::Level::WARN,
53                    "INFO" => tracing::Level::INFO,
54                    "DEBUG" => tracing::Level::DEBUG,
55                    "TRACE" => tracing::Level::TRACE,
56                    _ => tracing::Level::INFO,
57                };
58                let remote_target = format!("remote::{rcpd_type}::{target}");
59                let timestamp_str = match timestamp.duration_since(std::time::UNIX_EPOCH) {
60                    Ok(duration) => {
61                        let datetime = chrono::DateTime::<chrono::Utc>::from_timestamp(
62                            duration.as_secs() as i64,
63                            duration.subsec_nanos(),
64                        );
65                        match datetime {
66                            Some(dt) => dt.format("%Y-%m-%d %H:%M:%S%.3f UTC").to_string(),
67                            None => format!("{timestamp:?}"),
68                        }
69                    }
70                    Err(_) => format!("{timestamp:?}"),
71                };
72                match log_level {
73                    tracing::Level::ERROR => {
74                        tracing::error!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
75                    }
76                    tracing::Level::WARN => {
77                        tracing::warn!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
78                    }
79                    tracing::Level::INFO => {
80                        tracing::info!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
81                    }
82                    tracing::Level::DEBUG => {
83                        tracing::debug!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
84                    }
85                    tracing::Level::TRACE => {
86                        tracing::trace!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
87                    }
88                }
89            }
90            common::remote_tracing::TracingMessage::Progress(progress) => {
91                tracing::debug!(target: "remote", "Received progress update from {} rcpd: {:?}", rcpd_type, progress);
92                PROGRESS_MAP.lock().unwrap()[rcpd_type] = progress;
93            }
94        }
95    }
96    Ok(())
97}
98
#[cfg(test)]
mod tests {
    use super::RcpdType;

    /// `Display` renders each variant as its lowercase name.
    #[test]
    fn test_rcpd_type_display() {
        assert_eq!(RcpdType::Source.to_string(), "source");
        assert_eq!(RcpdType::Destination.to_string(), "destination");
    }

    /// `Debug` renders each variant as its identifier.
    #[test]
    fn test_rcpd_type_debug() {
        let cases = [
            (RcpdType::Source, "Source"),
            (RcpdType::Destination, "Destination"),
        ];
        for (variant, expected) in cases {
            assert_eq!(format!("{:?}", variant), expected);
        }
    }
}