use super::ctx::Ctx;
use crate::runtime::Output;
use crate::runtime::report::{Event, Human, Json, Level, Reporter};
use crate::runtime::session::AgentSession;
use anyhow::{Result, anyhow};
use regex::Regex;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
pub trait ScriptHost {
fn run_top_level(&mut self) -> TopLevel;
fn run_scenario(&mut self, name: &str) -> std::result::Result<(), String>;
}
pub enum TopLevel {
Single(std::result::Result<(), String>),
Suite {
names: Vec<String>,
top_error: Option<String>,
},
}
enum Outcome {
Single(std::result::Result<(), String>),
Suite { total: usize, passed: usize },
}
pub fn run<H, F>(
programs: Vec<(String, F)>,
output: Output,
default_timeout: Duration,
only: Option<String>,
) -> Result<()>
where
H: ScriptHost + Send + 'static,
F: FnOnce(Arc<Ctx>) -> Result<H> + Send + 'static,
{
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let level = if output.quiet {
Level::Quiet
} else if output.verbose {
Level::Verbose
} else {
Level::Normal
};
let reporter: Box<dyn Reporter + Send> = if output.json {
Box::new(Json)
} else {
Box::new(Human::new(level))
};
let matcher = only.as_deref().map(build_matcher).transpose()?;
let ctx = Arc::new(Ctx::new(rt.handle().clone(), reporter, default_timeout));
if output.insecure_http {
ctx.set_http_insecure(true);
eprintln!(
"⚠ ringo-flow: TLS certificate verification is DISABLED for http(...) (--insecure-http)"
);
}
let (logs, save_audio) = (output.logs, output.save_audio);
let multi = programs.len() > 1;
let c = ctx.clone();
let join = rt.block_on(async move {
tokio::task::spawn_blocking(move || {
run_files(programs, &c, matcher.as_ref(), logs, save_audio, multi)
})
.await
});
let agg = match join {
Ok(agg) => agg,
Err(e) => {
teardown(&ctx, &rt);
let msg = format!("script task panicked: {e}");
ctx.emit(&Event::Finished {
passed: false,
error: Some(msg.clone()),
});
return Err(anyhow!(msg));
}
};
teardown(&ctx, &rt);
if let Some(pat) = &only
&& agg.scenarios == 0
{
return Err(anyhow!("no scenario matched --scenario pattern `{pat}`"));
}
if multi {
ctx.emit(&Event::RunFinished {
files: agg.files,
passed_files: agg.passed_files,
scenarios: agg.scenarios,
passed_scenarios: agg.passed_scenarios,
});
}
if agg.passed_files == agg.files {
Ok(())
} else {
Err(anyhow!(
"{}/{} files failed",
agg.files - agg.passed_files,
agg.files
))
}
}
#[derive(Default)]
struct Aggregate {
files: usize,
passed_files: usize,
scenarios: usize,
passed_scenarios: usize,
}
fn run_files<H, F>(
programs: Vec<(String, F)>,
ctx: &Arc<Ctx>,
matcher: Option<&ScenarioMatcher>,
logs: bool,
save_audio: bool,
multi: bool,
) -> Aggregate
where
H: ScriptHost,
F: FnOnce(Arc<Ctx>) -> Result<H>,
{
let mut agg = Aggregate::default();
for (label, build) in programs {
agg.files += 1;
if multi {
ctx.emit(&Event::FileStarted { path: &label });
}
let host = match build(ctx.clone()) {
Ok(h) => h,
Err(e) => {
ctx.emit(&Event::Finished {
passed: false,
error: Some(format!("{e}")),
});
continue;
}
};
let outcome = run_lifecycle(host, ctx, matcher, logs, save_audio);
ctx.reset_sessions();
let (total, passed) = match outcome {
Outcome::Single(r) => {
let ok = r.is_ok();
ctx.emit(&Event::Finished {
passed: ok,
error: r.err(),
});
(1, usize::from(ok))
}
Outcome::Suite { total, passed } => {
ctx.emit(&Event::SuiteFinished { total, passed });
(total, passed)
}
};
agg.scenarios += total;
agg.passed_scenarios += passed;
if passed == total {
agg.passed_files += 1;
}
}
agg
}
type ScenarioMatcher = Box<dyn Fn(&str) -> bool + Send>;
fn build_matcher(pattern: &str) -> Result<ScenarioMatcher> {
if let Some(src) = pattern.strip_prefix("re:") {
let re = Regex::new(src).map_err(|e| anyhow!("invalid --scenario regex `{src}`: {e}"))?;
Ok(Box::new(move |name| re.is_match(name)))
} else {
let needle = pattern.to_lowercase();
Ok(Box::new(move |name| name.to_lowercase().contains(&needle)))
}
}
fn run_lifecycle<H: ScriptHost>(
mut host: H,
ctx: &Arc<Ctx>,
only: Option<&ScenarioMatcher>,
logs: bool,
save_audio: bool,
) -> Outcome {
let names = match host.run_top_level() {
TopLevel::Single(r) => {
dump_artifacts(ctx, logs, save_audio); return Outcome::Single(r);
}
TopLevel::Suite {
top_error: Some(e), ..
} => return Outcome::Single(Err(e)),
TopLevel::Suite { names, .. } => names,
};
let (mut total, mut passed) = (0, 0);
for name in names {
if only.is_some_and(|m| !m(&name)) {
continue;
}
total += 1;
ctx.emit(&Event::ScenarioStarted { name: &name });
let result = host.run_scenario(&name);
dump_artifacts(ctx, logs, save_audio); ctx.reset_sessions(); match &result {
Ok(()) => {
passed += 1;
ctx.emit(&Event::ScenarioFinished {
name: &name,
passed: true,
error: None,
});
}
Err(msg) => ctx.emit(&Event::ScenarioFinished {
name: &name,
passed: false,
error: Some(msg.clone()),
}),
}
}
Outcome::Suite { total, passed }
}
fn dump_artifacts(ctx: &Arc<Ctx>, logs: bool, save_audio: bool) {
if !logs && !save_audio {
return;
}
let sessions = ctx.sessions.lock().unwrap_or_else(|e| e.into_inner());
if logs {
dump_logs(&sessions);
}
if save_audio {
save_recordings(&sessions);
}
}
fn teardown(ctx: &Arc<Ctx>, rt: &tokio::runtime::Runtime) {
let sessions: Vec<AgentSession> = ctx
.sessions
.lock()
.unwrap_or_else(|e| e.into_inner())
.drain()
.map(|(_, s)| s)
.collect();
for s in &sessions {
s.hangup_all();
}
rt.block_on(async { tokio::time::sleep(Duration::from_millis(200)).await });
drop(sessions);
}
fn dump_logs(sessions: &HashMap<String, AgentSession>) {
let mut names: Vec<&String> = sessions.keys().collect();
names.sort();
for name in names {
eprintln!("\n── baresip log: {name} ──");
match std::fs::read_to_string(sessions[name].log_path()) {
Ok(content) => eprint!("{content}"),
Err(e) => eprintln!(
"(could not read {}: {e})",
sessions[name].log_path().display()
),
}
}
}
fn save_recordings(sessions: &HashMap<String, AgentSession>) {
let run_ts = chrono::Local::now().format("%Y%m%d-%H%M%S");
let mut names: Vec<&String> = sessions.keys().collect();
names.sort();
for name in names {
let dir = sessions[name].recording_dir();
let Ok(entries) = std::fs::read_dir(dir) else {
continue;
};
let mut wavs: Vec<std::path::PathBuf> = entries
.filter_map(std::result::Result::ok)
.map(|e| e.path())
.filter(|p| {
p.file_name()
.and_then(|n| n.to_str())
.is_some_and(|n| n.starts_with("dump-") && n.ends_with(".wav"))
})
.collect();
wavs.sort();
for src in wavs {
let dir_tag = if src.to_string_lossy().ends_with("-enc.wav") {
"sent"
} else {
"recv"
};
let dst =
std::path::PathBuf::from(format!("ringo-audio-{run_ts}-{name}-{dir_tag}.wav"));
match std::fs::copy(&src, &dst) {
Ok(_) => eprintln!("saved recording: {} ({name} {dir_tag})", dst.display()),
Err(e) => eprintln!("(could not save {}: {e})", dst.display()),
}
}
}
}
#[cfg(test)]
mod tests {
use super::build_matcher;
#[test]
fn substring_is_default_and_case_insensitive() {
let m = build_matcher("blind refer").unwrap();
assert!(m("transfer (blind refer): target accepts"));
assert!(!m("simple call"));
assert!(build_matcher("TRANSFER").unwrap()("transfer: rejects"));
assert!(build_matcher("(blind refer)").unwrap()("x (blind refer) y"));
}
#[test]
fn re_prefix_is_a_regex() {
let m = build_matcher("re:^transfer").unwrap();
assert!(m("transfer: accepts"));
assert!(!m("a transfer")); assert!(build_matcher("re:tra(").is_err()); }
}