Skip to main content

gang_capability_topic_echo/
lib.rs

1//! ROS 2 topic echo capability for Ganglion.
2//!
3//! Subscribes to specified ROS 2 topics and captures serialized messages
4//! with optional decimation (sampling every Nth message). Designed for
5//! remote inspection of topic traffic without requiring a local ROS
6//! installation on the operator side.
7//!
8//! When compiled to a WASM component this uses the `ros-interface` host
9//! import (`topic-subscribe`) to receive serialized messages. As a
10//! native library the capture and decimation logic is testable without
11//! ROS.
12//!
13//! The design spec designates this as the C++ reference capability
14//! (wasi-sdk + wit-bindgen). The Rust crate implements the canonical
15//! logic; a C++ example project would demonstrate native-language parity
16//! for the ROS 2 community.
17
18use serde::{Deserialize, Serialize};
19
20/// Configuration for a topic echo capture session.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct EchoConfig {
23    /// Topic names to subscribe to.
24    pub topics: Vec<String>,
25    /// Decimation factor: capture every Nth message. 1 = all messages.
26    pub decimation: u32,
27    /// Maximum number of messages to capture per topic (0 = unlimited).
28    pub max_messages: u32,
29}
30
31impl Default for EchoConfig {
32    fn default() -> Self {
33        Self {
34            topics: Vec::new(),
35            decimation: 1,
36            max_messages: 10,
37        }
38    }
39}
40
41/// A single captured topic message.
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43pub struct CapturedMessage {
44    /// Topic this message was received on.
45    pub topic: String,
46    /// Sequence number within this capture session.
47    pub sequence: u64,
48    /// ISO 8601 timestamp of capture.
49    pub timestamp: String,
50    /// Serialized message data (typically CDR-encoded by ROS 2).
51    #[serde(with = "base64_bytes")]
52    pub data: Vec<u8>,
53    /// Size of the serialized message in bytes.
54    pub size_bytes: usize,
55}
56
57/// Result of a topic echo session.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct EchoReport {
60    /// Configuration used for this session.
61    pub config: EchoConfig,
62    /// Captured messages across all topics.
63    pub messages: Vec<CapturedMessage>,
64    /// Per-topic statistics.
65    pub topic_stats: Vec<TopicStat>,
66    /// Total messages received (before decimation).
67    pub total_received: u64,
68    /// Total messages captured (after decimation).
69    pub total_captured: u64,
70}
71
72/// Per-topic statistics.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct TopicStat {
75    /// Topic name.
76    pub topic: String,
77    /// Messages received on this topic.
78    pub received: u64,
79    /// Messages captured (after decimation).
80    pub captured: u64,
81    /// Total bytes of captured message data.
82    pub bytes_captured: u64,
83}
84
85/// Serde module for base64-encoded byte arrays in JSON.
86mod base64_bytes {
87    use serde::{Deserialize, Deserializer, Serializer};
88
89    pub fn serialize<S>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error>
90    where
91        S: Serializer,
92    {
93        // Simple hex encoding for portability (no base64 dep needed)
94        let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
95        serializer.serialize_str(&hex)
96    }
97
98    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
99    where
100        D: Deserializer<'de>,
101    {
102        let s = String::deserialize(deserializer)?;
103        (0..s.len())
104            .step_by(2)
105            .map(|i| u8::from_str_radix(&s[i..i + 2], 16).map_err(serde::de::Error::custom))
106            .collect()
107    }
108}
109
110/// Apply decimation to a stream of raw messages for a single topic.
111///
112/// Returns only every Nth message (where N = `decimation`), up to
113/// `max_messages` total. Each captured message gets a sequential number.
114pub fn decimate(
115    topic: &str,
116    raw_messages: &[Vec<u8>],
117    decimation: u32,
118    max_messages: u32,
119    timestamp: &str,
120) -> (Vec<CapturedMessage>, TopicStat) {
121    let decimation = decimation.max(1);
122    let mut captured = Vec::new();
123    let mut seq: u64 = 0;
124
125    for (i, data) in raw_messages.iter().enumerate() {
126        if (i as u32) % decimation != 0 {
127            continue;
128        }
129        if max_messages > 0 && captured.len() >= max_messages as usize {
130            break;
131        }
132        seq += 1;
133        captured.push(CapturedMessage {
134            topic: topic.to_string(),
135            sequence: seq,
136            timestamp: timestamp.to_string(),
137            size_bytes: data.len(),
138            data: data.clone(),
139        });
140    }
141
142    let bytes_captured: u64 = captured.iter().map(|m| m.size_bytes as u64).sum();
143    let stat = TopicStat {
144        topic: topic.to_string(),
145        received: raw_messages.len() as u64,
146        captured: captured.len() as u64,
147        bytes_captured,
148    };
149
150    (captured, stat)
151}
152
153/// Build an echo report from pre-captured data (for testing and native mode).
154pub fn build_report(
155    config: &EchoConfig,
156    topic_results: Vec<(Vec<CapturedMessage>, TopicStat)>,
157) -> EchoReport {
158    let mut all_messages = Vec::new();
159    let mut topic_stats = Vec::new();
160    let mut total_received: u64 = 0;
161    let mut total_captured: u64 = 0;
162
163    for (messages, stat) in topic_results {
164        total_received += stat.received;
165        total_captured += stat.captured;
166        topic_stats.push(stat);
167        all_messages.extend(messages);
168    }
169
170    EchoReport {
171        config: config.clone(),
172        messages: all_messages,
173        topic_stats,
174        total_received,
175        total_captured,
176    }
177}
178
179/// Format an echo report as human-readable text.
180pub fn format_report(report: &EchoReport) -> String {
181    let mut out = String::new();
182    out.push_str("Topic Echo Report\n");
183    out.push_str(&"=".repeat(50));
184    out.push('\n');
185
186    out.push_str(&format!(
187        "\nDecimation: 1/{}, Max per topic: {}\n",
188        report.config.decimation,
189        if report.config.max_messages == 0 {
190            "unlimited".to_string()
191        } else {
192            report.config.max_messages.to_string()
193        }
194    ));
195
196    out.push_str(&format!(
197        "Total received: {}, captured: {}\n",
198        report.total_received, report.total_captured
199    ));
200
201    out.push_str("\n## Per-topic stats\n");
202    for stat in &report.topic_stats {
203        out.push_str(&format!(
204            "  {}: {}/{} messages, {} bytes\n",
205            stat.topic, stat.captured, stat.received, stat.bytes_captured
206        ));
207    }
208
209    out.push_str("\n## Captured messages\n");
210    for msg in &report.messages {
211        out.push_str(&format!(
212            "  [{}] #{} on {} ({} bytes)\n",
213            msg.timestamp, msg.sequence, msg.topic, msg.size_bytes
214        ));
215    }
216
217    out
218}
219
220/// Parse CLI arguments into an EchoConfig.
221///
222/// Expected format: `topic1 [topic2 ...] [--decimation N] [--max N]`
223pub fn parse_args(args: &[String]) -> EchoConfig {
224    let mut config = EchoConfig::default();
225    let mut i = 0;
226
227    while i < args.len() {
228        match args[i].as_str() {
229            "--decimation" | "-d" => {
230                if i + 1 < args.len() {
231                    config.decimation = args[i + 1].parse().unwrap_or(1);
232                    i += 2;
233                } else {
234                    i += 1;
235                }
236            }
237            "--max" | "-n" => {
238                if i + 1 < args.len() {
239                    config.max_messages = args[i + 1].parse().unwrap_or(10);
240                    i += 2;
241                } else {
242                    i += 1;
243                }
244            }
245            topic => {
246                config.topics.push(topic.to_string());
247                i += 1;
248            }
249        }
250    }
251
252    config
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed capture timestamp shared by every test.
    const STAMP: &str = "2026-04-23T12:00:00Z";

    /// Builds `count` one-byte payloads whose single byte equals their index.
    fn indexed_payloads(count: u8) -> Vec<Vec<u8>> {
        (0..count).map(|i| vec![i]).collect()
    }

    /// Turns string literals into owned command-line arguments.
    fn argv(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|s| String::from(*s)).collect()
    }

    /// A decimation factor of 1 with no cap keeps every message.
    #[test]
    fn decimate_no_reduction() {
        let payloads: Vec<Vec<u8>> = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
        let (kept, stat) = decimate("/topic", &payloads, 1, 0, STAMP);
        assert_eq!(kept.len(), 3);
        assert_eq!(stat.received, 3);
        assert_eq!(stat.captured, 3);
    }

    /// A decimation factor of 2 keeps the even-indexed messages.
    #[test]
    fn decimate_every_second() {
        let payloads = indexed_payloads(10);
        let (kept, stat) = decimate("/cmd_vel", &payloads, 2, 0, STAMP);
        assert_eq!(kept.len(), 5); // indices 0, 2, 4, 6, 8
        assert_eq!(stat.received, 10);
        assert_eq!(stat.captured, 5);
        assert_eq!(kept[0].data, vec![0]);
        assert_eq!(kept[1].data, vec![2]);
    }

    /// The per-topic cap limits how many messages are captured.
    #[test]
    fn decimate_with_max() {
        let payloads = indexed_payloads(20);
        let (kept, stat) = decimate("/odom", &payloads, 1, 5, STAMP);
        assert_eq!(kept.len(), 5);
        assert_eq!(stat.received, 20);
        assert_eq!(stat.captured, 5);
    }

    /// An empty input stream yields no messages and zeroed statistics.
    #[test]
    fn decimate_empty_stream() {
        let payloads: Vec<Vec<u8>> = Vec::new();
        let (kept, stat) = decimate("/empty", &payloads, 1, 10, STAMP);
        assert!(kept.is_empty());
        assert_eq!(stat.received, 0);
        assert_eq!(stat.captured, 0);
    }

    /// Captured messages are numbered 1, 2, 3, ...
    #[test]
    fn sequence_numbers_ascending() {
        let payloads: Vec<Vec<u8>> = vec![vec![1], vec![2], vec![3]];
        let (kept, _) = decimate("/seq", &payloads, 1, 0, STAMP);
        let numbers: Vec<u64> = kept.iter().map(|m| m.sequence).collect();
        assert_eq!(numbers, vec![1, 2, 3]);
    }

    /// Building a report sums totals and keeps one stat per topic.
    #[test]
    fn build_report_aggregates() {
        let config = EchoConfig {
            topics: vec!["/a".into(), "/b".into()],
            decimation: 1,
            max_messages: 10,
        };
        let topic_a = decimate("/a", &[vec![1], vec![2]], 1, 10, STAMP);
        let topic_b = decimate("/b", &[vec![3], vec![4], vec![5]], 1, 10, STAMP);

        let report = build_report(&config, vec![topic_a, topic_b]);
        assert_eq!(report.total_received, 5);
        assert_eq!(report.total_captured, 5);
        assert_eq!(report.messages.len(), 5);
        assert_eq!(report.topic_stats.len(), 2);
    }

    /// Topics and both long flags are picked up from the argument list.
    #[test]
    fn parse_args_basic() {
        let args = argv(&["/cmd_vel", "/odom", "--decimation", "5", "--max", "20"]);
        let config = parse_args(&args);
        assert_eq!(config.topics, vec!["/cmd_vel", "/odom"]);
        assert_eq!(config.decimation, 5);
        assert_eq!(config.max_messages, 20);
    }

    /// Without flags the default decimation and cap apply.
    #[test]
    fn parse_args_defaults() {
        let args = argv(&["/scan"]);
        let config = parse_args(&args);
        assert_eq!(config.topics, vec!["/scan"]);
        assert_eq!(config.decimation, 1);
        assert_eq!(config.max_messages, 10);
    }

    /// A captured message survives a JSON serialize/deserialize round trip.
    #[test]
    fn serialization_roundtrip() {
        let original = CapturedMessage {
            topic: "/test".into(),
            sequence: 1,
            timestamp: STAMP.into(),
            data: vec![0xde, 0xad, 0xbe, 0xef],
            size_bytes: 4,
        };
        let json = serde_json::to_string(&original).unwrap();
        let restored: CapturedMessage = serde_json::from_str(&json).unwrap();
        assert_eq!(original, restored);
    }

    /// The text report contains its title, summary and both section headers.
    #[test]
    fn format_report_contains_sections() {
        let config = EchoConfig {
            topics: vec!["/scan".into()],
            decimation: 2,
            max_messages: 5,
        };
        let scan = decimate("/scan", &[vec![1], vec![2], vec![3], vec![4]], 2, 5, STAMP);
        let report = build_report(&config, vec![scan]);
        let text = format_report(&report);
        assert!(text.contains("Topic Echo Report"));
        assert!(text.contains("1/2")); // decimation
        assert!(text.contains("/scan"));
        assert!(text.contains("Per-topic stats"));
        assert!(text.contains("Captured messages"));
    }

    /// `bytes_captured` is the sum of the captured payload sizes.
    #[test]
    fn bytes_captured_tracking() {
        let payloads: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![4, 5]];
        let (_, stat) = decimate("/sized", &payloads, 1, 0, STAMP);
        assert_eq!(stat.bytes_captured, 5); // 3 + 2
    }
}