use super::state::{AgentState, reduce};
use anyhow::{Context, Result};
use ringo_core::baresip::{Account, BaresipOptions, Instance};
use ringo_core::client;
use ringo_core::event::AppEvent;
use ringo_core::phone::{BaresipPhone, Phone};
use ringo_core::siptrace;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, watch};
/// Period of the background ticker that polls the baresip log file for new
/// SIP trace data (see the trace task spawned in `AgentSession::connect`).
const TRACE_POLL_INTERVAL: Duration = Duration::from_millis(150);
/// A live session with one spawned baresip instance, driven over its
/// ctrl_tcp socket.
///
/// Built by `AgentSession::connect`, which also starts the background tasks
/// that keep the observable `AgentState` up to date.
pub struct AgentSession {
/// SIP address of record, built in `connect` as `sip:<username>@<domain>`.
pub aor: String,
/// Registration interval handed to the phone by `register`; `connect`
/// falls back to 3600 when the account does not set one.
// NOTE(review): unit is presumably seconds — confirm against `Account`.
pub regint: u32,
/// Command handle; every action method on the session forwards to it.
phone: BaresipPhone,
/// Receiving side of the state watch channel; cloned out by `state`.
state_rx: watch::Receiver<AgentState>,
/// The spawned baresip instance, held for the lifetime of the session and
/// read for its `log_path`.
// NOTE(review): declared last, so it is dropped after `phone` and
// `state_rx`; presumably its Drop stops the process — confirm.
_instance: Instance,
}
impl AgentSession {
    /// Spawns a baresip instance for `name`, connects to its ctrl_tcp port and
    /// starts the background tasks that keep the session state current.
    ///
    /// Three detached tasks are started:
    /// - a reader that turns every ctrl_tcp message into an [`AppEvent`] and
    ///   folds it into the shared [`AgentState`] with `reduce`;
    /// - a trace poller that polls the instance log every
    ///   `TRACE_POLL_INTERVAL` and merges newly seen INVITE headers into the
    ///   state;
    /// - a writer that forwards `(command, params)` pairs queued by the phone
    ///   handle to ctrl_tcp.
    ///
    /// # Errors
    ///
    /// Returns an error when the baresip instance cannot be spawned or when
    /// the ctrl_tcp socket does not accept a connection within 10 seconds.
    pub async fn connect(name: &str, account: Account, options: &BaresipOptions) -> Result<Self> {
        let instance = Instance::spawn(name, &account, options)
            .with_context(|| format!("spawn baresip for `{name}`"))?;
        let stream = connect_retry(instance.port, Duration::from_secs(10))
            .await
            .with_context(|| format!("connect to baresip ctrl_tcp for `{name}`"))?;
        let (mut reader, mut writer) = stream.into_split();
        let (cmd_tx, mut cmd_rx) = mpsc::channel::<(String, String)>(32);
        let (state_tx, state_rx) = watch::channel(AgentState::default());
        // The sender is shared by the reader task and the trace poller.
        let state_tx = Arc::new(state_tx);

        // Reader: runs until reading a message from ctrl_tcp fails.
        let reader_tx = Arc::clone(&state_tx);
        tokio::spawn(async move {
            while let Ok(msg) = client::read_message(&mut reader).await {
                let event = AppEvent::from(msg);
                reader_tx.send_modify(|s| reduce(s, &event));
            }
        });

        // Trace poller: runs until every state receiver is gone or the
        // blocking poll task fails to join.
        let log_path = instance.log_path.clone();
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(TRACE_POLL_INTERVAL);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            let mut tail = siptrace::TraceTail::new();
            loop {
                ticker.tick().await;
                if state_tx.is_closed() {
                    break;
                }
                let path = log_path.clone();
                // `spawn_blocking` needs a `'static` closure, so the tail is
                // moved in and handed back together with the poll result.
                let polled = tokio::task::spawn_blocking(move || {
                    let invites = tail.poll(&path);
                    (tail, invites)
                })
                .await;
                let (returned, invites) = match polled {
                    Ok(pair) => pair,
                    Err(_) => break,
                };
                tail = returned;
                if let Some(invites) = invites {
                    merge_received_headers(&state_tx, invites);
                }
            }
        });

        // Writer: runs until the command channel closes or a write fails.
        tokio::spawn(async move {
            while let Some((cmd, params)) = cmd_rx.recv().await {
                if client::write_command(&mut writer, &cmd, &params)
                    .await
                    .is_err()
                {
                    break;
                }
            }
        });

        Ok(Self {
            aor: format!("sip:{}@{}", account.username, account.domain),
            regint: account.regint.unwrap_or(3600),
            phone: BaresipPhone::new(cmd_tx),
            state_rx,
            _instance: instance,
        })
    }

    /// Returns the part of `aor` after its last `@`, or `""` when `aor`
    /// contains no `@` at all.
    pub fn domain(&self) -> &str {
        // `rsplit('@').next()` is never `None` (it yields the whole string
        // when there is no separator), so `rsplit_once` is used to make the
        // empty-string fallback actually reachable.
        self.aor.rsplit_once('@').map_or("", |(_, domain)| domain)
    }

    /// Returns a new receiver for observing the session's [`AgentState`].
    pub fn state(&self) -> watch::Receiver<AgentState> {
        self.state_rx.clone()
    }

    /// Path of the log file belonging to the spawned baresip instance.
    pub fn log_path(&self) -> &Path {
        &self._instance.log_path
    }

    /// Directory that contains the instance's log file, or `.` when the log
    /// path has no directory component.
    pub fn recording_dir(&self) -> &Path {
        // `Path::parent` yields `Some("")` for a bare file name, so an empty
        // parent is treated the same as a missing one.
        self._instance
            .log_path
            .parent()
            .filter(|dir| !dir.as_os_str().is_empty())
            .unwrap_or(Path::new("."))
    }

    /// Forwards the audio source `spec` to the phone.
    pub fn set_audio_source(&self, spec: &str) {
        self.phone.set_audio_source(spec);
    }

    /// Asks the phone to register `aor` using the session's `regint`.
    pub fn register(&self) {
        self.phone.register(&self.aor, self.regint);
    }

    /// Asks the phone to dial `target`.
    pub fn dial(&self, target: &str) {
        self.phone.dial(target);
    }

    /// Forwards an accept request to the phone.
    pub fn accept(&self) {
        self.phone.accept();
    }

    /// Forwards a hold request to the phone.
    pub fn hold(&self) {
        self.phone.hold();
    }

    /// Forwards a resume request to the phone.
    pub fn resume(&self) {
        self.phone.resume();
    }

    /// Forwards a mute request to the phone.
    pub fn mute(&self) {
        self.phone.mute();
    }

    /// Asks the phone to send the DTMF `digit`.
    pub fn send_dtmf(&self, digit: char) {
        self.phone.send_dtmf(digit);
    }

    /// Forwards a header `key`/`value` pair to the phone.
    pub fn add_header(&self, key: &str, value: &str) {
        self.phone.add_header(key, value);
    }

    /// Forwards a hangup request to the phone.
    pub fn hangup(&self) {
        self.phone.hangup();
    }

    /// Forwards a hang-up-all request to the phone.
    pub fn hangup_all(&self) {
        self.phone.hangup_all();
    }

    /// Asks the phone to transfer to `uri`.
    pub fn transfer(&self, uri: &str) {
        self.phone.transfer(uri);
    }

    /// Asks the phone to start an attended transfer towards `uri`.
    pub fn attended_transfer_start(&self, uri: &str) {
        self.phone.attended_transfer_start(uri);
    }

    /// Asks the phone to execute the attended transfer.
    pub fn attended_transfer_exec(&self) {
        self.phone.attended_transfer_exec();
    }

    /// Asks the phone to abort the attended transfer.
    pub fn attended_transfer_abort(&self) {
        self.phone.attended_transfer_abort();
    }
}
/// Adds the headers of not-yet-known calls from `invites` to the shared state.
///
/// Headers already stored for a call id are left untouched. Watchers are only
/// notified when `invites` contains at least one call id the state does not
/// have yet.
fn merge_received_headers(state_tx: &watch::Sender<AgentState>, invites: siptrace::InviteHeaders) {
    if invites.is_empty() {
        return;
    }

    // Keep the read borrow in its own scope so it is released before
    // `send_modify` is called on the same channel.
    let all_known = {
        let current = state_tx.borrow();
        invites
            .keys()
            .all(|call_id| current.received_headers.contains_key(call_id))
    };
    if all_known {
        return;
    }

    state_tx.send_modify(|state| {
        for (call_id, headers) in invites {
            state.received_headers.entry(call_id).or_insert(headers);
        }
    });
}
/// Connects to `127.0.0.1:port`, retrying every 100 ms after a failed attempt.
///
/// # Errors
///
/// Once an attempt fails at or after `within` has elapsed since the call
/// started, that attempt's error is returned with the context
/// `ctrl_tcp connect timed out`.
async fn connect_retry(port: u16, within: Duration) -> Result<TcpStream> {
    let give_up_at = tokio::time::Instant::now() + within;
    loop {
        let failure = match TcpStream::connect(("127.0.0.1", port)).await {
            Ok(stream) => return Ok(stream),
            Err(err) => err,
        };
        // The deadline is only checked after a failed attempt.
        if tokio::time::Instant::now() >= give_up_at {
            return Err(failure).context("ctrl_tcp connect timed out");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}