pub mod xtask;
use std::collections::HashSet;
use std::env;
use std::io::{BufRead, BufReader};
use std::os::unix::process::CommandExt;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Selects where the output of the spawned simulator processes should go.
///
/// The variants are interpreted by `spawn_zephyr_rpc_server_with_socat`:
/// `Stream` and `Both` turn on live streaming, while `WriteToDir` and `Both`
/// supply a directory for log files.
#[derive(Debug, Clone)]
pub enum LogOutput {
    /// No live streaming and no log directory.
    Off,
    /// Stream process output live, without a log directory.
    Stream,
    /// Record process output in log files under the given directory.
    WriteToDir(PathBuf),
    /// Requests both live streaming and log files under the given directory.
    Both(PathBuf),
}
/// Owns the child processes started for one test together with the stdout
/// lines captured from them so far.
///
/// Dropping the value kills and reaps every child (see [`TestProcesses::kill_all`]).
pub struct TestProcesses {
    /// Child processes owned by this handle.
    children: Vec<Child>,
    /// Captured stdout lines; shared with the thread that appends to it.
    stdout_lines: Arc<Mutex<Vec<String>>>,
}

impl TestProcesses {
    /// Pause between two looks at the captured-stdout buffer.
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    /// Renders captured lines one per row, each prefixed with a single space.
    fn indented(lines: &[String]) -> String {
        lines
            .iter()
            .map(|l| format!(" {l}"))
            .collect::<Vec<_>>()
            .join("\n")
    }

    /// Waits up to 30 seconds for every string in `expected` to appear in the
    /// captured stdout.
    ///
    /// # Panics
    ///
    /// Panics if any string is still missing after 30 seconds.
    pub fn search_stdout_for_strings(&mut self, expected: HashSet<&str>) {
        self.search_stdout_with_timeout(expected, Duration::from_secs(30));
    }

    /// Polls the captured stdout until every string in `expected` occurs as a
    /// substring of at least one captured line, or `timeout` elapses.
    ///
    /// The strings may be found on different lines. An empty `expected` set
    /// returns immediately.
    ///
    /// # Panics
    ///
    /// Panics with a message listing the missing strings and the whole
    /// captured stdout when `timeout` elapses first.
    pub fn search_stdout_with_timeout(&mut self, expected: HashSet<&str>, timeout: Duration) {
        let start = Instant::now();
        loop {
            // Keep the guard inside this block so the thread appending lines
            // is never blocked while we sleep.
            let missing: HashSet<&str> = {
                let lines = self.stdout_lines.lock().unwrap();
                expected
                    .iter()
                    .copied()
                    .filter(|needle| !lines.iter().any(|line| line.contains(*needle)))
                    .collect()
            };
            if missing.is_empty() {
                return;
            }
            if start.elapsed() >= timeout {
                // Snapshot the buffer and release the lock *before* panicking:
                // unwinding with the guard alive would poison the mutex that
                // the appending thread still uses.
                let (line_count, captured) = {
                    let lines = self.stdout_lines.lock().unwrap();
                    (lines.len(), Self::indented(&lines))
                };
                let missing_list = missing
                    .iter()
                    .map(|s| format!(" - {:?}", s))
                    .collect::<Vec<_>>()
                    .join("\n");
                panic!(
                    "search_stdout_for_strings timed out after {:?}.\n\
                     Missing strings:\n{}\n\
                     Captured stdout ({} lines):\n{}",
                    timeout, missing_list, line_count, captured,
                );
            }
            std::thread::sleep(Self::POLL_INTERVAL);
        }
    }

    /// Prints the whole captured stdout every 50 ms until `timeout` elapses.
    ///
    /// Intended as a debugging aid. The buffer lock is held only while the
    /// snapshot is formatted, so capture continues between prints.
    pub fn debug_dump_stdout(&mut self, timeout: Duration) {
        let start = Instant::now();
        while start.elapsed() < timeout {
            let snapshot = {
                let lines = self.stdout_lines.lock().unwrap();
                Self::indented(&lines)
            };
            println!("Captured stdout:\n{snapshot}");
            std::thread::sleep(Self::POLL_INTERVAL);
        }
    }

    /// Kills every child process and then waits for each of them to exit.
    ///
    /// Errors from `kill` and `wait` are ignored, so calling this more than
    /// once (for example explicitly and again on drop) is harmless.
    pub fn kill_all(&mut self) {
        // Signal everything first so no child keeps running while we wait
        // for an earlier one to exit.
        for child in &mut self.children {
            let _ = child.kill();
        }
        for child in &mut self.children {
            let _ = child.wait();
        }
    }
}

impl Drop for TestProcesses {
    /// Ensures no child process outlives the handle.
    fn drop(&mut self) {
        self.kill_all();
    }
}
/// Starts a detached thread that copies `stream` line by line to
/// `/dev/stderr`, prefixing every line with `[label]`.
///
/// Lines that cannot be read (e.g. invalid UTF-8) are skipped and write
/// errors are ignored. The thread panics if `/dev/stderr` cannot be opened.
// NOTE(review): writing through the /dev/stderr path rather than
// `std::io::stderr()` presumably sidesteps output capturing — confirm.
fn pipe_labeled<R>(stream: R, label: &'static str)
where
    R: std::io::Read + Send + 'static,
{
    use std::io::Write;

    std::thread::spawn(move || {
        let mut sink = std::fs::OpenOptions::new()
            .write(true)
            .open("/dev/stderr")
            .expect("open /dev/stderr");
        BufReader::new(stream)
            .lines()
            .filter_map(Result::ok)
            .for_each(|text| {
                let _ = writeln!(sink, "[{label}] {text}");
            });
    });
}
/// Starts a detached thread that appends every line read from `stream` to
/// the file at `path`.
///
/// The file must already exist: it is opened in append mode without being
/// created, and the thread panics if that fails. Unreadable lines are
/// skipped and write errors are ignored.
fn pipe_to_file<R>(stream: R, path: PathBuf)
where
    R: std::io::Read + Send + 'static,
{
    use std::io::Write;

    std::thread::spawn(move || {
        let mut sink = match std::fs::OpenOptions::new().append(true).open(&path) {
            Ok(handle) => handle,
            Err(e) => panic!("pipe_to_file: could not open {}: {e}", path.display()),
        };
        BufReader::new(stream)
            .lines()
            .filter_map(Result::ok)
            .for_each(|text| {
                let _ = writeln!(sink, "{text}");
            });
    });
}
/// Best-effort cleanup of leftovers from an earlier simulation run that used
/// the same `sim_id`.
///
/// Steps, all of which ignore errors:
/// 1. `pkill -9 -f` every phy / RPC-server / CGM-sample process started with
///    `-s=<sim_id>` and every `socat` whose command line mentions
///    `<sim_id>.sock`.
/// 2. Sleep 300 ms so the killed processes can go away.
/// 3. Remove `/tmp/bs_*/<sim_id>` directories.
/// 4. Remove every entry in `/dev/shm` whose name contains `sim_id`.
///
/// `sim_id` is interpolated verbatim into the `pkill` patterns, so regex
/// metacharacters in it are interpreted by `pkill`.
///
/// An empty `sim_id` is a no-op. Without that guard the patterns would match
/// every simulator process, `join("")` would resolve to each `/tmp/bs_*`
/// directory itself, and every file in `/dev/shm` would "contain" the id.
pub(crate) fn kill_stale_sim_processes(sim_id: &str) {
    if sim_id.is_empty() {
        return;
    }

    let patterns = [
        format!("bs_2G4_phy_v1.*-s={sim_id}"),
        format!("zephyr_rpc_server_app.*-s={sim_id}"),
        format!("cgm_peripheral_sample.*-s={sim_id}"),
        format!("socat.*{sim_id}.sock"),
    ];
    for pat in &patterns {
        let _ = Command::new("pkill").args(["-9", "-f", pat]).status();
    }

    // Give the killed processes a moment to exit before removing their files.
    std::thread::sleep(Duration::from_millis(300));

    if let Ok(entries) = std::fs::read_dir("/tmp") {
        for entry in entries.flatten() {
            if !entry.file_name().to_string_lossy().starts_with("bs_") {
                continue;
            }
            let sim_dir = entry.path().join(sim_id);
            if sim_dir.is_dir() {
                let _ = std::fs::remove_dir_all(&sim_dir);
            }
        }
    }

    if let Ok(entries) = std::fs::read_dir("/dev/shm") {
        for entry in entries.flatten() {
            if entry.file_name().to_string_lossy().contains(sim_id) {
                let _ = std::fs::remove_file(entry.path());
            }
        }
    }
}
pub fn spawn_zephyr_rpc_server_with_socat(
tests_dir: &Path,
test_name: &str,
log: LogOutput,
) -> (TestProcesses, PathBuf) {
let verbose = matches!(log, LogOutput::Stream | LogOutput::Both(_));
let log_dir: Option<PathBuf> = match &log {
LogOutput::WriteToDir(p) | LogOutput::Both(p) => Some(p.clone()),
_ => None,
};
if let Some(ref dir) = log_dir {
std::fs::create_dir_all(dir)
.unwrap_or_else(|e| panic!("could not create log dir {}: {e}", dir.display()));
for name in &["phy.log", "rpc-server.log", "cgm.log"] {
std::fs::File::create(dir.join(name))
.unwrap_or_else(|e| panic!("could not create log file {name}: {e}"));
}
}
let bsim_bin = Path::new("external/tools/bsim/bin");
let bsim_out = "external/tools/bsim";
let bsim_comp = "external/tools/bsim/components";
let ld_path = match env::var("LD_LIBRARY_PATH") {
Ok(existing) => format!("external/tools/bsim/lib:{existing}"),
Err(_) => "external/tools/bsim/lib".to_string(),
};
let sim_id = test_name;
std::fs::create_dir_all(tests_dir)
.unwrap_or_else(|e| panic!("could not create tests dir {}: {e}", tests_dir.display()));
let socket_path = tests_dir.join(format!("{test_name}.sock"));
kill_stale_sim_processes(sim_id);
let _ = std::fs::remove_file(&socket_path);
let needs_phy_pipe = verbose || log_dir.is_some();
let mut phy = Command::new("./bs_2G4_phy_v1")
.args([
&format!("-s={sim_id}"),
"-D=2", "-sim_length=86400e6",
])
.current_dir(bsim_bin)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(if needs_phy_pipe { Stdio::piped() } else { Stdio::null() })
.env("BSIM_OUT_PATH", bsim_out)
.env("BSIM_COMPONENTS_PATH", bsim_comp)
.env("LD_LIBRARY_PATH", &ld_path)
.process_group(0)
.spawn()
.unwrap_or_else(|e| panic!("failed to spawn bs_2G4_phy_v1: {e}"));
if let Some(stderr) = phy.stderr.take() {
if verbose { pipe_labeled(stderr, "babblesim-phy"); }
else if let Some(ref dir) = log_dir { pipe_to_file(stderr, dir.join("phy.log")); }
}
let stdout_lines: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let (pty_tx, pty_rx) = std::sync::mpsc::channel::<PathBuf>();
let zephyr_color_arg: &[&str] = if verbose { &["-force-color"] } else { &[] };
let needs_zephyr_stderr = verbose || log_dir.is_some();
let mut zephyr_proc = Command::new("./zephyr_rpc_server_app")
.args([
&format!("-s={sim_id}"),
"-d=0",
"-uart0_pty",
"-uart_pty_pollT=1000",
])
.args(zephyr_color_arg)
.current_dir(bsim_bin)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(if needs_zephyr_stderr { Stdio::piped() } else { Stdio::null() })
.env("BSIM_OUT_PATH", bsim_out)
.env("BSIM_COMPONENTS_PATH", bsim_comp)
.env("LD_LIBRARY_PATH", &ld_path)
.process_group(0)
.spawn()
.unwrap_or_else(|e| panic!("failed to spawn zephyr_rpc_server_app: {e}"));
if let Some(stderr) = zephyr_proc.stderr.take() {
if verbose { pipe_labeled(stderr, "rpc-server"); }
else if let Some(ref dir) = log_dir { pipe_to_file(stderr, dir.join("rpc-server.log")); }
}
let zephyr_stdout = zephyr_proc.stdout.take().expect("stdout was piped");
let stdout_lines_clone = Arc::clone(&stdout_lines);
let rpc_log_path = log_dir.as_ref().map(|d| d.join("rpc-server.log"));
std::thread::spawn(move || {
use std::io::Write;
let mut real_stderr = verbose.then(|| {
std::fs::OpenOptions::new()
.write(true)
.open("/dev/stderr")
.expect("open /dev/stderr")
});
let mut log_file = rpc_log_path.as_ref().map(|p| {
std::fs::OpenOptions::new()
.append(true)
.open(p)
.unwrap_or_else(|e| panic!("could not open rpc-server.log: {e}"))
});
let reader = BufReader::new(zephyr_stdout);
let mut pty_sent = false;
for line in reader.lines() {
let line = match line {
Ok(l) => l,
Err(_) => break,
};
if !pty_sent {
if let Some(idx) = line.find("connected to pseudotty: ") {
let pty_path_str = line[idx + "connected to pseudotty: ".len()..].trim();
let pty_path = PathBuf::from(pty_path_str);
let _ = pty_tx.send(pty_path);
pty_sent = true;
}
}
if let Some(ref mut out) = real_stderr {
let _ = writeln!(out, "[rpc-server] {line}");
}
if let Some(ref mut f) = log_file {
let _ = writeln!(f, "{line}");
}
stdout_lines_clone.lock().unwrap().push(line);
}
});
let mut cgm = if verbose || log_dir.is_some() {
Command::new("./cgm_peripheral_sample")
.args([&format!("-s={sim_id}"), "-d=1"])
.current_dir(bsim_bin)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env("BSIM_OUT_PATH", bsim_out)
.env("BSIM_COMPONENTS_PATH", bsim_comp)
.env("LD_LIBRARY_PATH", &ld_path)
.process_group(0)
.spawn()
.unwrap_or_else(|e| panic!("failed to spawn cgm_peripheral_sample: {e}"))
} else {
let cgm_log_path = bsim_bin.join("cgm_peripheral_sample.log");
let cgm_log_file = std::fs::File::create(&cgm_log_path)
.unwrap_or_else(|e| panic!("could not create cgm log file: {e}"));
let cgm_log_clone = cgm_log_file
.try_clone()
.expect("could not clone cgm log file handle");
Command::new("./cgm_peripheral_sample")
.args([&format!("-s={sim_id}"), "-d=1"])
.current_dir(bsim_bin)
.stdin(Stdio::null())
.stdout(cgm_log_file)
.stderr(cgm_log_clone)
.env("BSIM_OUT_PATH", bsim_out)
.env("BSIM_COMPONENTS_PATH", bsim_comp)
.env("LD_LIBRARY_PATH", &ld_path)
.process_group(0)
.spawn()
.unwrap_or_else(|e| panic!("failed to spawn cgm_peripheral_sample: {e}"))
};
if let (Some(stdout), Some(stderr)) = (cgm.stdout.take(), cgm.stderr.take()) {
if verbose {
pipe_labeled(stdout, "cgm");
pipe_labeled(stderr, "cgm");
} else if let Some(ref dir) = log_dir {
pipe_to_file(stdout, dir.join("cgm.log"));
pipe_to_file(stderr, dir.join("cgm.log"));
}
}
let pty_path = pty_rx
.recv_timeout(Duration::from_secs(30))
.unwrap_or_else(|_| {
panic!(
"timed out waiting for zephyr_rpc_server_app to announce UART PTY path \
(expected a stdout line containing \"connected to pseudotty: \")"
)
});
let socket_path_str = socket_path
.to_str()
.expect("socket path must be valid UTF-8");
let pty_path_str = pty_path
.to_str()
.expect("PTY path must be valid UTF-8");
let socat = Command::new("socat")
.arg(format!("UNIX-LISTEN:{socket_path_str},fork"))
.arg(format!("{pty_path_str},raw,echo=0"))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.process_group(0)
.spawn()
.unwrap_or_else(|e| {
panic!(
"failed to spawn socat (is it installed?): {e}\n\
socat bridges the Zephyr UART PTY ({pty_path_str}) to the test UNIX socket \
({socket_path_str})"
)
});
let processes = TestProcesses {
children: vec![phy, zephyr_proc, cgm, socat],
stdout_lines,
};
(processes, socket_path)
}
/// Unit tests for the stdout search, the `LogOutput` interpretation and the
/// file-piping helper. None of them spawns a simulator process.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TestProcesses` with no children and a pre-filled stdout buffer.
    fn make_tp(lines: Vec<&str>) -> TestProcesses {
        let buf = Arc::new(Mutex::new(
            lines.into_iter().map(str::to_owned).collect(),
        ));
        TestProcesses {
            children: vec![],
            stdout_lines: buf,
        }
    }

    // --- PTY announcement parsing ---------------------------------------

    #[test]
    fn parses_pty_path_from_typical_stdout_line() {
        let line = "UART_0 connected to pseudotty: /dev/pts/5";
        let needle = "connected to pseudotty: ";
        let idx = line.find(needle).expect("needle present");
        let path = line[idx + needle.len()..].trim();
        assert_eq!(path, "/dev/pts/5");
    }

    #[test]
    fn parses_pty_path_ignores_leading_whitespace() {
        let line = " UARTE_1 connected to pseudotty: /dev/pts/12 ";
        let needle = "connected to pseudotty:";
        let idx = line.find(needle).expect("needle present");
        let path = line[idx + needle.len()..].trim();
        assert_eq!(path, "/dev/pts/12");
    }

    // --- stdout search --------------------------------------------------

    #[test]
    fn search_finds_exact_line_match() {
        let mut tp = make_tp(vec!["<inf> nrf_ps_server: Initializing RPC server"]);
        tp.search_stdout_with_timeout(
            HashSet::from(["Initializing RPC server"]),
            Duration::from_millis(500),
        );
    }

    #[test]
    fn search_finds_multiple_strings_across_different_lines() {
        let mut tp = make_tp(vec![
            "<inf> nrf_ps_server: Initializing RPC server",
            "<dbg> NRF_RPC: Done initializing nRF RPC module",
            "some other log line",
        ]);
        tp.search_stdout_with_timeout(
            HashSet::from([
                "Initializing RPC server",
                "Done initializing nRF RPC module",
            ]),
            Duration::from_millis(500),
        );
    }

    #[test]
    fn search_succeeds_on_empty_expected_set() {
        let mut tp = make_tp(vec![]);
        tp.search_stdout_with_timeout(HashSet::new(), Duration::from_millis(100));
    }

    #[test]
    #[should_panic(expected = "timed out")]
    fn search_panics_when_string_is_absent() {
        let mut tp = make_tp(vec!["something irrelevant"]);
        tp.search_stdout_with_timeout(
            HashSet::from(["this string is not present"]),
            Duration::from_millis(200),
        );
    }

    #[test]
    #[should_panic(expected = "timed out")]
    fn search_panics_when_only_some_strings_are_found() {
        let mut tp = make_tp(vec!["line A present"]);
        tp.search_stdout_with_timeout(
            HashSet::from(["line A present", "line B missing"]),
            Duration::from_millis(200),
        );
    }

    #[test]
    fn kill_all_on_empty_children_does_not_panic() {
        let mut tp = make_tp(vec![]);
        tp.kill_all();
    }

    // --- LogOutput interpretation ---------------------------------------

    #[test]
    fn log_output_off_is_not_verbose() {
        let verbose = matches!(LogOutput::Off, LogOutput::Stream | LogOutput::Both(_));
        assert!(!verbose);
    }

    #[test]
    fn log_output_write_to_dir_is_not_verbose() {
        let verbose = matches!(
            LogOutput::WriteToDir(PathBuf::from("/tmp")),
            LogOutput::Stream | LogOutput::Both(_)
        );
        assert!(!verbose);
    }

    #[test]
    fn log_output_stream_is_verbose() {
        let verbose = matches!(LogOutput::Stream, LogOutput::Stream | LogOutput::Both(_));
        assert!(verbose);
    }

    #[test]
    fn log_output_both_is_verbose() {
        let verbose = matches!(
            LogOutput::Both(PathBuf::from("/tmp")),
            LogOutput::Stream | LogOutput::Both(_)
        );
        assert!(verbose);
    }

    #[test]
    fn log_output_off_has_no_log_dir() {
        let log_dir: Option<PathBuf> = match &LogOutput::Off {
            LogOutput::WriteToDir(p) | LogOutput::Both(p) => Some(p.clone()),
            _ => None,
        };
        assert!(log_dir.is_none());
    }

    #[test]
    fn log_output_write_to_dir_extracts_path() {
        let expected = PathBuf::from("/tmp/sim-logs");
        let log_dir: Option<PathBuf> = match &LogOutput::WriteToDir(expected.clone()) {
            LogOutput::WriteToDir(p) | LogOutput::Both(p) => Some(p.clone()),
            _ => None,
        };
        assert_eq!(log_dir, Some(expected));
    }

    #[test]
    fn log_output_both_extracts_path() {
        let expected = PathBuf::from("/tmp/sim-logs");
        let log_dir: Option<PathBuf> = match &LogOutput::Both(expected.clone()) {
            LogOutput::WriteToDir(p) | LogOutput::Both(p) => Some(p.clone()),
            _ => None,
        };
        assert_eq!(log_dir, Some(expected));
    }

    // --- file piping ----------------------------------------------------

    #[test]
    fn pipe_to_file_writes_lines_to_file() {
        use std::io::Cursor;
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("out.log");
        std::fs::File::create(&path).unwrap();
        let content = b"line one\nline two\nline three\n";
        pipe_to_file(Cursor::new(content), path.clone());

        // The writer thread is detached, so poll with a deadline instead of
        // relying on one fixed sleep being long enough on a loaded machine.
        let deadline = Instant::now() + Duration::from_secs(2);
        let written = loop {
            let contents = std::fs::read_to_string(&path).unwrap();
            if contents.contains("line three") || Instant::now() >= deadline {
                break contents;
            }
            std::thread::sleep(Duration::from_millis(5));
        };

        assert!(written.contains("line one"), "missing 'line one' in {written:?}");
        assert!(written.contains("line two"), "missing 'line two' in {written:?}");
        assert!(written.contains("line three"), "missing 'line three' in {written:?}");
    }

    #[test]
    fn file_create_truncates_existing_content() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("stale.log");
        std::fs::write(&path, "old sentinel content\n").unwrap();
        std::fs::File::create(&path).unwrap();
        let after = std::fs::read_to_string(&path).unwrap();
        assert!(after.is_empty(), "file should be empty after File::create, got {after:?}");
    }
}