use std::path::Path;
use std::time::Duration;
use anyhow::{Context, Result};
use tokio_util::sync::CancellationToken;
use crate::cli::{Cli, Command};
use crate::config::{parse_config, validate_config, Config};
use crate::oneshot::classify;
use crate::oneshot::dispatch::{dispatch_oneshot, find_camera_config, snapshot_json_preflight};
use crate::oneshot::errors::ConfigError;
use crate::oneshot::output::{format_failure, format_success, Mode, Outcome};
use crate::oneshot::{runner, snapshot};
/// Picks the output format for this invocation.
///
/// Returns [`Mode::Json`] when the `json` flag on `cli` is set, and
/// [`Mode::Human`] otherwise.
pub fn cli_output_mode(cli: &Cli) -> Mode {
    if !cli.json {
        return Mode::Human;
    }
    Mode::Json
}
/// Reads the file at `path`, parses it with `parse_config` and checks the
/// result with `validate_config`.
///
/// # Errors
///
/// The returned error carries one of these context messages:
/// - `Failed to read config file: <path>` when the file cannot be read,
/// - `Invalid configuration` when parsing fails,
/// - `Configuration validation failed` when validation fails.
pub fn load_validated_config(path: &Path) -> Result<Config> {
    let raw_text = std::fs::read_to_string(path)
        .with_context(|| format!("Failed to read config file: {}", path.display()))?;

    let parsed = match parse_config(&raw_text) {
        Ok(cfg) => cfg,
        Err(parse_err) => {
            return Err(anyhow::anyhow!(parse_err)).context("Invalid configuration");
        }
    };

    if let Err(validation_err) = validate_config(&parsed) {
        return Err(anyhow::anyhow!(validation_err)).context("Configuration validation failed");
    }

    Ok(parsed)
}
/// Renders a successful `outcome` for `mode` without writing anything.
///
/// Returns the pair produced by `format_success` unchanged.
pub fn emit_success_bytes(mode: Mode, outcome: &Outcome) -> (String, String) {
    let (first, second) = format_success(mode, outcome);
    (first, second)
}
/// Renders a failure for `mode` without writing anything.
///
/// Returns the `mode` that was passed in together with the text produced by
/// `format_failure(mode, err, kind)`.
pub fn emit_failure_payload(mode: Mode, err: &anyhow::Error, kind: &str) -> (Mode, String) {
    let rendered = format_failure(mode, err, kind);
    (mode, rendered)
}
/// Waits for `delay` to elapse or for `cancel` to fire, whichever completes
/// first.
///
/// Returns `true` when the sleep branch completed and `false` when the
/// cancellation branch completed.
pub async fn sleep_or_cancel(delay: Duration, cancel: &CancellationToken) -> bool {
    let timer = tokio::time::sleep(delay);
    let cancellation = cancel.cancelled();
    tokio::select! {
        _ = timer => true,
        _ = cancellation => false,
    }
}
pub fn on_ctrl_c_received(token: &CancellationToken) {
tracing::info!("Received Ctrl+C, aborting one-shot");
token.cancel();
}
/// Spawns a task that waits for Ctrl+C and then calls
/// [`on_ctrl_c_received`] with `token`.
///
/// If waiting for the signal returns an error, the task ends without
/// cancelling anything. The returned handle belongs to the spawned task.
pub fn spawn_ctrl_c_cancel(token: CancellationToken) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        match tokio::signal::ctrl_c().await {
            Ok(_) => on_ctrl_c_received(&token),
            // Signal wait failed: nothing to cancel on, so the task just finishes.
            Err(_) => {}
        }
    })
}
/// Returns a copy of every configured camera's `name`, in the order the
/// cameras appear in `config.cameras`.
pub fn camera_names(config: &Config) -> Vec<String> {
    let mut names = Vec::with_capacity(config.cameras.len());
    for camera in &config.cameras {
        names.push(camera.name.clone());
    }
    names
}
/// Writes a successful `outcome` to the supplied writers.
///
/// `format_success` yields two strings: the first goes to `stdout`, the
/// second to `stderr`. An empty string results in no write call at all, and
/// write errors are deliberately ignored (best-effort output).
pub fn emit_success_to<W1: std::io::Write, W2: std::io::Write>(
    mode: Mode,
    outcome: &Outcome,
    stdout: &mut W1,
    stderr: &mut W2,
) {
    let (for_stdout, for_stderr) = format_success(mode, outcome);

    match for_stdout.as_bytes() {
        [] => {}
        payload => {
            let _ = stdout.write_all(payload);
        }
    }

    match for_stderr.as_bytes() {
        [] => {}
        payload => {
            let _ = stderr.write_all(payload);
        }
    }
}
/// Writes a failure report to exactly one of the supplied writers.
///
/// The text comes from `format_failure(mode, err, kind)`. In [`Mode::Json`]
/// it is written to `stdout`; in [`Mode::Human`] it is written to `stderr`.
/// Write errors are deliberately ignored (best-effort output).
pub fn emit_failure_to<W1: std::io::Write, W2: std::io::Write>(
    mode: Mode,
    err: &anyhow::Error,
    kind: &str,
    stdout: &mut W1,
    stderr: &mut W2,
) {
    let rendered = format_failure(mode, err, kind);
    let payload = rendered.as_bytes();
    let _ = match mode {
        Mode::Human => stderr.write_all(payload),
        Mode::Json => stdout.write_all(payload),
    };
}
/// Writes a successful `outcome` to the process's real stdout/stderr.
///
/// Both streams are locked (stdout first, then stderr) for the duration of
/// the call and the work is delegated to [`emit_success_to`].
pub fn emit_success(mode: Mode, outcome: &Outcome) {
    let stdout_handle = std::io::stdout();
    let stderr_handle = std::io::stderr();
    let mut out_guard = stdout_handle.lock();
    let mut err_guard = stderr_handle.lock();
    emit_success_to(mode, outcome, &mut out_guard, &mut err_guard);
}
/// Writes a failure report to the process's real stdout/stderr.
///
/// Both streams are locked (stdout first, then stderr) for the duration of
/// the call and the work is delegated to [`emit_failure_to`].
pub fn emit_failure(mode: Mode, err: &anyhow::Error, kind: &str) {
    let stdout_handle = std::io::stdout();
    let stderr_handle = std::io::stderr();
    let mut out_guard = stdout_handle.lock();
    let mut err_guard = stderr_handle.lock();
    emit_failure_to(mode, err, kind, &mut out_guard, &mut err_guard);
}
pub fn run_check_config_to<W1: std::io::Write, W2: std::io::Write>(
cli: &Cli,
mode: Mode,
stdout: &mut W1,
stderr: &mut W2,
) -> i32 {
let path = cli.config_path();
let config_str = match std::fs::read_to_string(path) {
Ok(s) => s,
Err(e) => {
let kind = if e.kind() == std::io::ErrorKind::NotFound {
classify::EXIT_USAGE
} else {
classify::EXIT_CONFIG
};
let msg = anyhow::anyhow!(format!("read {}: {}", path.display(), e));
emit_failure_to(
mode,
&msg,
crate::cli_convert::exit_code_to_kind(kind),
stdout,
stderr,
);
return kind;
}
};
let config = match crate::config::parse_config(&config_str) {
Ok(c) => c,
Err(e) => {
let msg = anyhow::anyhow!(format!("parse {}: {}", path.display(), e));
emit_failure_to(mode, &msg, "config", stdout, stderr);
return classify::EXIT_CONFIG;
}
};
if let Err(e) = crate::config::validate_config(&config) {
let msg = anyhow::anyhow!(format!("validate {}: {}", path.display(), e));
emit_failure_to(mode, &msg, "config", stdout, stderr);
return classify::EXIT_CONFIG;
}
crate::config::warn_deprecated_pause_fields(&config);
crate::config::warn_neolink_compat_fields(&config);
crate::config::warn_idle_timeout_below_prune_floor(&config);
let summary = format!(
"config OK: {} camera(s), bind {}:{}\n",
config.cameras.len(),
config.bind_addr,
config.bind_port,
);
match mode {
Mode::Json => {
let _ = writeln!(
stdout,
r#"{{"ok":true,"cameras":{}}}"#,
config.cameras.len()
);
}
Mode::Human => {
let _ = stdout.write_all(summary.as_bytes());
}
}
classify::EXIT_OK
}
/// Runs a single one-shot CLI command and reports the result on the given
/// writers.
///
/// Flow:
/// 1. A config check request is delegated to [`run_check_config_to`].
/// 2. `snapshot_json_preflight` is consulted; a rejection is reported with
///    kind `"usage"` and yields `classify::EXIT_USAGE`.
/// 3. The config file is read, parsed and validated, the target camera is
///    looked up, and a Ctrl+C-driven cancellation token is installed.
/// 4. `snapshot --use-stream-raw` goes through `snapshot::run` with its last
///    argument set to `true`; every other command goes through
///    `dispatch_oneshot`.
///
/// Returns `classify::EXIT_OK` on success. On failure the error is mapped to
/// an exit code by `classify::classify`, reported via [`emit_failure_to`],
/// and that code is returned.
///
/// # Panics
///
/// Panics if `cli.camera_name()` is `None` once the config has loaded; the
/// code treats a one-shot command without a camera name as a caller bug.
pub async fn run_oneshot_to<W1: std::io::Write, W2: std::io::Write>(
    cli: &Cli,
    stdout: &mut W1,
    stderr: &mut W2,
) -> i32 {
    use futures::FutureExt;
    let mode = cli_output_mode(cli);
    if cli.is_check_config() {
        return run_check_config_to(cli, mode, stdout, stderr);
    }
    if let Err(e) = snapshot_json_preflight(cli.json, &cli.command) {
        emit_failure_to(mode, &e, "usage", stdout, stderr);
        return classify::EXIT_USAGE;
    }
    let result: Result<Outcome> = async {
        let config_path = cli.config_path();
        let config_str = match std::fs::read_to_string(config_path) {
            Ok(s) => s,
            // A missing file is a usage error; any other read failure is a
            // configuration error.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                return Err(crate::oneshot::errors::UsageError::new(format!(
                    "config file not found: {}",
                    config_path.display()
                ))
                .into());
            }
            Err(e) => {
                return Err(
                    ConfigError::new(format!("read {}: {}", config_path.display(), e)).into(),
                );
            }
        };
        let config =
            parse_config(&config_str).map_err(|e| ConfigError::new(format!("parse: {}", e)))?;
        validate_config(&config).map_err(|e| ConfigError::new(format!("validate: {}", e)))?;
        let camera_name = cli.camera_name().expect("oneshot without camera name");
        let cam_cfg = find_camera_config(&config, camera_name)?;
        let cancel = CancellationToken::new();
        // The join handle is dropped on purpose: the listener task is detached.
        spawn_ctrl_c_cancel(cancel.clone());
        if let Command::Snapshot {
            output: out,
            use_stream,
            use_stream_raw,
            ..
        } = &cli.command
        {
            // The notice says the request is being delegated to the regular
            // snapshot path, so it is only accurate when the raw-stream flag
            // is NOT set. Previously it was emitted only inside the raw
            // branch, where it is wrong, and never for `--use-stream` alone.
            if *use_stream && !*use_stream_raw {
                tracing::info!(
                    "--use-stream is a neolink-compat no-op on bairelay (battery cams all support `snap`); \
                     delegating to get_snapshot. Use --use-stream-raw if you want NAL bytes."
                );
            }
            if *use_stream_raw {
                let out = out.clone();
                let json = cli.json;
                return runner::run(&cam_cfg, cancel, move |cam| {
                    async move { snapshot::run(cam, out.as_deref(), json, true).await }.boxed()
                })
                .await;
            }
        }
        let cmd = crate::cli_convert::clone_command(&cli.command);
        let json = cli.json;
        runner::run(&cam_cfg, cancel, move |cam| {
            async move {
                dispatch_oneshot(
                    cam as &dyn bairelay_neolink_core::bc_protocol::CameraDriver,
                    &cmd,
                    json,
                )
                .await
            }
            .boxed()
        })
        .await
    }
    .await;
    match result {
        Ok(outcome) => {
            emit_success_to(mode, &outcome, stdout, stderr);
            classify::EXIT_OK
        }
        Err(e) => {
            let code = classify::classify(&e);
            let kind = crate::cli_convert::exit_code_to_kind(code);
            emit_failure_to(mode, &e, kind, stdout, stderr);
            code
        }
    }
}
/// Runs a one-shot command against the process's real stdout/stderr and
/// returns its exit code (see [`run_oneshot_to`]).
///
/// The handles are passed unlocked on purpose. A `StdoutLock`/`StderrLock`
/// created here would be held across the `.await` below, which makes this
/// future `!Send` and blocks any other thread that writes to stdout or
/// stderr until the whole one-shot finishes. `Stdout`/`Stderr` implement
/// `Write` themselves and lock only for each individual write.
pub async fn run_oneshot(cli: &Cli) -> i32 {
    let mut stdout = std::io::stdout();
    let mut stderr = std::io::stderr();
    run_oneshot_to(cli, &mut stdout, &mut stderr).await
}
// Unit tests for the one-shot CLI helpers in this file. Output-producing
// functions are exercised through their `*_to` variants with `Vec<u8>` sinks
// so the bytes written to each stream can be inspected.
#[cfg(test)]
mod tests {
use super::*;
use crate::cli::Cli;
use crate::config::test_helpers::minimal_camera_config;
// Parses `args` into a `Cli`, panicking if the parser rejects them.
fn cli_from(args: &[&str]) -> Cli {
use clap::Parser;
Cli::try_parse_from(args).expect("cli parse")
}
// --- cli_output_mode ---
#[test]
fn cli_output_mode_json_flag_picks_json() {
let cli = cli_from(&["bairelay", "--json", "mqtt-rtsp", "-c", "x.toml"]);
assert!(matches!(cli_output_mode(&cli), Mode::Json));
}
#[test]
fn cli_output_mode_default_is_human() {
let cli = cli_from(&["bairelay", "mqtt-rtsp", "-c", "x.toml"]);
assert!(matches!(cli_output_mode(&cli), Mode::Human));
}
// --- load_validated_config ---
#[test]
fn load_validated_config_missing_file_errors() {
let p = std::path::PathBuf::from("/nonexistent/bairelay-cfg-xxyy.toml");
let err = load_validated_config(&p).expect_err("should fail");
assert!(format!("{:#}", err).contains("Failed to read config file"));
}
#[test]
fn load_validated_config_malformed_toml_errors() {
let f = tempfile::NamedTempFile::new().unwrap();
std::fs::write(f.path(), b"not { valid toml = [[[").unwrap();
let err = load_validated_config(f.path()).expect_err("should fail");
let s = format!("{:#}", err);
assert!(s.contains("Invalid configuration") || s.contains("validation"));
}
// `bind_port = 0` is the value this test expects the validator to reject.
#[test]
fn load_validated_config_validation_fails() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 0
cameras = []
"#;
std::fs::write(f.path(), toml).unwrap();
let err = load_validated_config(f.path()).expect_err("validator should reject");
let s = format!("{:#}", err);
assert!(
s.contains("validation") || s.contains("bind_port"),
"unexpected error: {s}"
);
}
#[test]
fn load_validated_config_happy_path() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 8554
stream_prune_grace_secs = 60
cameras = []
"#;
std::fs::write(f.path(), toml).unwrap();
let cfg = load_validated_config(f.path()).expect("minimal valid config");
assert_eq!(cfg.bind_port, 8554);
assert!(cfg.cameras.is_empty());
}
// --- camera_names ---
#[test]
fn camera_names_empty_when_no_cameras() {
let cfg = Config::default();
assert!(camera_names(&cfg).is_empty());
}
#[test]
fn camera_names_preserves_insertion_order() {
let cfg = Config {
cameras: vec![
minimal_camera_config("a"),
minimal_camera_config("b"),
minimal_camera_config("c"),
],
..Config::default()
};
assert_eq!(camera_names(&cfg), vec!["a", "b", "c"]);
}
// --- pure formatting wrappers ---
// Only requires that at least one of the two returned strings is non-empty.
#[test]
fn emit_success_bytes_returns_both_streams() {
let (stdout, _stderr) = emit_success_bytes(
Mode::Human,
&Outcome::Snapshot {
bytes: 4,
path: Some("/tmp/x.jpg".into()),
format: "jpeg".into(),
},
);
assert!(!stdout.is_empty() || !_stderr.is_empty());
}
#[test]
fn emit_failure_payload_packages_mode() {
let err = anyhow::anyhow!("boom");
let (mode, s) = emit_failure_payload(Mode::Human, &err, "protocol");
assert!(matches!(mode, Mode::Human));
assert!(!s.is_empty());
}
// --- emit_*_to stream routing ---
// Compares the captured bytes against a direct `format_success` call for the
// same outcome.
#[test]
fn emit_success_to_routes_snapshot_payload_to_stdout_and_stderr() {
let mut out = Vec::new();
let mut err = Vec::new();
emit_success_to(
Mode::Human,
&Outcome::Snapshot {
bytes: 4,
path: Some("/tmp/x.jpg".into()),
format: "jpeg".into(),
},
&mut out,
&mut err,
);
assert!(!out.is_empty() || !err.is_empty());
let (expected_out, expected_err) = format_success(
Mode::Human,
&Outcome::Snapshot {
bytes: 4,
path: Some("/tmp/x.jpg".into()),
format: "jpeg".into(),
},
);
assert_eq!(out, expected_out.as_bytes());
assert_eq!(err, expected_err.as_bytes());
}
#[test]
fn emit_failure_to_json_writes_to_stdout_only() {
let err_in = anyhow::anyhow!("boom");
let mut out = Vec::new();
let mut errbuf = Vec::new();
emit_failure_to(Mode::Json, &err_in, "protocol", &mut out, &mut errbuf);
assert!(errbuf.is_empty(), "Json mode must not touch stderr");
let s = String::from_utf8(out).expect("utf8");
assert!(s.contains("\"ok\": false"));
assert!(s.contains("\"kind\": \"protocol\""));
}
#[test]
fn emit_failure_to_human_writes_to_stderr_only() {
let err_in = anyhow::anyhow!("boom");
let mut out = Vec::new();
let mut errbuf = Vec::new();
emit_failure_to(Mode::Human, &err_in, "protocol", &mut out, &mut errbuf);
assert!(out.is_empty(), "Human mode must not touch stdout");
let s = String::from_utf8(errbuf).expect("utf8");
assert!(s.contains("boom"));
}
// --- cancellation helpers ---
#[test]
fn on_ctrl_c_received_cancels_token() {
let tok = CancellationToken::new();
assert!(!tok.is_cancelled());
on_ctrl_c_received(&tok);
assert!(tok.is_cancelled());
}
#[tokio::test]
async fn sleep_or_cancel_returns_true_on_natural_completion() {
let tok = CancellationToken::new();
let res = sleep_or_cancel(Duration::from_millis(0), &tok).await;
assert!(res, "zero-duration sleep must return true (sleep won)");
}
#[tokio::test]
async fn sleep_or_cancel_returns_false_when_token_already_cancelled() {
let tok = CancellationToken::new();
tok.cancel();
let res = sleep_or_cancel(Duration::from_secs(60), &tok).await;
assert!(!res, "pre-cancelled token must short-circuit the sleep");
}
// Runs with the tokio clock paused (`start_paused = true`); a helper task
// cancels the token after a 10 ms sleep while the 60 s sleep is pending.
#[tokio::test(start_paused = true)]
async fn sleep_or_cancel_returns_false_on_concurrent_cancel() {
let tok = CancellationToken::new();
let tok_for_canceller = tok.clone();
let canceller = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
tok_for_canceller.cancel();
});
let res = sleep_or_cancel(Duration::from_secs(60), &tok).await;
assert!(!res, "cancel firing during sleep must return false");
let _ = canceller.await;
}
// --- run_oneshot_to failure paths ---
// NOTE(review): the name says "config exit", but the assertion only requires
// a non-OK code, and `run_oneshot_to` builds a `UsageError` when the config
// file is not found. Confirm which exit code is intended and tighten.
#[tokio::test]
async fn run_oneshot_missing_config_file_returns_config_exit() {
let cli = cli_from(&[
"bairelay",
"snapshot",
"cam0",
"--output",
"/tmp/x.jpg",
"-c",
"/nonexistent-config-xyzz.toml",
]);
let code = run_oneshot_to(&cli, &mut Vec::new(), &mut Vec::new()).await;
assert_ne!(code, classify::EXIT_OK);
}
// The config is valid but lists no cameras, so "cam-missing" cannot be found.
#[tokio::test]
async fn run_oneshot_unknown_camera_returns_nonzero() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 8554
cameras = []
"#;
std::fs::write(f.path(), toml).unwrap();
let p = f.path().display().to_string();
let cli = cli_from(&[
"bairelay",
"snapshot",
"cam-missing",
"--output",
"/tmp/x.jpg",
"-c",
&p,
]);
let code = run_oneshot_to(&cli, &mut Vec::new(), &mut Vec::new()).await;
assert_ne!(code, classify::EXIT_OK);
}
// Points the camera at 127.0.0.1:65500 (presumably nothing listens there;
// confirm on CI hosts) and only asserts a non-OK exit code.
#[tokio::test]
async fn run_oneshot_dispatch_path_returns_nonzero_on_tcp_failure() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 8554
[[cameras]]
name = "cam0"
address = "127.0.0.1:65500"
username = "u"
password = ""
discovery = "local"
"#;
std::fs::write(f.path(), toml).unwrap();
let p = f.path().display().to_string();
let cli = cli_from(&["bairelay", "battery", "cam0", "-c", &p]);
let code = run_oneshot_to(&cli, &mut Vec::new(), &mut Vec::new()).await;
assert_ne!(code, classify::EXIT_OK);
}
// Same idea as the previous test, but passes `--use-stream-raw` so the
// raw-snapshot branch of `run_oneshot_to` is taken.
#[tokio::test]
async fn run_oneshot_use_stream_raw_path_returns_nonzero_on_tcp_failure() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 8554
[[cameras]]
name = "cam0"
address = "127.0.0.1:65501"
username = "u"
password = ""
discovery = "local"
"#;
std::fs::write(f.path(), toml).unwrap();
let p = f.path().display().to_string();
let outpath = tempfile::NamedTempFile::new().unwrap();
let outpath_s = outpath.path().display().to_string();
let cli = cli_from(&[
"bairelay",
"snapshot",
"cam0",
"--use-stream-raw",
"--output",
&outpath_s,
"-c",
&p,
]);
let code = run_oneshot_to(&cli, &mut Vec::new(), &mut Vec::new()).await;
assert_ne!(code, classify::EXIT_OK);
}
// --- run_check_config_to ---
// Runs `run_check_config_to` with in-memory sinks and returns
// (exit code, stdout bytes, stderr bytes).
fn check_config_capture(cli: &Cli) -> (i32, Vec<u8>, Vec<u8>) {
let mode = cli_output_mode(cli);
let mut out = Vec::new();
let mut err = Vec::new();
let code = run_check_config_to(cli, mode, &mut out, &mut err);
(code, out, err)
}
#[test]
fn check_config_happy_path_returns_zero_and_writes_summary() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 8554
[[cameras]]
name = "front"
address = "10.0.0.10:9000"
username = "admin"
password = "x"
"#;
std::fs::write(f.path(), toml).unwrap();
let cli = cli_from(&[
"bairelay",
"check-config",
"-c",
&f.path().display().to_string(),
]);
let (code, out, err) = check_config_capture(&cli);
assert_eq!(code, classify::EXIT_OK);
assert!(err.is_empty(), "Human OK path must not write to stderr");
let s = String::from_utf8(out).expect("utf8");
assert!(s.starts_with("config OK: 1 camera(s)"));
}
// A file that does not exist maps to the usage exit code, not the config one.
#[test]
fn check_config_missing_file_returns_usage() {
let cli = cli_from(&[
"bairelay",
"check-config",
"-c",
"/nonexistent-bairelay-check-xxxx.toml",
]);
let (code, out, err) = check_config_capture(&cli);
assert_eq!(code, classify::EXIT_USAGE);
assert!(out.is_empty(), "Human err path must not write to stdout");
let s = String::from_utf8(err).expect("utf8");
assert!(s.contains("read"));
}
#[test]
fn check_config_malformed_toml_returns_config() {
let f = tempfile::NamedTempFile::new().unwrap();
std::fs::write(f.path(), b"not { valid toml = [[[").unwrap();
let cli = cli_from(&[
"bairelay",
"check-config",
"-c",
&f.path().display().to_string(),
]);
let (code, out, err) = check_config_capture(&cli);
assert_eq!(code, classify::EXIT_CONFIG);
assert!(out.is_empty());
let s = String::from_utf8(err).expect("utf8");
assert!(s.contains("parse"));
}
#[test]
fn check_config_validation_failure_returns_config() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 0
cameras = []
"#;
std::fs::write(f.path(), toml).unwrap();
let cli = cli_from(&[
"bairelay",
"check-config",
"-c",
&f.path().display().to_string(),
]);
let (code, _out, err) = check_config_capture(&cli);
assert_eq!(code, classify::EXIT_CONFIG);
let s = String::from_utf8(err).expect("utf8");
assert!(s.contains("validate"));
}
// The fixture sets `tokio_console`, `idle_disconnect_timeout_secs` and a
// `[cameras.pause]` table; the test only asserts that the check still
// exits OK after the warning hooks have run.
#[test]
fn check_config_runs_neolink_compat_warnings_without_failing() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 8554
tokio_console = true
[[cameras]]
name = "front"
address = "10.0.0.10:9000"
username = "admin"
password = "x"
idle_disconnect_timeout_secs = 5.0
[cameras.pause]
timeout = 9.0
on_motion = true
"#;
std::fs::write(f.path(), toml).unwrap();
let cli = cli_from(&[
"bairelay",
"check-config",
"-c",
&f.path().display().to_string(),
]);
let (code, _out, _err) = check_config_capture(&cli);
assert_eq!(code, classify::EXIT_OK);
}
// Reading a directory fails with something other than NotFound, so the
// config exit code is expected rather than the usage one.
#[test]
fn check_config_directory_path_classifies_as_config_not_usage() {
let dir = tempfile::tempdir().unwrap();
let cli = cli_from(&[
"bairelay",
"check-config",
"-c",
&dir.path().display().to_string(),
]);
let (code, _out, err) = check_config_capture(&cli);
assert_eq!(code, classify::EXIT_CONFIG);
let s = String::from_utf8(err).expect("utf8");
assert!(s.contains("read"));
}
// --- real-stdio smoke tests ---
// These write to the process's actual stdout/stderr and assert nothing
// beyond "the call returns".
#[test]
fn emit_success_real_stdio_does_not_panic() {
emit_success(Mode::Json, &Outcome::Siren);
emit_success(Mode::Human, &Outcome::Siren);
}
#[test]
fn emit_failure_real_stdio_does_not_panic() {
let err = anyhow::anyhow!("synthetic failure for coverage");
emit_failure(Mode::Json, &err, "test");
emit_failure(Mode::Human, &err, "test");
}
// JSON success output is matched without spaces after the colons.
#[test]
fn check_config_json_mode_writes_ok_payload() {
let f = tempfile::NamedTempFile::new().unwrap();
let toml = r#"
bind = "127.0.0.1"
bind_port = 8554
cameras = []
"#;
std::fs::write(f.path(), toml).unwrap();
let cli = cli_from(&[
"bairelay",
"--json",
"check-config",
"-c",
&f.path().display().to_string(),
]);
let (code, out, err) = check_config_capture(&cli);
assert_eq!(code, classify::EXIT_OK);
assert!(err.is_empty(), "Json OK path must not write to stderr");
let s = String::from_utf8(out).expect("utf8");
assert!(s.contains("\"ok\":true"));
assert!(s.contains("\"cameras\":0"));
}
// Aborts the spawned listener immediately so the test never waits for a
// real signal; the join result is ignored.
#[tokio::test]
async fn spawn_ctrl_c_cancel_returns_handle() {
let tok = CancellationToken::new();
let handle = spawn_ctrl_c_cancel(tok.clone());
handle.abort();
let _ = handle.await;
}
}