wavekat_sip/caller.rs
1//! Outbound calls and the established-call handle.
2//!
3//! [`Caller::dial`] binds a local RTP socket, builds the SDP offer, places the
4//! INVITE through the engine (answering a digest challenge if the server
5//! demands one), and on a 2xx returns a [`Call`] — the negotiated remote media
6//! plus the bound RTP socket. Audio device I/O, codecs and recording stay with
7//! the consumer; the `rtp_socket` + `remote_media` + `local_rtp_addr` triple is
8//! the raw plumbing.
9
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13use rsip::{Header, Uri};
14use tokio::net::UdpSocket;
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, info};
18
19use crate::account::SipAccount;
20use crate::dtmf_info::{build_info_body, classify, content_type_header, InfoOutcome};
21use crate::endpoint::SipEndpoint;
22use crate::inbound::InboundRequest;
23use crate::rtp::dtmf::DtmfDigit;
24use crate::sdp::{build_sdp, build_sdp_with, parse_sdp, MediaDirection, RemoteMedia};
25use crate::session_timer::{
26 negotiate_uac, supported_timer_header, SessionDialogOps, SessionExpires, SessionTimer,
27 DEFAULT_SESSION_EXPIRES_SECS,
28};
29use crate::stack::call::{CallConfig, CallOutcome};
30use crate::stack::dialog::{Dialog, DialogId};
31use crate::stack::transaction::gen_tag;
32
/// Boxed, thread-safe error used as the error half of every fallible API in
/// this module. String messages and standard error values both convert into
/// it via `.into()` / `?`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
34
35/// An established call: negotiated remote media plus the local RTP socket.
36///
37/// The same handle is produced by [`Caller::dial`] (outbound) and
38/// [`crate::IncomingCall::accept`] (inbound), so call control is uniform.
39pub struct Call {
40 endpoint: Arc<SipEndpoint>,
41 /// Shared so a background session-timer loop ([`Call::session_handle`]) can
42 /// send refresh re-INVITEs / BYE while the call owner drives audio. The
43 /// mutex serializes the dialog's CSeq across both.
44 dialog: Arc<Mutex<Dialog>>,
45 /// This dialog's identity, used to register for inbound in-dialog requests
46 /// and termination.
47 dialog_id: DialogId,
48 /// Fired when the peer ends the call (an in-dialog `BYE`); surfaced via
49 /// [`Call::terminated`]. Registered with the endpoint at construction.
50 terminated: CancellationToken,
51 peer: SocketAddr,
52 /// `true` once we have put the peer on hold via a `sendonly` re-INVITE.
53 held: bool,
54 /// SDP `o=` version; bumped on every re-offer (RFC 3264 §5).
55 sdp_version: u32,
56 /// The RFC 4028 session timer negotiated at call setup, if any.
57 session_timer: Option<SessionTimer>,
58 /// Where the remote endpoint expects RTP (from the negotiated SDP).
59 pub remote_media: RemoteMedia,
60 /// Local RTP socket; share via `Arc` to send and receive concurrently.
61 pub rtp_socket: Arc<UdpSocket>,
62 /// Local RTP address advertised in our SDP.
63 pub local_rtp_addr: SocketAddr,
64}
65
66impl Call {
67 pub(crate) fn new(
68 endpoint: Arc<SipEndpoint>,
69 dialog: Dialog,
70 peer: SocketAddr,
71 session_timer: Option<SessionTimer>,
72 remote_media: RemoteMedia,
73 rtp_socket: Arc<UdpSocket>,
74 local_rtp_addr: SocketAddr,
75 ) -> Self {
76 let dialog_id = dialog.id();
77 // Register for the peer-BYE termination signal up front, so a remote
78 // hangup is observable via `Call::terminated` whether or not the call
79 // ever opts into `inbound_requests`.
80 let terminated = endpoint.register_termination(dialog_id.clone());
81 Self {
82 endpoint,
83 dialog: Arc::new(Mutex::new(dialog)),
84 dialog_id,
85 terminated,
86 peer,
87 held: false,
88 // The initial offer/answer was o= version 0.
89 sdp_version: 0,
90 session_timer,
91 remote_media,
92 rtp_socket,
93 local_rtp_addr,
94 }
95 }
96
97 /// Put the peer on hold (`on = true`, `a=sendonly`) or resume the call
98 /// (`on = false`, `a=sendrecv`) by sending an in-dialog re-INVITE with a
99 /// fresh SDP re-offer (RFC 3264 §8.4).
100 ///
101 /// The local hold state only flips once the peer accepts the re-INVITE with
102 /// a 2xx; a non-2xx final surfaces the server's reason and leaves the call
103 /// unchanged. The `o=` version is bumped for each re-offer regardless, as
104 /// RFC 3264 requires.
105 pub async fn set_hold(&mut self, on: bool) -> Result<(), BoxError> {
106 let direction = if on {
107 MediaDirection::SendOnly
108 } else {
109 MediaDirection::SendRecv
110 };
111 self.sdp_version += 1;
112 let offer = build_sdp_with(
113 self.endpoint.local_ip(),
114 self.local_rtp_addr.port(),
115 direction,
116 self.sdp_version,
117 );
118 let headers = vec![Header::ContentType("application/sdp".into())];
119 let response = {
120 let mut dialog = self.dialog.lock().await;
121 self.endpoint
122 .ua()
123 .reinvite(self.peer, &mut dialog, headers, offer)
124 .await
125 };
126 match response {
127 Some(r) if (200..300).contains(&r.status_code.code()) => {
128 self.held = on;
129 info!(on, "hold state updated via re-INVITE");
130 Ok(())
131 }
132 Some(r) => Err(format!("re-INVITE rejected: {}", r.status_code).into()),
133 None => Err("re-INVITE timed out with no final response".into()),
134 }
135 }
136
137 /// `true` if the call is currently on hold (we sent a `sendonly` re-INVITE
138 /// the peer accepted).
139 pub fn is_held(&self) -> bool {
140 self.held
141 }
142
143 /// The RFC 4028 session timer negotiated when the call was set up, or
144 /// `None` if neither side asked for one. Drive it with
145 /// [`crate::session_timer_loop`] against [`Call::session_handle`].
146 pub fn session_timer(&self) -> Option<SessionTimer> {
147 self.session_timer
148 }
149
150 /// A cloneable handle that sends refresh re-INVITEs / BYE on this call's
151 /// dialog, for running [`crate::session_timer_loop`] in a background task
152 /// alongside the audio path. Shares the dialog with the `Call`, so their
153 /// in-dialog requests serialize correctly.
154 pub fn session_handle(&self) -> CallSession {
155 CallSession {
156 endpoint: self.endpoint.clone(),
157 dialog: self.dialog.clone(),
158 peer: self.peer,
159 }
160 }
161
162 /// Opt in to handle this call's inbound in-dialog requests — the peer's
163 /// re-`INVITE`s (e.g. an RFC 4028 session refresh, or a peer-initiated
164 /// hold) and `INFO`s (e.g. SIP-INFO DTMF) — instead of having the endpoint
165 /// auto-answer them `200 OK`.
166 ///
167 /// Returns a stream; each [`InboundRequest`] must be answered (with
168 /// [`InboundRequest::respond`] / [`InboundRequest::ok`]). While the returned
169 /// [`InboundRequests`] is alive, those requests route here; drop it to
170 /// revert to auto-answering. `BYE` / `OPTIONS` are always auto-answered.
171 /// Call this once per [`Call`].
172 pub fn inbound_requests(&self) -> InboundRequests {
173 let rx = self.endpoint.register_dialog(self.dialog_id.clone());
174 InboundRequests {
175 endpoint: self.endpoint.clone(),
176 dialog_id: self.dialog_id.clone(),
177 rx,
178 }
179 }
180
181 /// A token that fires when the peer ends the call by sending an in-dialog
182 /// `BYE`. The endpoint auto-answers the BYE `200 OK`; this is purely the
183 /// notification. Clone it and `await` [`CancellationToken::cancelled`] in a
184 /// task to drive call teardown (stop audio, finalize a recording). It does
185 /// **not** fire for a local [`Call::hangup`] — the caller already knows.
186 pub fn terminated(&self) -> CancellationToken {
187 self.terminated.clone()
188 }
189
190 /// Send one DTMF press via SIP `INFO` (`application/dtmf-relay`).
191 ///
192 /// Use this only when the remote did not negotiate RFC 4733 — i.e.
193 /// [`RemoteMedia::dtmf_payload_type`] is `None`. When telephone-event is
194 /// available, prefer [`crate::send_dtmf_burst`] over RTP. A
195 /// [`InfoOutcome::UnsupportedMedia`] result means the remote rejects this
196 /// transport too; stop sending further presses on this dialog.
197 pub async fn send_dtmf_info(&mut self, digit: DtmfDigit, duration_ms: u32) -> InfoOutcome {
198 let body = build_info_body(digit, duration_ms).into_bytes();
199 let response = {
200 let mut dialog = self.dialog.lock().await;
201 self.endpoint
202 .ua()
203 .info(self.peer, &mut dialog, vec![content_type_header()], body)
204 .await
205 };
206 classify(response)
207 }
208
209 /// Blind-transfer the call: ask the peer to place a fresh call to `target`
210 /// by sending an in-dialog `REFER` with a `Refer-To` (RFC 3515).
211 ///
212 /// Returns `Ok(())` once the peer accepts the `REFER` with a 2xx
213 /// (`202 Accepted`) — at which point the transfer is *in progress*, not yet
214 /// complete. The peer then reports the outcome as a series of in-dialog
215 /// `NOTIFY`s (a `message/sipfrag` status line) that arrive on
216 /// [`Call::inbound_requests`]; the consumer watches those (parsing each with
217 /// [`crate::parse_sipfrag_status`]) and tears its own leg down once the
218 /// target answers. A non-2xx final to the `REFER` surfaces the peer's reason
219 /// and leaves the call unchanged — the peer won't honour the transfer, so
220 /// the consumer should keep the call up.
221 ///
222 /// This is *blind* (unattended) transfer: we do not first call `target`
223 /// ourselves. Attended transfer (consult `target`, then `REFER` with
224 /// `Replaces`) is a separate method, not yet implemented.
225 pub async fn blind_transfer(&mut self, target: Uri) -> Result<(), BoxError> {
226 let headers = vec![crate::refer::refer_to_header(&target)];
227 let response = {
228 let mut dialog = self.dialog.lock().await;
229 self.endpoint
230 .ua()
231 .refer(self.peer, &mut dialog, headers)
232 .await
233 };
234 match response {
235 Some(r) if (200..300).contains(&r.status_code.code()) => {
236 info!(%target, "blind transfer accepted (REFER 2xx); awaiting NOTIFY");
237 Ok(())
238 }
239 Some(r) => Err(format!("REFER rejected: {}", r.status_code).into()),
240 None => Err("REFER timed out with no final response".into()),
241 }
242 }
243
244 /// Hang up by sending an in-dialog `BYE`. Returns once the peer 2xxs it
245 /// (or the transaction gives up).
246 pub async fn hangup(&mut self) -> Result<(), BoxError> {
247 let acked = {
248 let mut dialog = self.dialog.lock().await;
249 self.endpoint.ua().hangup(self.peer, &mut dialog).await
250 };
251 if acked {
252 info!("call hung up (BYE acknowledged)");
253 Ok(())
254 } else {
255 Err("BYE was not acknowledged".into())
256 }
257 }
258}
259
260impl Drop for Call {
261 fn drop(&mut self) {
262 // Release the termination registration so the endpoint's table doesn't
263 // grow for the life of the process. (`InboundRequests` similarly
264 // unregisters the dialog on its own drop.)
265 self.endpoint.unregister_termination(&self.dialog_id);
266 }
267}
268
269/// A stream of a [`Call`]'s inbound in-dialog requests (peer re-`INVITE` /
270/// `INFO`), produced by [`Call::inbound_requests`].
271///
272/// Dropping it unregisters the dialog, so its inbound requests revert to being
273/// auto-answered `200 OK` by the endpoint.
274pub struct InboundRequests {
275 endpoint: Arc<SipEndpoint>,
276 dialog_id: DialogId,
277 rx: mpsc::Receiver<InboundRequest>,
278}
279
280impl InboundRequests {
281 /// Await the next inbound request, or `None` once the call's endpoint shuts
282 /// down or this stream is being torn down.
283 pub async fn recv(&mut self) -> Option<InboundRequest> {
284 self.rx.recv().await
285 }
286}
287
288impl Drop for InboundRequests {
289 fn drop(&mut self) {
290 self.endpoint.unregister_dialog(&self.dialog_id);
291 }
292}
293
294/// A cloneable session-control handle over a [`Call`]'s dialog.
295///
296/// Produced by [`Call::session_handle`] and consumed by
297/// [`crate::session_timer_loop`]: it implements [`SessionDialogOps`] so the
298/// loop can send refresh re-INVITEs and the tear-down BYE on the shared dialog.
299#[derive(Clone)]
300pub struct CallSession {
301 endpoint: Arc<SipEndpoint>,
302 dialog: Arc<Mutex<Dialog>>,
303 peer: SocketAddr,
304}
305
306impl SessionDialogOps for CallSession {
307 async fn refresh(
308 &self,
309 mut headers: Vec<Header>,
310 body: Option<Vec<u8>>,
311 ) -> Result<Option<rsip::Response>, BoxError> {
312 let body = body.unwrap_or_default();
313 if !body.is_empty() {
314 headers.push(Header::ContentType("application/sdp".into()));
315 }
316 let mut dialog = self.dialog.lock().await;
317 Ok(self
318 .endpoint
319 .ua()
320 .reinvite(self.peer, &mut dialog, headers, body)
321 .await)
322 }
323
324 async fn send_bye(&self) -> Result<(), BoxError> {
325 let mut dialog = self.dialog.lock().await;
326 if self.endpoint.ua().hangup(self.peer, &mut dialog).await {
327 Ok(())
328 } else {
329 Err("BYE was not acknowledged".into())
330 }
331 }
332}
333
334/// Stateless helper bound to an account + endpoint.
335pub struct Caller {
336 account: SipAccount,
337 endpoint: Arc<SipEndpoint>,
338}
339
340impl Caller {
341 /// Construct a `Caller` for the given account and shared endpoint.
342 pub fn new(account: SipAccount, endpoint: Arc<SipEndpoint>) -> Self {
343 Self { account, endpoint }
344 }
345
346 /// Place an outbound call to `target` and wait for it to be answered.
347 ///
348 /// Binds a local RTP socket, offers G.711 SDP, sends the INVITE to the
349 /// account's resolved server, follows provisional responses, and answers a
350 /// single `401`/`407` challenge. Returns the [`Call`] on a 2xx, or an error
351 /// if the call was rejected, timed out, or had no usable SDP answer.
352 pub async fn dial(&self, target: Uri) -> Result<Call, BoxError> {
353 self.dial_inner(target, &CancellationToken::new(), None)
354 .await
355 }
356
357 /// Like [`Caller::dial`], but `cancel` aborts a still-ringing call with a
358 /// `CANCEL` (RFC 3261 §9). Firing the token once a provisional has arrived
359 /// tears the pending INVITE down; the returned error then reflects the
360 /// `487 Request Terminated`. Use `cancel.is_cancelled()` to tell a
361 /// cancellation apart from a callee rejection.
362 pub async fn dial_cancellable(
363 &self,
364 target: Uri,
365 cancel: &CancellationToken,
366 ) -> Result<Call, BoxError> {
367 self.dial_inner(target, cancel, None).await
368 }
369
370 /// Like [`Caller::dial_cancellable`], and additionally forwards each
371 /// provisional response status (e.g. [`rsip::StatusCode::Ringing`]) to
372 /// `progress` as it arrives — for a "ringing" UI. The channel closes when
373 /// the call reaches a final response.
374 pub async fn dial_with_progress(
375 &self,
376 target: Uri,
377 cancel: &CancellationToken,
378 progress: mpsc::Sender<rsip::StatusCode>,
379 ) -> Result<Call, BoxError> {
380 self.dial_inner(target, cancel, Some(progress)).await
381 }
382
383 async fn dial_inner(
384 &self,
385 target: Uri,
386 cancel: &CancellationToken,
387 progress: Option<mpsc::Sender<rsip::StatusCode>>,
388 ) -> Result<Call, BoxError> {
389 let rtp_socket = UdpSocket::bind("0.0.0.0:0").await?;
390 let local_rtp_addr = rtp_socket.local_addr()?;
391 let local_ip = self.endpoint.local_ip();
392 info!(%local_ip, rtp_port = local_rtp_addr.port(), "bound RTP socket for outbound dial");
393
394 let offer = build_sdp(local_ip, local_rtp_addr.port());
395 debug!("SDP offer:\n{}", String::from_utf8_lossy(&offer));
396
397 let from: Uri =
398 format!("sip:{}@{}", self.account.username, self.account.domain).try_into()?;
399 let contact: Uri = format!(
400 "sip:{}@{}",
401 self.account.username,
402 self.endpoint.local_addr()
403 )
404 .try_into()?;
405
406 // Advertise RFC 4028 session-timer support so the answerer can pin a
407 // refresh interval in its 2xx (negotiated below).
408 let cfg = CallConfig {
409 target,
410 from,
411 contact,
412 from_tag: gen_tag(),
413 call_id: format!("{}@wavekat.com", gen_tag()),
414 sdp: offer,
415 extra_headers: vec![
416 supported_timer_header(),
417 SessionExpires {
418 interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
419 refresher: None,
420 }
421 .header(),
422 ],
423 username: self.account.auth_username().to_string(),
424 password: self.account.password.clone(),
425 };
426
427 match self
428 .endpoint
429 .ua()
430 .call_cancellable(&cfg, self.endpoint.server(), 1, cancel, progress.as_ref())
431 .await
432 {
433 CallOutcome::Answered { dialog, response } => {
434 let remote_media = parse_sdp(&response.body)?;
435 let session_timer = negotiate_uac(&response.headers);
436 info!(
437 remote_addr = %remote_media.addr,
438 remote_port = remote_media.port,
439 payload_type = remote_media.payload_type,
440 ?session_timer,
441 "call answered; parsed SDP answer",
442 );
443 Ok(Call::new(
444 self.endpoint.clone(),
445 *dialog,
446 self.endpoint.server(),
447 session_timer,
448 remote_media,
449 Arc::new(rtp_socket),
450 local_rtp_addr,
451 ))
452 }
453 CallOutcome::Rejected(status) => Err(format!("call rejected: {status}").into()),
454 CallOutcome::Unauthorized => Err("call rejected: authentication failed".into()),
455 CallOutcome::TimedOut => Err("call timed out with no final response".into()),
456 CallOutcome::EngineStopped => Err("engine stopped".into()),
457 }
458 }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use crate::account::Transport;

    /// Account fixture with no explicit auth username.
    fn test_account() -> SipAccount {
        SipAccount {
            display_name: String::from("Office"),
            username: String::from("1001"),
            password: String::from("secret"),
            domain: String::from("sip.example.com"),
            auth_username: None,
            server: Some(String::from("pbx.example.com")),
            port: Some(5080),
            transport: Transport::Udp,
        }
    }

    #[test]
    fn caller_holds_account_and_endpoint_inputs() {
        // The call path itself is covered by the stack's loopback tests
        // (`stack::ua`). This test only checks the account input that `dial`
        // feeds into the credentials: with `auth_username` unset, the account
        // reports its `username`.
        let account = test_account();
        assert_eq!(account.auth_username(), "1001");
    }
}