1pub use common::RcpdType;
3
4lazy_static::lazy_static! {
5 static ref PROGRESS_MAP: std::sync::Mutex<enum_map::EnumMap<RcpdType, common::SerializableProgress>> =
7 std::sync::Mutex::new(enum_map::EnumMap::default());
8}
9
10pub fn get_latest_progress_snapshot() -> enum_map::EnumMap<RcpdType, common::SerializableProgress> {
12 PROGRESS_MAP.lock().unwrap().clone()
13}
14
15pub async fn run_sender(
16 mut receiver: tokio::sync::mpsc::UnboundedReceiver<common::remote_tracing::TracingMessage>,
17 mut send_stream: crate::streams::SendStream,
18 cancellation_token: tokio_util::sync::CancellationToken,
19) -> anyhow::Result<()> {
20 while let Some(msg) = tokio::select! {
21 msg = receiver.recv() => msg,
22 _ = cancellation_token.cancelled() => {
23 println!("Remote tracing sender done");
24 return Ok(());
25 }
26 } {
27 if let Err(e) = send_stream.send_batch_message(&msg).await {
28 eprintln!("Failed to send tracing message: {e}");
29 }
30 }
31 println!("Remote tracing sender done, no more messages to send");
32 Ok(())
33}
34
35pub async fn run_receiver(
36 mut recv_stream: crate::streams::RecvStream,
37 rcpd_type: RcpdType,
38) -> anyhow::Result<()> {
39 while let Some(tracing_message) = recv_stream
40 .recv_object::<common::remote_tracing::TracingMessage>()
41 .await?
42 {
43 match tracing_message {
44 common::remote_tracing::TracingMessage::Log {
45 timestamp,
46 level,
47 target,
48 message,
49 } => {
50 let log_level = match level.as_str() {
51 "ERROR" => tracing::Level::ERROR,
52 "WARN" => tracing::Level::WARN,
53 "INFO" => tracing::Level::INFO,
54 "DEBUG" => tracing::Level::DEBUG,
55 "TRACE" => tracing::Level::TRACE,
56 _ => tracing::Level::INFO,
57 };
58 let remote_target = format!("remote::{rcpd_type}::{target}");
59 let timestamp_str = match timestamp.duration_since(std::time::UNIX_EPOCH) {
60 Ok(duration) => {
61 let datetime = chrono::DateTime::<chrono::Utc>::from_timestamp(
62 duration.as_secs() as i64,
63 duration.subsec_nanos(),
64 );
65 match datetime {
66 Some(dt) => dt.format("%Y-%m-%d %H:%M:%S%.3f UTC").to_string(),
67 None => format!("{timestamp:?}"),
68 }
69 }
70 Err(_) => format!("{timestamp:?}"),
71 };
72 match log_level {
73 tracing::Level::ERROR => {
74 tracing::error!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
75 }
76 tracing::Level::WARN => {
77 tracing::warn!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
78 }
79 tracing::Level::INFO => {
80 tracing::info!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
81 }
82 tracing::Level::DEBUG => {
83 tracing::debug!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
84 }
85 tracing::Level::TRACE => {
86 tracing::trace!(target: "remote", "[{}] {}: {}", timestamp_str, remote_target, message)
87 }
88 }
89 }
90 common::remote_tracing::TracingMessage::Progress(progress) => {
91 tracing::debug!(target: "remote", "Received progress update from {} rcpd: {:?}", rcpd_type, progress);
92 PROGRESS_MAP.lock().unwrap()[rcpd_type] = progress;
93 }
94 }
95 }
96 Ok(())
97}
98
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` output for each variant is the lowercase name.
    #[test]
    fn test_rcpd_type_display() {
        let source = RcpdType::Source.to_string();
        let destination = RcpdType::Destination.to_string();
        assert_eq!(source, "source");
        assert_eq!(destination, "destination");
    }

    /// `Debug` output for each variant is the variant identifier itself.
    #[test]
    fn test_rcpd_type_debug() {
        let source = format!("{:?}", RcpdType::Source);
        let destination = format!("{:?}", RcpdType::Destination);
        assert_eq!(source, "Source");
        assert_eq!(destination, "Destination");
    }
}