use crate::runtime::agent_options;
use crate::runtime::report::{Event, Reporter};
use crate::runtime::session::AgentSession;
use crate::runtime::state::{CallPhase, received_header_value};
use ringo_core::baresip::Account;
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use tokio::runtime::Handle;
/// Coarse call state of an agent, as computed by `Ctx::call_state` and `Ctx::info`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CallState {
/// No call is in the ringing or established phase.
Idle,
/// At least one call is ringing and none is established.
Ringing,
/// At least one call is established (takes precedence over ringing calls).
Established,
}
/// Renders the state as its bare variant name (`Idle`, `Ringing`, `Established`).
impl std::fmt::Display for CallState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The text is written verbatim; formatter width/padding flags are not applied.
        let label = match *self {
            CallState::Idle => "Idle",
            CallState::Ringing => "Ringing",
            CallState::Established => "Established",
        };
        f.write_str(label)
    }
}
/// Point-in-time snapshot of one agent, assembled by `Ctx::info`.
pub struct AgentInfo {
/// Name the agent was looked up under.
pub name: String,
/// The session's address-of-record (copied from the session's `aor` field).
pub aor: String,
/// Value of the session state's `registered` flag at snapshot time.
pub registered: bool,
/// Coarse call state derived from the phases of the agent's calls.
pub state: CallState,
/// The session state's `last_call_reason`, if any.
pub reason: Option<String>,
/// SIP status code parsed from `reason` via `sip_status_code`, if it starts with one.
pub status_code: Option<u16>,
/// Result of the session state's `peer()` accessor.
/// NOTE(review): the meaning of the two tuple elements is not visible here — confirm
/// against the state type.
pub peer: Option<(String, Option<String>)>,
/// Number of entries in the session state's call list.
pub calls: usize,
}
/// An assertion result held back while silent mode is on, in the order
/// `(desc, expect, actual, ok)` — the same arguments `Ctx::report_assertion` receives.
type StashedAssertion = (Option<String>, String, String, bool);
// Per-thread assertion bookkeeping. Being thread-local, each thread sees its own values.
thread_local! {
// Label recorded by `mark_pending_label` and consumed by `take_pending_label`.
static PENDING_LABEL: std::cell::RefCell<Option<String>> =
const { std::cell::RefCell::new(None) };
// When true, `Ctx::report_assertion` stashes the result instead of emitting it.
static ASSERT_SILENT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
// Most recent assertion stashed in silent mode; drained by `Ctx::emit_last_assert`.
static LAST_ASSERT: std::cell::RefCell<Option<StashedAssertion>> =
const { std::cell::RefCell::new(None) };
}
/// Records `label` as this thread's pending label, replacing any label already stored.
pub fn mark_pending_label(label: impl Into<String>) {
    PENDING_LABEL.with(|slot| {
        // `replace` hands back the previous label, which is simply dropped.
        slot.replace(Some(label.into()));
    });
}
/// Removes and returns this thread's pending label, leaving `None` behind.
///
/// Returns `None` when no label has been recorded since the last take.
pub fn take_pending_label() -> Option<String> {
    PENDING_LABEL.with(|slot| slot.replace(None))
}
/// Shared runtime context: the tokio handle, the event reporter, the connected agent
/// sessions, and a couple of atomically stored settings.
pub struct Ctx {
/// Tokio runtime handle used to drive async work via `block_on`.
pub rt: Handle,
/// Sink for all emitted events; locked for each `emit` call.
pub reporter: Mutex<Box<dyn Reporter + Send>>,
/// Connected agent sessions, keyed by agent name.
pub sessions: Mutex<HashMap<String, AgentSession>>,
/// Default timeout, stored as whole milliseconds.
default_timeout_ms: AtomicU64,
/// Flag read back by `http_insecure()`.
/// NOTE(review): presumably relaxes TLS verification for HTTP requests — confirm at
/// the call sites.
http_insecure: AtomicBool,
}
impl Ctx {
/// Creates a context with no connected sessions and `http_insecure` turned off.
///
/// `default_timeout` is stored with millisecond resolution. Durations whose
/// millisecond count does not fit in a `u64` are clamped to `u64::MAX` milliseconds
/// instead of being silently truncated to an unrelated value.
pub fn new(rt: Handle, reporter: Box<dyn Reporter + Send>, default_timeout: Duration) -> Self {
    // `as_millis` returns a u128; a plain `as u64` cast would wrap on overflow.
    let default_timeout_ms = u64::try_from(default_timeout.as_millis()).unwrap_or(u64::MAX);
    Self {
        rt,
        reporter: Mutex::new(reporter),
        sessions: Mutex::new(HashMap::new()),
        default_timeout_ms: AtomicU64::new(default_timeout_ms),
        http_insecure: AtomicBool::new(false),
    }
}
pub fn set_http_insecure(&self, on: bool) {
self.http_insecure.store(on, Ordering::Relaxed);
}
pub fn http_insecure(&self) -> bool {
self.http_insecure.load(Ordering::Relaxed)
}
/// Forwards `event` to the reporter.
///
/// A poisoned reporter lock is recovered rather than propagated, so events keep
/// flowing even after another thread panicked while holding it.
pub fn emit(&self, event: &Event) {
    self.reporter
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .emit(event);
}
/// Runs `f` against the session registered under `name` and returns its result.
///
/// The sessions lock is held for the whole duration of `f`; a poisoned lock is
/// recovered rather than propagated.
///
/// # Errors
/// Returns a descriptive message when no agent named `name` is connected.
fn with_session<R>(&self, name: &str, f: impl FnOnce(&AgentSession) -> R) -> Result<R, String> {
    let sessions = self
        .sessions
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    match sessions.get(name) {
        Some(session) => Ok(f(session)),
        None => Err(format!(
            "agent `{name}` is not connected — create it with `agent(\"{name}\", …)` first"
        )),
    }
}
/// Emits an `Event::Action` of the given `kind` for agent `name`, with optional `detail`.
fn act(&self, name: &str, kind: &'static str, detail: Option<&str>) {
    let action = Event::Action {
        agent: name,
        kind,
        detail,
    };
    self.emit(&action);
}
/// Connects a new agent session and stores it under `name`.
///
/// Steps, in order: block on `AgentSession::connect`, emit `AgentStarted` with the
/// AOR `sip:<username>@<domain>`, add each `(key, value)` of `headers` to the session
/// (emitting one `header` action per entry), then insert the session into the map.
/// An existing session with the same name is replaced.
///
/// # Errors
/// Returns a message naming the agent when the connection attempt fails.
pub fn connect_agent(
    &self,
    name: &str,
    account: Account,
    headers: &[(String, String)],
) -> Result<(), String> {
    // Build the AOR before `account` is moved into `connect`.
    let aor = format!("sip:{}@{}", account.username, account.domain);
    let session = match self
        .rt
        .block_on(AgentSession::connect(name, account, &agent_options()))
    {
        Ok(connected) => connected,
        Err(e) => return Err(format!("agent `{name}`: connect failed: {e}")),
    };
    self.emit(&Event::AgentStarted { name, aor: &aor });
    for (key, value) in headers {
        session.add_header(key, value);
        let rendered = format!("{key}: {value}");
        self.act(name, "header", Some(&rendered));
    }
    self.sessions
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .insert(name.to_string(), session);
    Ok(())
}
/// Returns the `registered` flag from agent `name`'s current state.
///
/// Records `"{name} registered"` as the pending label before the lookup, so the label
/// is set even when the lookup fails.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn registered(&self, name: &str) -> Result<bool, String> {
    mark_pending_label(format!("{name} registered"));
    self.with_session(name, |session| {
        let watch = session.state();
        let snapshot = watch.borrow();
        snapshot.registered
    })
}
/// Derives the coarse call state of agent `name` from the phases of its calls.
///
/// `Established` wins over `Ringing`; with neither present the agent is `Idle`.
/// Records `"{name} state"` as the pending label before the lookup.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn call_state(&self, name: &str) -> Result<CallState, String> {
    mark_pending_label(format!("{name} state"));
    self.with_session(name, |session| {
        let watch = session.state();
        let snapshot = watch.borrow();
        let calls = &snapshot.calls;
        if calls.iter().any(|call| call.phase == CallPhase::Established) {
            return CallState::Established;
        }
        if calls.iter().any(|call| call.phase == CallPhase::Ringing) {
            return CallState::Ringing;
        }
        CallState::Idle
    })
}
/// Returns a copy of agent `name`'s `last_call_reason`, if one is set.
///
/// Records `"{name} reason"` as the pending label before the lookup.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn reason(&self, name: &str) -> Result<Option<String>, String> {
    mark_pending_label(format!("{name} reason"));
    self.with_session(name, |session| {
        let watch = session.state();
        let snapshot = watch.borrow();
        snapshot.last_call_reason.clone()
    })
}
pub fn status_code(&self, name: &str) -> Result<Option<u16>, String> {
mark_pending_label(format!("{name} status code"));
self.with_session(name, |s| {
s.state()
.borrow()
.last_call_reason
.as_deref()
.and_then(sip_status_code)
})
}
/// Looks up `header` in agent `name`'s state via `received_header_value`.
///
/// Records `"{name} header {header}"` as the pending label before the lookup.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn header(&self, name: &str, header: &str) -> Result<Option<String>, String> {
    mark_pending_label(format!("{name} header {header}"));
    self.with_session(name, |session| {
        let watch = session.state();
        let snapshot = watch.borrow();
        received_header_value(&snapshot, header)
    })
}
/// Builds an `AgentInfo` snapshot for agent `name` from a single borrow of its state.
///
/// The call state follows the same precedence as `call_state` (established, then
/// ringing, then idle). Unlike the single-value accessors, this does not touch the
/// pending label.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn info(&self, name: &str) -> Result<AgentInfo, String> {
    self.with_session(name, |session| {
        let watch = session.state();
        let snapshot = watch.borrow();

        let has_established = snapshot
            .calls
            .iter()
            .any(|call| call.phase == CallPhase::Established);
        let has_ringing = snapshot
            .calls
            .iter()
            .any(|call| call.phase == CallPhase::Ringing);
        let state = match (has_established, has_ringing) {
            (true, _) => CallState::Established,
            (false, true) => CallState::Ringing,
            (false, false) => CallState::Idle,
        };

        let reason = snapshot.last_call_reason.clone();
        let status_code = reason.as_deref().and_then(sip_status_code);

        AgentInfo {
            name: name.to_string(),
            aor: session.aor.clone(),
            registered: snapshot.registered,
            state,
            reason,
            status_code,
            peer: snapshot.peer(),
            calls: snapshot.calls.len(),
        }
    })
}
/// Returns whatever the state's `peer()` accessor reports for agent `name`.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn peer(&self, name: &str) -> Result<Option<(String, Option<String>)>, String> {
    self.with_session(name, |session| {
        let watch = session.state();
        let snapshot = watch.borrow();
        snapshot.peer()
    })
}
/// Returns the state's `received_headers_flat()` list of `(name, value)` pairs for
/// agent `name`.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn headers(&self, name: &str) -> Result<Vec<(String, String)>, String> {
    self.with_session(name, |session| {
        let watch = session.state();
        let snapshot = watch.borrow();
        snapshot.received_headers_flat()
    })
}
pub fn register(&self, name: &str) -> Result<(), String> {
self.with_session(name, AgentSession::register)?;
self.act(name, "register", None);
Ok(())
}
pub fn accept(&self, name: &str) -> Result<(), String> {
self.with_session(name, AgentSession::accept)?;
self.act(name, "accept", None);
Ok(())
}
pub fn hangup(&self, name: &str) -> Result<(), String> {
self.with_session(name, AgentSession::hangup)?;
self.act(name, "hangup", None);
Ok(())
}
pub fn hold(&self, name: &str) -> Result<(), String> {
self.with_session(name, AgentSession::hold)?;
self.act(name, "hold", None);
Ok(())
}
pub fn resume(&self, name: &str) -> Result<(), String> {
self.with_session(name, AgentSession::resume)?;
self.act(name, "resume", None);
Ok(())
}
pub fn mute(&self, name: &str) -> Result<(), String> {
self.with_session(name, AgentSession::mute)?;
self.act(name, "mute", None);
Ok(())
}
/// Sends `digits` one at a time as DTMF from agent `name`.
///
/// Whitespace in `digits` is dropped. A non-zero `gap` blocks the calling thread
/// between consecutive digits (not before the first). After every digit has been
/// sent, a single `dtmf` action carrying the filtered digit string is emitted.
///
/// # Errors
/// Fails on the first digit for which agent `name` is not connected; digits sent
/// before that stay sent and no action is emitted. The session is looked up per
/// digit, so an empty digit string never performs the lookup.
pub fn dtmf(&self, name: &str, digits: &str, gap: Duration) -> Result<(), String> {
    let tones: Vec<char> = digits.chars().filter(|ch| !ch.is_whitespace()).collect();
    let pause_between = !gap.is_zero();
    for (index, &tone) in tones.iter().enumerate() {
        if pause_between && index != 0 {
            std::thread::sleep(gap);
        }
        self.with_session(name, |session| session.send_dtmf(tone))?;
    }
    let sent = tones.iter().collect::<String>();
    self.act(name, "dtmf", Some(sent.as_str()));
    Ok(())
}
/// Dials the already-resolved `uri` from agent `name`, then emits a `dial` action
/// carrying that URI.
///
/// # Errors
/// Fails (without emitting anything) when no agent named `name` is connected.
fn do_dial(&self, name: &str, uri: &str) -> Result<(), String> {
    self.with_session(name, |caller| caller.dial(uri))?;
    self.act(name, "dial", Some(uri));
    Ok(())
}
/// Dials another connected agent: `target` is an agent name whose AOR becomes the
/// dialed URI.
///
/// # Errors
/// Fails when either `target` or `name` is not a connected agent; `target` is
/// checked first.
pub fn dial_agent(&self, name: &str, target: &str) -> Result<(), String> {
    self.with_session(target, |callee| callee.aor.clone())
        .and_then(|uri| self.do_dial(name, &uri))
}
/// Dials `target` from agent `name`, first expanding it with `resolve_uri` (a bare
/// user becomes `sip:<user>@<agent domain>`).
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn dial_uri(&self, name: &str, target: &str) -> Result<(), String> {
    self.resolve_uri(name, target)
        .and_then(|uri| self.do_dial(name, &uri))
}
/// Turns `target` into a dialable URI.
///
/// Anything that starts with `sip:` or contains `@` is returned unchanged. Otherwise
/// it is treated as a bare user and expanded to `sip:<target>@<domain>` using agent
/// `name`'s domain.
///
/// # Errors
/// Fails only in the expansion case, when no agent named `name` is connected.
fn resolve_uri(&self, name: &str, target: &str) -> Result<String, String> {
    let already_addressed = target.starts_with("sip:") || target.contains('@');
    if already_addressed {
        return Ok(target.to_string());
    }
    self.with_session(name, |session| session.domain().to_string())
        .map(|domain| format!("sip:{target}@{domain}"))
}
/// Transfers agent `name`'s call to another connected agent, using `target`'s AOR as
/// the destination, then emits a `transfer` action carrying that URI.
///
/// # Errors
/// Fails when either `target` or `name` is not a connected agent; `target` is
/// checked first.
pub fn transfer_agent(&self, name: &str, target: &str) -> Result<(), String> {
    let destination = self.with_session(target, |peer| peer.aor.clone())?;
    self.with_session(name, |session| session.transfer(&destination))?;
    self.act(name, "transfer", Some(destination.as_str()));
    Ok(())
}
/// Transfers agent `name`'s call to `target` (expanded with `resolve_uri`), then
/// emits a `transfer` action carrying the resolved URI.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn transfer_uri(&self, name: &str, target: &str) -> Result<(), String> {
    let destination = self.resolve_uri(name, target)?;
    self.with_session(name, |session| session.transfer(&destination))?;
    self.act(name, "transfer", Some(destination.as_str()));
    Ok(())
}
/// Starts an attended transfer from agent `name` towards another connected agent,
/// using `target`'s AOR, then emits an `attended-transfer` action carrying that URI.
///
/// # Errors
/// Fails when either `target` or `name` is not a connected agent; `target` is
/// checked first.
pub fn attended_transfer_agent(&self, name: &str, target: &str) -> Result<(), String> {
    let destination = self.with_session(target, |peer| peer.aor.clone())?;
    self.with_session(name, |session| session.attended_transfer_start(&destination))?;
    self.act(name, "attended-transfer", Some(destination.as_str()));
    Ok(())
}
/// Starts an attended transfer from agent `name` towards `target` (expanded with
/// `resolve_uri`), then emits an `attended-transfer` action carrying the resolved URI.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn attended_transfer_uri(&self, name: &str, target: &str) -> Result<(), String> {
    let destination = self.resolve_uri(name, target)?;
    self.with_session(name, |session| session.attended_transfer_start(&destination))?;
    self.act(name, "attended-transfer", Some(destination.as_str()));
    Ok(())
}
/// Calls `attended_transfer_exec` on agent `name`'s session, then emits a
/// `complete-transfer` action.
///
/// # Errors
/// Fails (without emitting anything) when no agent named `name` is connected.
pub fn complete_transfer(&self, name: &str) -> Result<(), String> {
    self.with_session(name, |session| session.attended_transfer_exec())
        .map(|_| self.act(name, "complete-transfer", None))
}
/// Calls `attended_transfer_abort` on agent `name`'s session, then emits an
/// `abort-transfer` action.
///
/// # Errors
/// Fails (without emitting anything) when no agent named `name` is connected.
pub fn abort_transfer(&self, name: &str) -> Result<(), String> {
    self.with_session(name, |session| session.attended_transfer_abort())
        .map(|_| self.act(name, "abort-transfer", None))
}
/// Forwards `spec` unchanged to agent `name`'s `set_audio_source`. No action event is
/// emitted.
/// NOTE(review): the accepted `spec` syntax is defined by `AgentSession`, not here.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn set_audio_source(&self, name: &str, spec: &str) -> Result<(), String> {
    self.with_session(name, |session| session.set_audio_source(spec))
}
/// Returns an owned copy of agent `name`'s recording directory path.
///
/// # Errors
/// Fails when no agent named `name` is connected.
pub fn recording_dir(&self, name: &str) -> Result<std::path::PathBuf, String> {
    self.with_session(name, |session| {
        let dir = session.recording_dir();
        dir.to_path_buf()
    })
}
pub fn emit_action(&self, name: &str, kind: &'static str, detail: Option<&str>) {
self.act(name, kind, detail);
}
pub fn report_assertion(
&self,
desc: Option<String>,
expect: String,
actual: String,
ok: bool,
) -> bool {
if ASSERT_SILENT.with(|s| s.get()) {
LAST_ASSERT.with(|l| *l.borrow_mut() = Some((desc, expect, actual, ok)));
} else {
self.emit(&Event::Assertion {
label: desc.as_deref(),
expect,
ok,
actual: Some(actual),
});
}
take_pending_label();
ok
}
/// Turns silent assertion mode on or off for the calling thread only (the flag is a
/// thread-local).
pub fn set_assert_silent(&self, silent: bool) {
    ASSERT_SILENT.with(|flag| {
        flag.replace(silent);
    });
}
/// Emits the assertion most recently stashed on this thread, if any, and clears the
/// stash. Does nothing when the stash is empty.
pub fn emit_last_assert(&self) {
    let stashed = LAST_ASSERT.with(|slot| slot.replace(None));
    if let Some((desc, expect, actual, ok)) = stashed {
        let assertion = Event::Assertion {
            label: desc.as_deref(),
            expect,
            ok,
            actual: Some(actual),
        };
        self.emit(&assertion);
    }
}
/// Tears down every connected session.
///
/// All sessions are drained from the map (the lock is released right after), each
/// gets `hangup_all`, the runtime is blocked for 200 ms, and only then are the
/// sessions dropped. The pause presumably gives the hangups time to go out before
/// the sessions disappear — confirm against `AgentSession`'s drop behaviour.
pub fn reset_sessions(&self) {
    let drained: Vec<AgentSession> = self
        .sessions
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .drain()
        .map(|(_name, session)| session)
        .collect();
    drained.iter().for_each(|session| {
        session.hangup_all();
    });
    let grace = Duration::from_millis(200);
    // The sleep future is built inside the async block so it is created within the
    // runtime context that `block_on` enters.
    self.rt
        .block_on(async { tokio::time::sleep(grace).await });
    drop(drained);
}
/// Returns the current default timeout, rebuilt from the stored millisecond count.
pub fn default_timeout(&self) -> Duration {
    let millis = self.default_timeout_ms.load(Ordering::Relaxed);
    Duration::from_millis(millis)
}
/// Replaces the default timeout with `d`, stored with millisecond resolution.
///
/// Durations whose millisecond count does not fit in a `u64` are clamped to
/// `u64::MAX` milliseconds instead of being silently truncated to an unrelated
/// value.
pub fn set_default_timeout(&self, d: Duration) {
    // `as_millis` returns a u128; a plain `as u64` cast would wrap on overflow.
    let millis = u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
    self.default_timeout_ms.store(millis, Ordering::Relaxed);
}
}
/// Extracts a SIP status code from the start of a reason string.
///
/// The first whitespace-separated token must parse as a `u16` in `100..=699`;
/// anything else (empty input, non-numeric text, out-of-range numbers) yields `None`.
pub fn sip_status_code(reason: &str) -> Option<u16> {
    let first_token = reason.split_whitespace().next()?;
    match first_token.parse::<u16>() {
        Ok(code) if (100..=699).contains(&code) => Some(code),
        _ => None,
    }
}
/// Extracts the user part of a SIP URI, e.g. `alice` from `sip:alice@example.com`.
///
/// The scheme (everything up to the first `:`) is only looked for *before* the first
/// `@`. Scheme-less URIs that carry a port, such as `alice@example.com:5060`, therefore
/// yield `alice` rather than the port. Parameters after the first `;` in the user part
/// are dropped.
///
/// When the URI has no `@` at all, the text after the scheme (up to the first `;`) is
/// returned as-is, so `sip:example.com` yields `example.com`.
pub fn sip_user_part(uri: &str) -> String {
    // Isolate the userinfo side first so a `:port` after the host cannot be mistaken
    // for the scheme separator.
    let before_at = uri.split_once('@').map_or(uri, |(head, _)| head);
    let after_scheme = before_at
        .split_once(':')
        .map_or(before_at, |(_, rest)| rest);
    after_scheme
        .split(';')
        .next()
        .unwrap_or(after_scheme)
        .to_string()
}
#[cfg(test)]
mod tests {
    use super::{mark_pending_label, sip_status_code, take_pending_label};

    /// A leading numeric token in the SIP range is parsed; everything else is rejected.
    #[test]
    fn sip_status_code_parsed_from_reason() {
        let cases = [
            ("603 Decline", Some(603)),
            ("486 Busy Here", Some(486)),
            ("200 OK", Some(200)),
            // No leading numeric token.
            ("Connection reset by peer", None),
            ("", None),
            // Numeric, but outside 100..=699.
            ("999 Bogus", None),
        ];
        for (reason, expected) in cases {
            assert_eq!(sip_status_code(reason), expected);
        }
    }

    /// The pending label can be taken exactly once, and a later mark overwrites an
    /// earlier one.
    #[test]
    fn pending_label_is_take_once() {
        // Nothing has been marked yet on this thread.
        assert_eq!(take_pending_label(), None);

        mark_pending_label("Caller state");
        assert_eq!(take_pending_label().as_deref(), Some("Caller state"));
        // The first take consumed it.
        assert_eq!(take_pending_label(), None);

        // The most recent mark wins.
        mark_pending_label("Caller state");
        mark_pending_label("Callee registered");
        assert_eq!(take_pending_label().as_deref(), Some("Callee registered"));
    }
}