1use serde::{Deserialize, Serialize};
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct EchoConfig {
23 pub topics: Vec<String>,
25 pub decimation: u32,
27 pub max_messages: u32,
29}
30
31impl Default for EchoConfig {
32 fn default() -> Self {
33 Self {
34 topics: Vec::new(),
35 decimation: 1,
36 max_messages: 10,
37 }
38 }
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43pub struct CapturedMessage {
44 pub topic: String,
46 pub sequence: u64,
48 pub timestamp: String,
50 #[serde(with = "base64_bytes")]
52 pub data: Vec<u8>,
53 pub size_bytes: usize,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct EchoReport {
60 pub config: EchoConfig,
62 pub messages: Vec<CapturedMessage>,
64 pub topic_stats: Vec<TopicStat>,
66 pub total_received: u64,
68 pub total_captured: u64,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct TopicStat {
75 pub topic: String,
77 pub received: u64,
79 pub captured: u64,
81 pub bytes_captured: u64,
83}
84
85mod base64_bytes {
87 use serde::{Deserialize, Deserializer, Serializer};
88
89 pub fn serialize<S>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error>
90 where
91 S: Serializer,
92 {
93 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
95 serializer.serialize_str(&hex)
96 }
97
98 pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
99 where
100 D: Deserializer<'de>,
101 {
102 let s = String::deserialize(deserializer)?;
103 (0..s.len())
104 .step_by(2)
105 .map(|i| u8::from_str_radix(&s[i..i + 2], 16).map_err(serde::de::Error::custom))
106 .collect()
107 }
108}
109
110pub fn decimate(
115 topic: &str,
116 raw_messages: &[Vec<u8>],
117 decimation: u32,
118 max_messages: u32,
119 timestamp: &str,
120) -> (Vec<CapturedMessage>, TopicStat) {
121 let decimation = decimation.max(1);
122 let mut captured = Vec::new();
123 let mut seq: u64 = 0;
124
125 for (i, data) in raw_messages.iter().enumerate() {
126 if (i as u32) % decimation != 0 {
127 continue;
128 }
129 if max_messages > 0 && captured.len() >= max_messages as usize {
130 break;
131 }
132 seq += 1;
133 captured.push(CapturedMessage {
134 topic: topic.to_string(),
135 sequence: seq,
136 timestamp: timestamp.to_string(),
137 size_bytes: data.len(),
138 data: data.clone(),
139 });
140 }
141
142 let bytes_captured: u64 = captured.iter().map(|m| m.size_bytes as u64).sum();
143 let stat = TopicStat {
144 topic: topic.to_string(),
145 received: raw_messages.len() as u64,
146 captured: captured.len() as u64,
147 bytes_captured,
148 };
149
150 (captured, stat)
151}
152
153pub fn build_report(
155 config: &EchoConfig,
156 topic_results: Vec<(Vec<CapturedMessage>, TopicStat)>,
157) -> EchoReport {
158 let mut all_messages = Vec::new();
159 let mut topic_stats = Vec::new();
160 let mut total_received: u64 = 0;
161 let mut total_captured: u64 = 0;
162
163 for (messages, stat) in topic_results {
164 total_received += stat.received;
165 total_captured += stat.captured;
166 topic_stats.push(stat);
167 all_messages.extend(messages);
168 }
169
170 EchoReport {
171 config: config.clone(),
172 messages: all_messages,
173 topic_stats,
174 total_received,
175 total_captured,
176 }
177}
178
179pub fn format_report(report: &EchoReport) -> String {
181 let mut out = String::new();
182 out.push_str("Topic Echo Report\n");
183 out.push_str(&"=".repeat(50));
184 out.push('\n');
185
186 out.push_str(&format!(
187 "\nDecimation: 1/{}, Max per topic: {}\n",
188 report.config.decimation,
189 if report.config.max_messages == 0 {
190 "unlimited".to_string()
191 } else {
192 report.config.max_messages.to_string()
193 }
194 ));
195
196 out.push_str(&format!(
197 "Total received: {}, captured: {}\n",
198 report.total_received, report.total_captured
199 ));
200
201 out.push_str("\n## Per-topic stats\n");
202 for stat in &report.topic_stats {
203 out.push_str(&format!(
204 " {}: {}/{} messages, {} bytes\n",
205 stat.topic, stat.captured, stat.received, stat.bytes_captured
206 ));
207 }
208
209 out.push_str("\n## Captured messages\n");
210 for msg in &report.messages {
211 out.push_str(&format!(
212 " [{}] #{} on {} ({} bytes)\n",
213 msg.timestamp, msg.sequence, msg.topic, msg.size_bytes
214 ));
215 }
216
217 out
218}
219
220pub fn parse_args(args: &[String]) -> EchoConfig {
224 let mut config = EchoConfig::default();
225 let mut i = 0;
226
227 while i < args.len() {
228 match args[i].as_str() {
229 "--decimation" | "-d" => {
230 if i + 1 < args.len() {
231 config.decimation = args[i + 1].parse().unwrap_or(1);
232 i += 2;
233 } else {
234 i += 1;
235 }
236 }
237 "--max" | "-n" => {
238 if i + 1 < args.len() {
239 config.max_messages = args[i + 1].parse().unwrap_or(10);
240 i += 2;
241 } else {
242 i += 1;
243 }
244 }
245 topic => {
246 config.topics.push(topic.to_string());
247 i += 1;
248 }
249 }
250 }
251
252 config
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp used by every fixture; the assertions never depend on it.
    const STAMP: &str = "2026-04-23T12:00:00Z";

    /// Produces `count` one-byte payloads: `[0]`, `[1]`, `[2]`, ...
    fn numbered_payloads(count: u8) -> Vec<Vec<u8>> {
        (0..count).map(|value| vec![value]).collect()
    }

    /// Turns string literals into the owned argument list `parse_args` expects.
    fn owned_args(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| String::from(*part)).collect()
    }

    /// A decimation of 1 with no cap keeps every message.
    #[test]
    fn decimate_no_reduction() {
        let messages: Vec<Vec<u8>> = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
        let (captured, stat) = decimate("/topic", &messages, 1, 0, STAMP);

        assert_eq!(captured.len(), 3);
        assert_eq!(stat.received, 3);
        assert_eq!(stat.captured, 3);
    }

    /// A decimation of 2 keeps the messages at even positions.
    #[test]
    fn decimate_every_second() {
        let messages = numbered_payloads(10);
        let (captured, stat) = decimate("/cmd_vel", &messages, 2, 0, STAMP);

        assert_eq!(captured.len(), 5);
        assert_eq!(stat.received, 10);
        assert_eq!(stat.captured, 5);
        assert_eq!(captured[0].data, vec![0]);
        assert_eq!(captured[1].data, vec![2]);
    }

    /// The cap limits what is captured but not what is counted as received.
    #[test]
    fn decimate_with_max() {
        let messages = numbered_payloads(20);
        let (captured, stat) = decimate("/odom", &messages, 1, 5, STAMP);

        assert_eq!(captured.len(), 5);
        assert_eq!(stat.received, 20);
        assert_eq!(stat.captured, 5);
    }

    /// An empty stream yields no messages and all-zero counters.
    #[test]
    fn decimate_empty_stream() {
        let messages: Vec<Vec<u8>> = Vec::new();
        let (captured, stat) = decimate("/empty", &messages, 1, 10, STAMP);

        assert!(captured.is_empty());
        assert_eq!(stat.received, 0);
        assert_eq!(stat.captured, 0);
    }

    /// Captured messages are numbered 1, 2, 3, ...
    #[test]
    fn sequence_numbers_ascending() {
        let messages: Vec<Vec<u8>> = vec![vec![1], vec![2], vec![3]];
        let (captured, _) = decimate("/seq", &messages, 1, 0, STAMP);

        let seqs: Vec<u64> = captured.iter().map(|m| m.sequence).collect();
        assert_eq!(seqs, vec![1, 2, 3]);
    }

    /// Totals and lists in the report cover every topic result.
    #[test]
    fn build_report_aggregates() {
        let config = EchoConfig {
            topics: vec!["/a".into(), "/b".into()],
            decimation: 1,
            max_messages: 10,
        };
        let result_a = decimate("/a", &[vec![1], vec![2]], 1, 10, STAMP);
        let result_b = decimate("/b", &[vec![3], vec![4], vec![5]], 1, 10, STAMP);

        let report = build_report(&config, vec![result_a, result_b]);

        assert_eq!(report.total_received, 5);
        assert_eq!(report.total_captured, 5);
        assert_eq!(report.messages.len(), 5);
        assert_eq!(report.topic_stats.len(), 2);
    }

    /// Topics and both long-form flags are picked up.
    #[test]
    fn parse_args_basic() {
        let args = owned_args(&["/cmd_vel", "/odom", "--decimation", "5", "--max", "20"]);
        let config = parse_args(&args);

        assert_eq!(config.topics, vec!["/cmd_vel", "/odom"]);
        assert_eq!(config.decimation, 5);
        assert_eq!(config.max_messages, 20);
    }

    /// Without flags the defaults stay in effect.
    #[test]
    fn parse_args_defaults() {
        let args = owned_args(&["/scan"]);
        let config = parse_args(&args);

        assert_eq!(config.topics, vec!["/scan"]);
        assert_eq!(config.decimation, 1);
        assert_eq!(config.max_messages, 10);
    }

    /// A message survives a JSON round trip unchanged.
    #[test]
    fn serialization_roundtrip() {
        let msg = CapturedMessage {
            topic: "/test".into(),
            sequence: 1,
            timestamp: STAMP.into(),
            data: vec![0xde, 0xad, 0xbe, 0xef],
            size_bytes: 4,
        };

        let json = serde_json::to_string(&msg).unwrap();
        let loaded: CapturedMessage = serde_json::from_str(&json).unwrap();

        assert_eq!(msg, loaded);
    }

    /// The rendered report names its sections, the decimation and the topic.
    #[test]
    fn format_report_contains_sections() {
        let config = EchoConfig {
            topics: vec!["/scan".into()],
            decimation: 2,
            max_messages: 5,
        };
        let result = decimate("/scan", &[vec![1], vec![2], vec![3], vec![4]], 2, 5, STAMP);
        let report = build_report(&config, vec![result]);

        let text = format_report(&report);

        for needle in [
            "Topic Echo Report",
            "1/2",
            "/scan",
            "Per-topic stats",
            "Captured messages",
        ] {
            assert!(text.contains(needle));
        }
    }

    /// `bytes_captured` is the combined payload size of the kept messages.
    #[test]
    fn bytes_captured_tracking() {
        let messages: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![4, 5]];
        let (_, stat) = decimate("/sized", &messages, 1, 0, STAMP);

        assert_eq!(stat.bytes_captured, 5);
    }
}