1use chrono::TimeZone;
3pub use common::RcpdType;
4
5lazy_static::lazy_static! {
6 static ref PROGRESS_MAP: std::sync::Mutex<enum_map::EnumMap<RcpdType, common::SerializableProgress>> =
8 std::sync::Mutex::new(enum_map::EnumMap::default());
9}
10
11pub fn get_latest_progress_snapshot() -> enum_map::EnumMap<RcpdType, common::SerializableProgress> {
13 PROGRESS_MAP.lock().unwrap().clone()
14}
15
16pub async fn run_sender(
17 mut receiver: tokio::sync::mpsc::UnboundedReceiver<common::remote_tracing::TracingMessage>,
18 mut send_stream: crate::streams::SendStream,
19 cancellation_token: tokio_util::sync::CancellationToken,
20) -> anyhow::Result<()> {
21 while let Some(msg) = tokio::select! {
22 msg = receiver.recv() => msg,
23 _ = cancellation_token.cancelled() => {
24 println!("Remote tracing sender done");
25 return Ok(());
26 }
27 } {
28 if let Err(e) = send_stream.send_batch_message(&msg).await {
29 eprintln!("Failed to send tracing message: {e}");
30 }
31 }
32 println!("Remote tracing sender done, no more messages to send");
33 Ok(())
34}
35
36pub async fn run_receiver(
37 mut recv_stream: crate::streams::RecvStream,
38 rcpd_type: RcpdType,
39) -> anyhow::Result<()> {
40 while let Some(tracing_message) = recv_stream
41 .recv_object::<common::remote_tracing::TracingMessage>()
42 .await?
43 {
44 match tracing_message {
45 common::remote_tracing::TracingMessage::Log {
46 timestamp,
47 level,
48 target,
49 message,
50 } => {
51 let log_level = match level.as_str() {
52 "ERROR" => tracing::Level::ERROR,
53 "WARN" => tracing::Level::WARN,
54 "INFO" => tracing::Level::INFO,
55 "DEBUG" => tracing::Level::DEBUG,
56 "TRACE" => tracing::Level::TRACE,
57 _ => tracing::Level::INFO,
58 };
59 let remote_target = format!("remote::{rcpd_type}::{target}");
60 let timestamp_str = match timestamp.duration_since(std::time::UNIX_EPOCH) {
61 Ok(duration) => {
62 let datetime = chrono::Local
63 .timestamp_opt(duration.as_secs() as i64, duration.subsec_nanos());
64 match datetime.single() {
65 Some(dt) => dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
66 None => format!("{timestamp:?}"),
67 }
68 }
69 Err(_) => format!("{timestamp:?}"),
70 };
71 match log_level {
72 tracing::Level::ERROR => {
73 tracing::error!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
74 }
75 tracing::Level::WARN => {
76 tracing::warn!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
77 }
78 tracing::Level::INFO => {
79 tracing::info!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
80 }
81 tracing::Level::DEBUG => {
82 tracing::debug!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
83 }
84 tracing::Level::TRACE => {
85 tracing::trace!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
86 }
87 }
88 }
89 common::remote_tracing::TracingMessage::Progress(progress) => {
90 tracing::debug!(target: "remote", "Received progress update from {} rcpd: {:?}", rcpd_type, progress);
91 PROGRESS_MAP.lock().unwrap()[rcpd_type] = progress;
92 }
93 }
94 }
95 Ok(())
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
101
102 #[test]
103 fn test_rcpd_type_display() {
104 assert_eq!(format!("{}", RcpdType::Source), "source");
105 assert_eq!(format!("{}", RcpdType::Destination), "destination");
106 }
107
108 #[test]
109 fn test_rcpd_type_debug() {
110 assert_eq!(format!("{:?}", RcpdType::Source), "Source");
111 assert_eq!(format!("{:?}", RcpdType::Destination), "Destination");
112 }
113}