Skip to main content

babble_bridge/
lib.rs

1//! BabbleSim + Zephyr nRF RPC simulation bridge.
2//!
3//! This crate provides three things:
4//!
5//! - **Test harness** ([`spawn_zephyr_rpc_server_with_socat`]) — spawn a full
6//!   BabbleSim simulation from Rust integration tests.
7//! - **xtask CLI** ([`xtask::cli_main`]) — docker, zephyr-setup, and run-bsim
8//!   commands that downstream crates can re-export.
9//! - **Programmatic setup API** ([`xtask::fetch_prebuilt_binaries`],
10//!   [`xtask::zephyr_setup`]) — call from a downstream `build.rs` or any
11//!   Rust code without shelling out.
12//!
13//! # Test harness usage
14//!
15//! ```no_run
16//! use std::collections::HashSet;
17//! use std::path::Path;
18//!
19//! let tests_dir = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sockets"));
20//! let (mut processes, socket_path) =
21//!     babble_bridge::spawn_zephyr_rpc_server_with_socat(tests_dir, "my_test");
22//!
23//! // … run test logic, write/read via a UnixStream to socket_path …
24//!
25//! processes.search_stdout_for_strings(HashSet::from([
26//!     "<inf> nrf_ps_server: Initializing RPC server",
27//! ]));
28//! ```
29
30pub mod xtask;
31
32use std::collections::HashSet;
33use std::env;
34use std::io::{BufRead, BufReader};
35use std::os::unix::process::CommandExt;
36use std::path::{Path, PathBuf};
37use std::process::{Child, Command, Stdio};
38use std::sync::{Arc, Mutex};
39use std::time::{Duration, Instant};
40
41// ── Public types ─────────────────────────────────────────────────────────────
42
/// Owns all child processes spawned for a single simulation run and
/// accumulates their stdout output for later inspection.
///
/// All child processes are killed when this value is dropped.
pub struct TestProcesses {
    /// Every spawned child; killed and reaped by [`TestProcesses::kill_all`].
    children: Vec<Child>,
    /// Combined stdout lines from every process whose stdout was captured.
    stdout_lines: Arc<Mutex<Vec<String>>>,
}

impl TestProcesses {
    /// Block until every string in `expected` appears as a substring of any
    /// accumulated stdout line, or panic after 30 seconds listing missing strings.
    pub fn search_stdout_for_strings(&mut self, expected: HashSet<&str>) {
        self.search_stdout_with_timeout(expected, Duration::from_secs(30));
    }

    /// Like [`Self::search_stdout_for_strings`] but with a caller-supplied timeout.
    /// Useful in tests to avoid 30-second waits.
    ///
    /// The buffer is polled every 50 ms. An empty `expected` set returns
    /// immediately.
    ///
    /// # Panics
    ///
    /// Panics once `timeout` has elapsed while at least one expected string is
    /// still missing. The message lists the missing strings and everything
    /// captured so far.
    pub fn search_stdout_with_timeout(&mut self, expected: HashSet<&str>, timeout: Duration) {
        let start = Instant::now();

        loop {
            let missing: HashSet<&str> = {
                let lines = self.stdout_lines.lock().unwrap();
                expected
                    .iter()
                    .copied()
                    .filter(|needle| !lines.iter().any(|line| line.contains(*needle)))
                    .collect()
            };

            if missing.is_empty() {
                return;
            }

            if start.elapsed() >= timeout {
                // Render the report inside a short scope so the guard is
                // released BEFORE panicking. Unwinding while the guard is alive
                // would poison the mutex and take down the thread that is still
                // appending captured stdout lines.
                let (line_count, captured) = {
                    let lines = self.stdout_lines.lock().unwrap();
                    let rendered = lines
                        .iter()
                        .map(|l| format!("  {l}"))
                        .collect::<Vec<_>>()
                        .join("\n");
                    (lines.len(), rendered)
                };
                let missing_report = missing
                    .iter()
                    .map(|s| format!("  - {:?}", s))
                    .collect::<Vec<_>>()
                    .join("\n");
                panic!(
                    "search_stdout_for_strings timed out after {:?}.\n\
                     Missing strings:\n{}\n\
                     Captured stdout ({} lines):\n{}",
                    timeout, missing_report, line_count, captured,
                );
            }

            std::thread::sleep(Duration::from_millis(50));
        }
    }

    /// Debug helper: print the captured stdout buffer to this process's
    /// stdout, then keep printing newly captured lines until `timeout` has
    /// elapsed.
    ///
    /// Each captured line is printed once (indented by two spaces) under a
    /// single `Captured stdout:` header. The buffer itself is only read, never
    /// drained, so the `search_stdout_*` methods keep working afterwards.
    pub fn debug_dump_stdout(&mut self, timeout: Duration) {
        let start = Instant::now();
        // Number of buffered lines already printed.
        let mut printed = 0usize;

        println!("Captured stdout:");
        loop {
            // Copy the new lines out under the lock and release it right away,
            // so the capture thread is never blocked while we print or sleep.
            let fresh: Vec<String> = {
                let lines = self.stdout_lines.lock().unwrap();
                lines
                    .get(printed..)
                    .map(|tail| tail.to_vec())
                    .unwrap_or_default()
            };
            printed += fresh.len();
            for line in &fresh {
                println!("  {line}");
            }

            if start.elapsed() >= timeout {
                return;
            }

            std::thread::sleep(Duration::from_millis(50));
        }
    }

    /// Kill all managed child processes immediately. Called automatically on drop.
    ///
    /// Every child is signalled first and only then reaped, so one slow exit
    /// does not delay killing the others. Errors (for example a child that has
    /// already exited) are deliberately ignored.
    pub fn kill_all(&mut self) {
        for child in &mut self.children {
            let _ = child.kill();
        }
        for child in &mut self.children {
            let _ = child.wait();
        }
    }
}

impl Drop for TestProcesses {
    /// Kills and reaps every managed child so no simulation process outlives
    /// the test that spawned it.
    fn drop(&mut self) {
        self.kill_all();
    }
}
145
146// ── Internal helpers ─────────────────────────────────────────────────────────
147
/// Spawn a background thread that drains `stream` line by line and writes
/// each line to the **real** stderr (fd 2 via `/dev/stderr`) as
/// `[<label>] <line>`.
///
/// We open `/dev/stderr` directly instead of using `eprintln!` so the output
/// reaches the terminal even when `cargo test` has redirected
/// `std::io::stderr()` to its per-test capture buffer (which suppresses
/// passing-test output unless `--nocapture` is passed). If `/dev/stderr`
/// cannot be opened the thread falls back to `std::io::stderr()` so the
/// stream is still drained.
///
/// The thread exits at end-of-stream or on the first read error. Bytes that
/// are not valid UTF-8 are forwarded with U+FFFD replacement characters
/// instead of the line being dropped.
fn pipe_labeled<R>(stream: R, label: &'static str)
where
    R: std::io::Read + Send + 'static,
{
    std::thread::spawn(move || {
        use std::io::Write;

        let mut out: Box<dyn Write> = match std::fs::OpenOptions::new()
            .write(true)
            .open("/dev/stderr")
        {
            Ok(file) => Box::new(file),
            Err(_) => Box::new(std::io::stderr()),
        };

        let mut reader = BufReader::new(stream);
        // Reused across iterations so each line does not allocate a new buffer.
        let mut raw: Vec<u8> = Vec::new();
        loop {
            raw.clear();
            match reader.read_until(b'\n', &mut raw) {
                // EOF, or an I/O error that would most likely repeat forever:
                // stop instead of spinning.
                Ok(0) | Err(_) => break,
                Ok(_) => {}
            }
            // Strip the terminator the same way `BufRead::lines` does
            // ("\n" or "\r\n").
            if raw.ends_with(b"\n") {
                raw.pop();
                if raw.ends_with(b"\r") {
                    raw.pop();
                }
            }
            let _ = writeln!(out, "[{label}] {}", String::from_utf8_lossy(&raw));
        }
    });
}
174
175// ── Public function ───────────────────────────────────────────────────────────
176
177/// Spawns the full BabbleSim simulation stack for a single test:
178///
179/// 1. `bs_2G4_phy_v1`  — the radio PHY simulator
180/// 2. `zephyr_rpc_server_app` — Zephyr nRF RPC server with `-uart0_pty`
181/// 3. `cgm_peripheral_sample` — CGM BLE peripheral
182///
183/// The function waits up to 10 seconds for `zephyr_rpc_server_app` to print
184/// its PTY path on stdout (`"UART_0 connected to pseudotty: /dev/pts/N"`),
185/// then launches `socat` to bridge that PTY to a UNIX socket at
186/// Kills any leftover BabbleSim processes from a previous run with the given
187/// `sim_id`. Debugger stops and abnormal exits leave orphaned child processes
188/// that hold the sim_id and block the next launch.
189pub(crate) fn kill_stale_sim_processes(sim_id: &str) {
190    let patterns = [
191        format!("bs_2G4_phy_v1.*-s={sim_id}"),
192        format!("zephyr_rpc_server_app.*-s={sim_id}"),
193        format!("cgm_peripheral_sample.*-s={sim_id}"),
194        format!("socat.*{sim_id}.sock"),
195    ];
196    for pat in &patterns {
197        let _ = Command::new("pkill").args(["-9", "-f", pat]).status();
198    }
199    // Give processes time to fully exit.
200    std::thread::sleep(Duration::from_millis(300));
201
202    // BabbleSim stores per-sim IPC files under /tmp/bs_<username>/<sim_id>/.
203    // These lock/pipe files must be removed before a new run or the PHY will
204    // hang waiting for coordination on stale file descriptors.
205    if let Ok(entries) = std::fs::read_dir("/tmp") {
206        for entry in entries.flatten() {
207            let name = entry.file_name();
208            if name.to_string_lossy().starts_with("bs_") {
209                let sim_dir = entry.path().join(sim_id);
210                if sim_dir.is_dir() {
211                    let _ = std::fs::remove_dir_all(&sim_dir);
212                }
213            }
214        }
215    }
216
217    // Also clean up any POSIX shared memory objects keyed by sim_id.
218    if let Ok(entries) = std::fs::read_dir("/dev/shm") {
219        for entry in entries.flatten() {
220            let name = entry.file_name();
221            if name.to_string_lossy().contains(sim_id) {
222                let _ = std::fs::remove_file(entry.path());
223            }
224        }
225    }
226}
227
228/// `tests_dir/<test_name>.sock`.
229///
230/// # Panics
231///
232/// Panics if any process fails to spawn, if PTY discovery times out, or if
233/// `socat` is not found on `PATH`.
234pub fn spawn_zephyr_rpc_server_with_socat(
235    tests_dir: &Path,
236    test_name: &str,
237) -> (TestProcesses, PathBuf) {
238    // When the `sim-log` feature is enabled, each process's output is forwarded
239    // to the caller's stderr with a labelled prefix so it appears in the
240    // terminal even under `cargo test` (which captures stdout but not stderr).
241    // Usage:
242    //
243    //   cargo test --features sim-log --test sim_harness
244    //
245    // Downstream crates add this to their dev-dependency:
246    //   babble-bridge = { ..., features = ["sim-log"] }
247    let verbose = cfg!(feature = "sim-log");
248
249    let bsim_bin = Path::new("external/tools/bsim/bin");
250    let bsim_out = "external/tools/bsim";
251    let bsim_comp = "external/tools/bsim/components";
252    let ld_path = match env::var("LD_LIBRARY_PATH") {
253        Ok(existing) => format!("external/tools/bsim/lib:{existing}"),
254        Err(_) => "external/tools/bsim/lib".to_string(),
255    };
256
257    let sim_id = test_name;
258
259    std::fs::create_dir_all(tests_dir)
260        .unwrap_or_else(|e| panic!("could not create tests dir {}: {e}", tests_dir.display()));
261    let socket_path = tests_dir.join(format!("{test_name}.sock"));
262
263    // Kill orphaned processes FIRST so socat releases its fd on the socket
264    // file before we unlink it.  Without this ordering, remove_file succeeds
265    // on the directory entry but socat keeps an open fd on the inode, and the
266    // new socat fails to bind if the socket is still in use.
267    kill_stale_sim_processes(sim_id);
268    let _ = std::fs::remove_file(&socket_path);
269
270    // ── 1. PHY ──────────────────────────────────────────────────────────────
271    let mut phy = Command::new("./bs_2G4_phy_v1")
272        .args([
273            &format!("-s={sim_id}"),
274            "-D=2", // 2 radio devices: zephyr_rpc_server_app (d=0) + cgm_peripheral_sample (d=1)
275            "-sim_length=86400e6",
276        ])
277        .current_dir(bsim_bin)
278        .stdin(Stdio::null())
279        .stdout(Stdio::null())
280        .stderr(if verbose { Stdio::piped() } else { Stdio::null() })
281        .env("BSIM_OUT_PATH", bsim_out)
282        .env("BSIM_COMPONENTS_PATH", bsim_comp)
283        .env("LD_LIBRARY_PATH", &ld_path)
284        .process_group(0)
285        .spawn()
286        .unwrap_or_else(|e| panic!("failed to spawn bs_2G4_phy_v1: {e}"));
287    if verbose {
288        if let Some(s) = phy.stderr.take() { pipe_labeled(s, "babblesim-phy"); }
289    }
290
291    // ── 2. Zephyr RPC server (stdout always piped for PTY discovery + log capture) ──
292    //
293    // stdout must stay piped regardless of verbose mode so the PTY path can
294    // be extracted.  When verbose, the reader thread additionally forwards
295    // every line to stderr with a "[zephyr]" prefix.
296    let stdout_lines: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
297    let (pty_tx, pty_rx) = std::sync::mpsc::channel::<PathBuf>();
298
299    // -force-color tells the Zephyr native-sim tracing layer to emit ANSI
300    // escape codes even when stdout/stderr are pipes rather than a real TTY.
301    // Without it, isatty() returns 0 on a pipe and colors are stripped.
302    let zephyr_color_arg: &[&str] = if verbose { &["-force-color"] } else { &[] };
303
304    let mut zephyr_proc = Command::new("./zephyr_rpc_server_app")
305        .args([
306            &format!("-s={sim_id}"),
307            "-d=0",
308            "-uart0_pty",
309            "-uart_pty_pollT=1000",
310        ])
311        .args(zephyr_color_arg)
312        .current_dir(bsim_bin)
313        .stdin(Stdio::null())
314        .stdout(Stdio::piped())
315        .stderr(if verbose { Stdio::piped() } else { Stdio::null() })
316        .env("BSIM_OUT_PATH", bsim_out)
317        .env("BSIM_COMPONENTS_PATH", bsim_comp)
318        .env("LD_LIBRARY_PATH", &ld_path)
319        .process_group(0)
320        .spawn()
321        .unwrap_or_else(|e| panic!("failed to spawn zephyr_rpc_server_app: {e}"));
322
323    // Drain Zephyr stderr (kernel/driver logs) with label when verbose.
324    if verbose {
325        if let Some(s) = zephyr_proc.stderr.take() { pipe_labeled(s, "rpc-server"); }
326    }
327
328    // Drain Zephyr stdout in a background thread:
329    // - send the PTY path once via `pty_tx` when the "pseudotty" line appears
330    // - append every line to the shared `stdout_lines` buffer
331    // - when verbose, also forward each line to stderr with a "[rpc-server]" prefix
332    let zephyr_stdout = zephyr_proc.stdout.take().expect("stdout was piped");
333    let stdout_lines_clone = Arc::clone(&stdout_lines);
334    std::thread::spawn(move || {
335        use std::io::Write;
336        // Same /dev/stderr trick as pipe_labeled — bypasses cargo test capture.
337        let mut real_stderr = verbose.then(|| {
338            std::fs::OpenOptions::new()
339                .write(true)
340                .open("/dev/stderr")
341                .expect("open /dev/stderr")
342        });
343        let reader = BufReader::new(zephyr_stdout);
344        let mut pty_sent = false;
345        for line in reader.lines() {
346            let line = match line {
347                Ok(l) => l,
348                Err(_) => break,
349            };
350            // PTY discovery: nsi_print_trace writes to stdout
351            // format: "<uart_name> connected to pseudotty: <slave_path>"
352            if !pty_sent {
353                if let Some(idx) = line.find("connected to pseudotty: ") {
354                    let pty_path_str = line[idx + "connected to pseudotty: ".len()..].trim();
355                    let pty_path = PathBuf::from(pty_path_str);
356                    let _ = pty_tx.send(pty_path);
357                    pty_sent = true;
358                }
359            }
360            if let Some(ref mut out) = real_stderr {
361                let _ = writeln!(out, "[rpc-server] {line}");
362            }
363            stdout_lines_clone.lock().unwrap().push(line);
364        }
365    });
366
367    // ── 3. CGM peripheral ────────────────────────────────────────────────────
368    let mut cgm = if verbose {
369        Command::new("./cgm_peripheral_sample")
370            .args([&format!("-s={sim_id}"), "-d=1"])
371            .current_dir(bsim_bin)
372            .stdin(Stdio::null())
373            .stdout(Stdio::piped())
374            .stderr(Stdio::piped())
375            .env("BSIM_OUT_PATH", bsim_out)
376            .env("BSIM_COMPONENTS_PATH", bsim_comp)
377            .env("LD_LIBRARY_PATH", &ld_path)
378            .process_group(0)
379            .spawn()
380            .unwrap_or_else(|e| panic!("failed to spawn cgm_peripheral_sample: {e}"))
381    } else {
382        let cgm_log_path = bsim_bin.join("cgm_peripheral_sample.log");
383        let cgm_log_file = std::fs::File::create(&cgm_log_path)
384            .unwrap_or_else(|e| panic!("could not create cgm log file: {e}"));
385        let cgm_log_clone = cgm_log_file
386            .try_clone()
387            .expect("could not clone cgm log file handle");
388        Command::new("./cgm_peripheral_sample")
389            .args([&format!("-s={sim_id}"), "-d=1"])
390            .current_dir(bsim_bin)
391            .stdin(Stdio::null())
392            .stdout(cgm_log_file)
393            .stderr(cgm_log_clone)
394            .env("BSIM_OUT_PATH", bsim_out)
395            .env("BSIM_COMPONENTS_PATH", bsim_comp)
396            .env("LD_LIBRARY_PATH", &ld_path)
397            .process_group(0)
398            .spawn()
399            .unwrap_or_else(|e| panic!("failed to spawn cgm_peripheral_sample: {e}"))
400    };
401    if verbose {
402        if let Some(s) = cgm.stdout.take() { pipe_labeled(s, "cgm"); }
403        if let Some(s) = cgm.stderr.take() { pipe_labeled(s, "cgm"); }
404    }
405
406    // ── 4. Wait for PTY path ─────────────────────────────────────────────────
407    let pty_path = pty_rx
408        .recv_timeout(Duration::from_secs(30))
409        .unwrap_or_else(|_| {
410            panic!(
411                "timed out waiting for zephyr_rpc_server_app to announce UART PTY path \
412                 (expected a stdout line containing \"connected to pseudotty: \")"
413            )
414        });
415
416    // ── 5. socat bridge: PTY → UNIX socket ───────────────────────────────────
417    let socket_path_str = socket_path
418        .to_str()
419        .expect("socket path must be valid UTF-8");
420    let pty_path_str = pty_path
421        .to_str()
422        .expect("PTY path must be valid UTF-8");
423
424    let socat = Command::new("socat")
425        .arg(format!("UNIX-LISTEN:{socket_path_str},fork"))
426        .arg(format!("{pty_path_str},raw,echo=0"))
427        .stdin(Stdio::null())
428        .stdout(Stdio::null())
429        .stderr(Stdio::null())
430        .process_group(0)
431        .spawn()
432        .unwrap_or_else(|e| {
433            panic!(
434                "failed to spawn socat (is it installed?): {e}\n\
435                 socat bridges the Zephyr UART PTY ({pty_path_str}) to the test UNIX socket \
436                 ({socket_path_str})"
437            )
438        });
439
440    let processes = TestProcesses {
441        children: vec![phy, zephyr_proc, cgm, socat],
442        stdout_lines,
443    };
444
445    (processes, socket_path)
446}
447
448// ── Unit tests ────────────────────────────────────────────────────────────────
449
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TestProcesses` whose stdout buffer already holds `lines` and
    /// which manages no real child processes.
    fn make_tp(lines: Vec<&str>) -> TestProcesses {
        let captured: Vec<String> = lines.iter().map(|l| l.to_string()).collect();
        TestProcesses {
            children: Vec::new(),
            stdout_lines: Arc::new(Mutex::new(captured)),
        }
    }

    // ── PTY path parsing ──────────────────────────────────────────────────────

    #[test]
    fn parses_pty_path_from_typical_stdout_line() {
        let marker = "connected to pseudotty: ";
        let announcement = "UART_0 connected to pseudotty: /dev/pts/5";
        let start = announcement.find(marker).expect("needle present") + marker.len();
        assert_eq!(announcement[start..].trim(), "/dev/pts/5");
    }

    #[test]
    fn parses_pty_path_ignores_leading_whitespace() {
        let marker = "connected to pseudotty:";
        let announcement = "  UARTE_1 connected to pseudotty:  /dev/pts/12  ";
        let start = announcement.find(marker).expect("needle present") + marker.len();
        assert_eq!(announcement[start..].trim(), "/dev/pts/12");
    }

    // ── search_stdout_with_timeout ────────────────────────────────────────────

    #[test]
    fn search_finds_exact_line_match() {
        let wanted = HashSet::from(["Initializing RPC server"]);
        let mut procs = make_tp(vec!["<inf> nrf_ps_server: Initializing RPC server"]);
        // Returning at all is the assertion: a miss would panic.
        procs.search_stdout_with_timeout(wanted, Duration::from_millis(500));
    }

    #[test]
    fn search_finds_multiple_strings_across_different_lines() {
        let wanted = HashSet::from([
            "Initializing RPC server",
            "Done initializing nRF RPC module",
        ]);
        let mut procs = make_tp(vec![
            "<inf> nrf_ps_server: Initializing RPC server",
            "<dbg> NRF_RPC: Done initializing nRF RPC module",
            "some other log line",
        ]);
        procs.search_stdout_with_timeout(wanted, Duration::from_millis(500));
    }

    #[test]
    fn search_succeeds_on_empty_expected_set() {
        // Nothing is expected, so the call must return without waiting.
        let mut procs = make_tp(Vec::new());
        procs.search_stdout_with_timeout(HashSet::new(), Duration::from_millis(100));
    }

    #[test]
    #[should_panic(expected = "timed out")]
    fn search_panics_when_string_is_absent() {
        let wanted = HashSet::from(["this string is not present"]);
        let mut procs = make_tp(vec!["something irrelevant"]);
        procs.search_stdout_with_timeout(wanted, Duration::from_millis(200));
    }

    #[test]
    #[should_panic(expected = "timed out")]
    fn search_panics_when_only_some_strings_are_found() {
        let wanted = HashSet::from(["line A present", "line B missing"]);
        let mut procs = make_tp(vec!["line A present"]);
        procs.search_stdout_with_timeout(wanted, Duration::from_millis(200));
    }

    // ── kill_all is a no-op on an empty children list ─────────────────────────

    #[test]
    fn kill_all_on_empty_children_does_not_panic() {
        // With no children there is nothing to signal or reap.
        let mut procs = make_tp(Vec::new());
        procs.kill_all();
    }
}
548}