use babble_bridge::TestProcesses;
use std::collections::HashSet;
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, Once};
use std::time::Duration;
/// Client end of the Unix-domain socket these tests use as a simulated UART.
///
/// Writes go straight to the socket. Reads are performed by a detached
/// background thread that appends every received byte to `rx_buffer`, from
/// which [`SimUart::recv_frame`] extracts delimiter-terminated frames.
struct SimUart {
    /// Stream used for transmitting; the reader thread owns a clone of it.
    socket: UnixStream,
    /// Bytes received so far that `recv_frame` has not consumed yet.
    rx_buffer: Arc<Mutex<Vec<u8>>>,
}

impl SimUart {
    /// Connects to the Unix socket at `socket_path` and starts the reader thread.
    ///
    /// The connection attempt is retried every 50 ms for up to 5 seconds.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be connected within 5 seconds, if blocking
    /// mode cannot be set, or if the stream cannot be cloned for the reader
    /// thread.
    fn connect(socket_path: &Path) -> Self {
        let start = std::time::Instant::now();
        let socket = loop {
            match UnixStream::connect(socket_path) {
                Ok(stream) => break stream,
                // Still inside the retry window: back off briefly and try again.
                Err(_) if start.elapsed() < Duration::from_secs(5) => {
                    std::thread::sleep(Duration::from_millis(50));
                }
                Err(e) => panic!(
                    "could not connect to {} within 5 s: {e}",
                    socket_path.display()
                ),
            }
        };
        // Explicitly request blocking mode so the reader thread parks in `read`.
        socket.set_nonblocking(false).expect("set blocking");

        let rx_buffer: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
        let rx_clone = Arc::clone(&rx_buffer);
        let mut rx_socket = socket.try_clone().expect("clone socket for RX thread");

        // Detached reader: the join handle is discarded, and the thread exits
        // on EOF or on any read error other than `Interrupted`.
        std::thread::spawn(move || {
            let mut buf = [0u8; 1024];
            loop {
                match rx_socket.read(&mut buf) {
                    Ok(0) => break,
                    Ok(n) => {
                        println!("SimUart RX: {} bytes: {:02X?}", n, &buf[..n]);
                        rx_clone.lock().unwrap().extend_from_slice(&buf[..n]);
                    }
                    Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                    Err(e) => {
                        println!("SimUart RX error: {e}");
                        break;
                    }
                }
            }
        });

        Self { socket, rx_buffer }
    }

    /// Writes all of `data` to the socket and flushes it.
    ///
    /// # Panics
    ///
    /// Panics if the write or the flush fails.
    fn send(&mut self, data: &[u8]) {
        println!("SimUart TX: {} bytes: {:02X?}", data.len(), data);
        self.socket.write_all(data).expect("socket write");
        self.socket.flush().expect("socket flush");
    }

    /// Waits up to `timeout` for a frame and removes it from the receive buffer.
    ///
    /// A frame is everything from the start of the buffer up to and including
    /// the second `0x7E` delimiter. Bytes that precede the first delimiter are
    /// therefore returned as part of the frame rather than discarded.
    ///
    /// Returns an empty vector if no second delimiter shows up before the
    /// deadline. The buffer is always checked at least once, even when
    /// `timeout` is zero.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    fn recv_frame(&self, timeout: Duration) -> Vec<u8> {
        const HDLC: u8 = 0x7E;
        // Pause between polls; sleeping instead of spinning keeps the CPU idle
        // and leaves the mutex free for the reader thread.
        const POLL_INTERVAL: Duration = Duration::from_millis(1);

        let deadline = std::time::Instant::now() + timeout;
        loop {
            {
                let mut rx = self.rx_buffer.lock().unwrap();
                // Index of the second delimiter, i.e. the last byte of the frame.
                let frame_end = rx
                    .iter()
                    .enumerate()
                    .filter(|&(_, &byte)| byte == HDLC)
                    .map(|(index, _)| index)
                    .nth(1);
                if let Some(end) = frame_end {
                    // Scan and removal happen under a single lock acquisition.
                    let frame: Vec<u8> = rx.drain(..=end).collect();
                    drop(rx);
                    println!("SimUart frame: {:02X?}", &frame);
                    return frame;
                }
            }

            let now = std::time::Instant::now();
            if now >= deadline {
                println!("SimUart: recv_frame timed out");
                return Vec::new();
            }
            // Never sleep past the deadline.
            std::thread::sleep(POLL_INTERVAL.min(deadline.saturating_duration_since(now)));
        }
    }
}
/// Deletes leftover `*.sock` files from `tests/sockets`, at most once per process.
///
/// The work is guarded by a function-local [`Once`], so every call after the
/// first is a no-op. All filesystem errors (missing directory, unreadable
/// entries, failed deletions) are ignored: the cleanup is best effort.
fn once_cleanup_sockets() {
    static CLEANUP: Once = Once::new();
    CLEANUP.call_once(|| {
        let dir = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sockets"));
        let listing = match std::fs::read_dir(dir) {
            Ok(listing) => listing,
            // Nothing to clean if the directory cannot be read.
            Err(_) => return,
        };
        listing
            .flatten()
            .map(|entry| entry.path())
            .filter(|path| matches!(path.extension(), Some(ext) if ext == "sock"))
            .for_each(|stale| {
                let _ = std::fs::remove_file(stale);
            });
    });
}
/// Shared test setup: spawns the simulation for `test_name` and connects a
/// [`SimUart`] to the socket path the spawner returns.
///
/// Stale sockets are cleaned up first (once per process). Process output is
/// streamed when the `sim-log` cargo feature is enabled and switched off
/// otherwise.
///
/// Returns the process handle together with the connected UART; what the
/// handle does when dropped is defined by `babble_bridge`.
fn start_sim(test_name: &str) -> (TestProcesses, SimUart) {
    once_cleanup_sockets();

    let log_mode = if cfg!(feature = "sim-log") {
        babble_bridge::LogOutput::Stream
    } else {
        babble_bridge::LogOutput::Off
    };
    let socket_root = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sockets"));

    let (children, uart_socket) =
        babble_bridge::spawn_zephyr_rpc_server_with_socat(socket_root, test_name, log_mode);
    (children, SimUart::connect(&uart_socket))
}
/// Smoke test: start the simulation and look for the RPC server's
/// initialization line in the process stdout.
///
/// NOTE(review): presumably `search_stdout_for_strings` fails the test when the
/// line never appears — confirm against `babble_bridge`.
#[test]
fn spawns_socket_and_zephyr_initializes() {
    let (mut processes, _uart) = start_sim("spawns_socket_and_zephyr_initializes");

    let init_banner = HashSet::from(["<inf> nrf_ps_server: Initializing RPC server"]);
    processes.search_stdout_for_strings(init_banner);
}
/// Checks that a client can connect to the simulated UART socket and write a
/// single zero byte to it without the write or flush failing.
#[test]
fn client_can_connect_to_socket() {
    // One zero byte is enough to exercise the write path.
    const PROBE: [u8; 1] = [0x00];

    let (_processes, mut uart) = start_sim("client_can_connect_to_socket");
    uart.send(&PROBE);
}
/// End-to-end walkthrough of how a downstream test uses the helpers.
///
/// Steps:
/// 1. start the simulation;
/// 2. look for the RPC server's initialization line;
/// 3. send a minimal delimiter-wrapped frame (`7E 00 7E`);
/// 4. wait up to 3 s for a response frame — receiving none is tolerated;
/// 5. look for the nRF RPC "done initializing" debug line.
#[test]
fn downstream_usage_example() {
    const SERVER_INIT: &str = "<inf> nrf_ps_server: Initializing RPC server";
    const RPC_READY: &str = "<dbg> NRF_RPC: Done initializing nRF RPC module";

    let (mut processes, mut uart) = start_sim("downstream_usage_example");

    processes.search_stdout_for_strings(HashSet::from([SERVER_INIT]));
    println!("[Step 2] Zephyr RPC server ready.");

    let request = [0x7E, 0x00, 0x7E];
    uart.send(&request);
    println!("[Step 3] Sent HDLC frame.");

    // An empty vector is how `recv_frame` reports a timeout.
    let response = uart.recv_frame(Duration::from_secs(3));
    if response.is_empty() {
        println!("[Step 4] No response frame within timeout (expected for invalid payload).");
    } else {
        println!("[Step 4] Received response frame: {:02X?}", response);
    }

    processes.search_stdout_for_strings(HashSet::from([RPC_READY]));
    println!("[Step 5] Server-side logs verified.");
}
/// With `LogOutput::WriteToDir`, the three expected log files must exist in the
/// target directory and `rpc-server.log` must not be empty.
#[test]
fn log_output_write_to_dir_creates_log_files() {
    let log_dir = tempfile::tempdir().expect("tempdir");
    let log = babble_bridge::LogOutput::WriteToDir(log_dir.path().to_path_buf());
    let sockets_dir = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sockets"));

    let (mut processes, _socket_path) = babble_bridge::spawn_zephyr_rpc_server_with_socat(
        sockets_dir,
        "log_output_write_to_dir_creates_log_files",
        log,
    );
    processes.search_stdout_for_strings(HashSet::from([
        "<inf> nrf_ps_server: Initializing RPC server",
    ]));

    // Grace period before inspecting the directory.
    // NOTE(review): presumably lets the log writers catch up — confirm.
    std::thread::sleep(Duration::from_millis(300));

    for file_name in ["rpc-server.log", "cgm.log", "phy.log"] {
        assert!(
            log_dir.path().join(file_name).exists(),
            "{file_name} not created"
        );
    }

    let rpc_contents = std::fs::read_to_string(log_dir.path().join("rpc-server.log")).unwrap();
    assert!(
        !rpc_contents.is_empty(),
        "rpc-server.log is empty — Zephyr stdout was not forwarded"
    );
}
/// Respawning with `LogOutput::WriteToDir` on the same directory must not keep
/// old log contents.
///
/// The test spawns and immediately drops a first instance, plants a sentinel
/// line in `rpc-server.log`, spawns a second instance under the same name, and
/// then asserts that the sentinel is gone from the file.
#[test]
fn log_output_write_to_dir_clears_logs_on_respawn() {
    const RUN_NAME: &str = "log_output_clears_logs_respawn";
    const SENTINEL: &str = "STALE_SENTINEL_FROM_PREVIOUS_RUN";

    let log_dir = tempfile::tempdir().expect("tempdir");
    let sockets_dir = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sockets"));
    let dir_log = || babble_bridge::LogOutput::WriteToDir(log_dir.path().to_path_buf());

    // First run: spawn and drop right away.
    let (first_run, _) =
        babble_bridge::spawn_zephyr_rpc_server_with_socat(sockets_dir, RUN_NAME, dir_log());
    drop(first_run);

    // Plant content that a fresh run is expected to wipe.
    let rpc_log = log_dir.path().join("rpc-server.log");
    std::fs::write(&rpc_log, format!("{SENTINEL}\n")).unwrap();

    // Second run: same name, same log directory.
    let (mut second_run, _) =
        babble_bridge::spawn_zephyr_rpc_server_with_socat(sockets_dir, RUN_NAME, dir_log());
    second_run.search_stdout_for_strings(HashSet::from([
        "<inf> nrf_ps_server: Initializing RPC server",
    ]));
    drop(second_run);

    let contents = std::fs::read_to_string(&rpc_log).unwrap();
    assert!(
        !contents.contains(SENTINEL),
        "sentinel still present after respawn — log was not cleared:\n{contents}"
    );
}
/// With `LogOutput::Off`, no `*.log` file is expected in the scratch directory.
///
/// NOTE(review): `log_dir` is never passed to the spawner, so this directory
/// stays empty regardless of what the spawner does. The assertion looks
/// vacuous — confirm where a stray log file would actually land and check there.
#[test]
fn log_output_off_creates_no_log_files() {
    let log_dir = tempfile::tempdir().expect("tempdir");
    let sockets_dir = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/sockets"));

    let (mut processes, _) = babble_bridge::spawn_zephyr_rpc_server_with_socat(
        sockets_dir,
        "log_output_off_creates_no_log_files",
        babble_bridge::LogOutput::Off,
    );
    processes.search_stdout_for_strings(HashSet::from([
        "<inf> nrf_ps_server: Initializing RPC server",
    ]));

    // Collect the path of every `*.log` entry in the scratch directory.
    let stray_logs: Vec<_> = std::fs::read_dir(log_dir.path())
        .unwrap()
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| matches!(path.extension(), Some(ext) if ext == "log"))
        .collect();

    assert!(
        stray_logs.is_empty(),
        "expected no .log files for LogOutput::Off, found: {:?}",
        stray_logs
    );
}