wavekat_sip/caller.rs
1//! Outbound calls and the established-call handle.
2//!
3//! [`Caller::dial`] binds a local RTP socket, builds the SDP offer, places the
4//! INVITE through the engine (answering a digest challenge if the server
5//! demands one), and on a 2xx returns a [`Call`] — the negotiated remote media
6//! plus the bound RTP socket. Audio device I/O, codecs and recording stay with
7//! the consumer; the `rtp_socket` + `remote_media` + `local_rtp_addr` triple is
8//! the raw plumbing.
9
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13use rsip::{Header, Uri};
14use tokio::net::UdpSocket;
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, info};
18
19use crate::account::SipAccount;
20use crate::dtmf_info::{build_info_body, classify, content_type_header, InfoOutcome};
21use crate::endpoint::SipEndpoint;
22use crate::inbound::InboundRequest;
23use crate::rtp::dtmf::DtmfDigit;
24use crate::sdp::{build_sdp, build_sdp_with, parse_sdp, MediaDirection, RemoteMedia};
25use crate::session_timer::{
26 negotiate_uac, supported_timer_header, SessionDialogOps, SessionExpires, SessionTimer,
27 DEFAULT_SESSION_EXPIRES_SECS,
28};
29use crate::stack::call::{CallConfig, CallOutcome};
30use crate::stack::dialog::{Dialog, DialogId};
31use crate::stack::transaction::gen_tag;
32
/// Boxed, thread-safe error type returned by the fallible call-control
/// operations in this module.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
34
35/// An established call: negotiated remote media plus the local RTP socket.
36///
37/// The same handle is produced by [`Caller::dial`] (outbound) and
38/// [`crate::IncomingCall::accept`] (inbound), so call control is uniform.
39pub struct Call {
40 endpoint: Arc<SipEndpoint>,
41 /// Shared so a background session-timer loop ([`Call::session_handle`]) can
42 /// send refresh re-INVITEs / BYE while the call owner drives audio. The
43 /// mutex serializes the dialog's CSeq across both.
44 dialog: Arc<Mutex<Dialog>>,
45 /// This dialog's identity, used to register for inbound in-dialog requests
46 /// and termination.
47 dialog_id: DialogId,
48 /// The peer's remote target (its `Contact`), captured at establishment. The
49 /// URI a third party should `INVITE` to reach this exact peer — used as the
50 /// `Refer-To` target of an attended transfer (RFC 5589 §7), paired with the
51 /// dialog's `Replaces`.
52 peer_uri: Uri,
53 /// Fired when the peer ends the call (an in-dialog `BYE`); surfaced via
54 /// [`Call::terminated`]. Registered with the endpoint at construction.
55 terminated: CancellationToken,
56 peer: SocketAddr,
57 /// `true` once we have put the peer on hold via a `sendonly` re-INVITE.
58 held: bool,
59 /// SDP `o=` version; bumped on every re-offer (RFC 3264 §5).
60 sdp_version: u32,
61 /// The RFC 4028 session timer negotiated at call setup, if any.
62 session_timer: Option<SessionTimer>,
63 /// Where the remote endpoint expects RTP (from the negotiated SDP).
64 pub remote_media: RemoteMedia,
65 /// Local RTP socket; share via `Arc` to send and receive concurrently.
66 pub rtp_socket: Arc<UdpSocket>,
67 /// Local RTP address advertised in our SDP.
68 pub local_rtp_addr: SocketAddr,
69}
70
71impl Call {
72 pub(crate) fn new(
73 endpoint: Arc<SipEndpoint>,
74 dialog: Dialog,
75 peer: SocketAddr,
76 session_timer: Option<SessionTimer>,
77 remote_media: RemoteMedia,
78 rtp_socket: Arc<UdpSocket>,
79 local_rtp_addr: SocketAddr,
80 ) -> Self {
81 let dialog_id = dialog.id();
82 let peer_uri = dialog.remote_target().clone();
83 // Register for the peer-BYE termination signal up front, so a remote
84 // hangup is observable via `Call::terminated` whether or not the call
85 // ever opts into `inbound_requests`.
86 let terminated = endpoint.register_termination(dialog_id.clone());
87 Self {
88 endpoint,
89 dialog: Arc::new(Mutex::new(dialog)),
90 dialog_id,
91 peer_uri,
92 terminated,
93 peer,
94 held: false,
95 // The initial offer/answer was o= version 0.
96 sdp_version: 0,
97 session_timer,
98 remote_media,
99 rtp_socket,
100 local_rtp_addr,
101 }
102 }
103
104 /// Put the peer on hold (`on = true`, `a=sendonly`) or resume the call
105 /// (`on = false`, `a=sendrecv`) by sending an in-dialog re-INVITE with a
106 /// fresh SDP re-offer (RFC 3264 §8.4).
107 ///
108 /// The local hold state only flips once the peer accepts the re-INVITE with
109 /// a 2xx; a non-2xx final surfaces the server's reason and leaves the call
110 /// unchanged. The `o=` version is bumped for each re-offer regardless, as
111 /// RFC 3264 requires.
112 pub async fn set_hold(&mut self, on: bool) -> Result<(), BoxError> {
113 let direction = if on {
114 MediaDirection::SendOnly
115 } else {
116 MediaDirection::SendRecv
117 };
118 self.sdp_version += 1;
119 let offer = build_sdp_with(
120 self.endpoint.local_ip(),
121 self.local_rtp_addr.port(),
122 direction,
123 self.sdp_version,
124 );
125 let headers = vec![Header::ContentType("application/sdp".into())];
126 let response = {
127 let mut dialog = self.dialog.lock().await;
128 self.endpoint
129 .ua()
130 .reinvite(self.peer, &mut dialog, headers, offer)
131 .await
132 };
133 match response {
134 Some(r) if (200..300).contains(&r.status_code.code()) => {
135 self.held = on;
136 info!(on, "hold state updated via re-INVITE");
137 Ok(())
138 }
139 Some(r) => Err(format!("re-INVITE rejected: {}", r.status_code).into()),
140 None => Err("re-INVITE timed out with no final response".into()),
141 }
142 }
143
144 /// `true` if the call is currently on hold (we sent a `sendonly` re-INVITE
145 /// the peer accepted).
146 pub fn is_held(&self) -> bool {
147 self.held
148 }
149
150 /// The RFC 4028 session timer negotiated when the call was set up, or
151 /// `None` if neither side asked for one. Drive it with
152 /// [`crate::session_timer_loop`] against [`Call::session_handle`].
153 pub fn session_timer(&self) -> Option<SessionTimer> {
154 self.session_timer
155 }
156
157 /// A cloneable handle that sends refresh re-INVITEs / BYE on this call's
158 /// dialog, for running [`crate::session_timer_loop`] in a background task
159 /// alongside the audio path. Shares the dialog with the `Call`, so their
160 /// in-dialog requests serialize correctly.
161 pub fn session_handle(&self) -> CallSession {
162 CallSession {
163 endpoint: self.endpoint.clone(),
164 dialog: self.dialog.clone(),
165 peer: self.peer,
166 }
167 }
168
169 /// Opt in to handle this call's inbound in-dialog requests — the peer's
170 /// re-`INVITE`s (e.g. an RFC 4028 session refresh, or a peer-initiated
171 /// hold) and `INFO`s (e.g. SIP-INFO DTMF) — instead of having the endpoint
172 /// auto-answer them `200 OK`.
173 ///
174 /// Returns a stream; each [`InboundRequest`] must be answered (with
175 /// [`InboundRequest::respond`] / [`InboundRequest::ok`]). While the returned
176 /// [`InboundRequests`] is alive, those requests route here; drop it to
177 /// revert to auto-answering. `BYE` / `OPTIONS` are always auto-answered.
178 /// Call this once per [`Call`].
179 pub fn inbound_requests(&self) -> InboundRequests {
180 let rx = self.endpoint.register_dialog(self.dialog_id.clone());
181 InboundRequests {
182 endpoint: self.endpoint.clone(),
183 dialog_id: self.dialog_id.clone(),
184 rx,
185 }
186 }
187
188 /// A token that fires when the peer ends the call by sending an in-dialog
189 /// `BYE`. The endpoint auto-answers the BYE `200 OK`; this is purely the
190 /// notification. Clone it and `await` [`CancellationToken::cancelled`] in a
191 /// task to drive call teardown (stop audio, finalize a recording). It does
192 /// **not** fire for a local [`Call::hangup`] — the caller already knows.
193 pub fn terminated(&self) -> CancellationToken {
194 self.terminated.clone()
195 }
196
197 /// The peer's remote-target URI (its `Contact`) — the address a third party
198 /// should `INVITE` to reach this exact peer. Used as the `Refer-To` target
199 /// of an attended transfer (read off the *consultation* call), paired with
200 /// [`Call::dialog_triplet`] for the `Replaces`.
201 pub fn peer_uri(&self) -> &Uri {
202 &self.peer_uri
203 }
204
205 /// This call's dialog identity (Call-ID + our/remote tags), for naming it in
206 /// an attended transfer's `Replaces` (RFC 3891). Read it off the
207 /// *consultation* call (the leg we built to the transfer target) and pass it
208 /// to [`Call::attended_transfer`] on the call being transferred.
209 pub fn dialog_triplet(&self) -> crate::refer::DialogTriplet {
210 crate::refer::DialogTriplet {
211 call_id: self.dialog_id.call_id.clone(),
212 local_tag: self.dialog_id.local_tag.clone(),
213 remote_tag: self.dialog_id.remote_tag.clone(),
214 }
215 }
216
217 /// Send one DTMF press via SIP `INFO` (`application/dtmf-relay`).
218 ///
219 /// Use this only when the remote did not negotiate RFC 4733 — i.e.
220 /// [`RemoteMedia::dtmf_payload_type`] is `None`. When telephone-event is
221 /// available, prefer [`crate::send_dtmf_burst`] over RTP. A
222 /// [`InfoOutcome::UnsupportedMedia`] result means the remote rejects this
223 /// transport too; stop sending further presses on this dialog.
224 pub async fn send_dtmf_info(&mut self, digit: DtmfDigit, duration_ms: u32) -> InfoOutcome {
225 let body = build_info_body(digit, duration_ms).into_bytes();
226 let response = {
227 let mut dialog = self.dialog.lock().await;
228 self.endpoint
229 .ua()
230 .info(self.peer, &mut dialog, vec![content_type_header()], body)
231 .await
232 };
233 classify(response)
234 }
235
236 /// Blind-transfer the call: ask the peer to place a fresh call to `target`
237 /// by sending an in-dialog `REFER` with a `Refer-To` (RFC 3515).
238 ///
239 /// Returns `Ok(())` once the peer accepts the `REFER` with a 2xx
240 /// (`202 Accepted`) — at which point the transfer is *in progress*, not yet
241 /// complete. The peer then reports the outcome as a series of in-dialog
242 /// `NOTIFY`s (a `message/sipfrag` status line) that arrive on
243 /// [`Call::inbound_requests`]; the consumer watches those (parsing each with
244 /// [`crate::parse_sipfrag_status`]) and tears its own leg down once the
245 /// target answers. A non-2xx final to the `REFER` surfaces the peer's reason
246 /// and leaves the call unchanged — the peer won't honour the transfer, so
247 /// the consumer should keep the call up.
248 ///
249 /// This is *blind* (unattended) transfer: we do not first call `target`
250 /// ourselves. Attended transfer (consult `target`, then `REFER` with
251 /// `Replaces`) is a separate method, not yet implemented.
252 pub async fn blind_transfer(&mut self, target: Uri) -> Result<(), BoxError> {
253 let headers = vec![crate::refer::refer_to_header(&target)];
254 let response = {
255 let mut dialog = self.dialog.lock().await;
256 self.endpoint
257 .ua()
258 .refer(self.peer, &mut dialog, headers)
259 .await
260 };
261 match response {
262 Some(r) if (200..300).contains(&r.status_code.code()) => {
263 info!(%target, "blind transfer accepted (REFER 2xx); awaiting NOTIFY");
264 Ok(())
265 }
266 Some(r) => Err(format!("REFER rejected: {}", r.status_code).into()),
267 None => Err("REFER timed out with no final response".into()),
268 }
269 }
270
271 /// Attended-transfer the call: ask the peer (the party we hold) to take over
272 /// the consultation dialog named by `replaces` by sending an in-dialog
273 /// `REFER` whose `Refer-To` carries `target` plus a `Replaces` header
274 /// (RFC 3515 + RFC 3891).
275 ///
276 /// `replaces` is the dialog identity of the *consultation* call — the leg we
277 /// already established to `target` ourselves — read via
278 /// [`Call::dialog_triplet`]. When the peer accepts (`202`), it `INVITE`s
279 /// `target` with that `Replaces`, so `target` replaces the consultation leg
280 /// rather than ringing afresh. The outcome arrives exactly as for a blind
281 /// transfer — `NOTIFY`/sipfrag on [`Call::inbound_requests`] — so the
282 /// consumer drives both the same way. A non-2xx final to the `REFER`
283 /// surfaces the peer's reason and leaves the call unchanged.
284 pub async fn attended_transfer(
285 &mut self,
286 target: Uri,
287 replaces: &crate::refer::DialogTriplet,
288 ) -> Result<(), BoxError> {
289 let headers = vec![crate::refer::refer_to_with_replaces(&target, replaces)];
290 let response = {
291 let mut dialog = self.dialog.lock().await;
292 self.endpoint
293 .ua()
294 .refer(self.peer, &mut dialog, headers)
295 .await
296 };
297 match response {
298 Some(r) if (200..300).contains(&r.status_code.code()) => {
299 info!(%target, "attended transfer accepted (REFER 2xx); awaiting NOTIFY");
300 Ok(())
301 }
302 Some(r) => Err(format!("REFER rejected: {}", r.status_code).into()),
303 None => Err("REFER timed out with no final response".into()),
304 }
305 }
306
307 /// Hang up by sending an in-dialog `BYE`. Returns once the peer 2xxs it
308 /// (or the transaction gives up).
309 pub async fn hangup(&mut self) -> Result<(), BoxError> {
310 let acked = {
311 let mut dialog = self.dialog.lock().await;
312 self.endpoint.ua().hangup(self.peer, &mut dialog).await
313 };
314 if acked {
315 info!("call hung up (BYE acknowledged)");
316 Ok(())
317 } else {
318 Err("BYE was not acknowledged".into())
319 }
320 }
321}
322
323impl Drop for Call {
324 fn drop(&mut self) {
325 // Release the termination registration so the endpoint's table doesn't
326 // grow for the life of the process. (`InboundRequests` similarly
327 // unregisters the dialog on its own drop.)
328 self.endpoint.unregister_termination(&self.dialog_id);
329 }
330}
331
332/// A stream of a [`Call`]'s inbound in-dialog requests (peer re-`INVITE` /
333/// `INFO`), produced by [`Call::inbound_requests`].
334///
335/// Dropping it unregisters the dialog, so its inbound requests revert to being
336/// auto-answered `200 OK` by the endpoint.
337pub struct InboundRequests {
338 endpoint: Arc<SipEndpoint>,
339 dialog_id: DialogId,
340 rx: mpsc::Receiver<InboundRequest>,
341}
342
343impl InboundRequests {
344 /// Await the next inbound request, or `None` once the call's endpoint shuts
345 /// down or this stream is being torn down.
346 pub async fn recv(&mut self) -> Option<InboundRequest> {
347 self.rx.recv().await
348 }
349}
350
351impl Drop for InboundRequests {
352 fn drop(&mut self) {
353 self.endpoint.unregister_dialog(&self.dialog_id);
354 }
355}
356
357/// A cloneable session-control handle over a [`Call`]'s dialog.
358///
359/// Produced by [`Call::session_handle`] and consumed by
360/// [`crate::session_timer_loop`]: it implements [`SessionDialogOps`] so the
361/// loop can send refresh re-INVITEs and the tear-down BYE on the shared dialog.
362#[derive(Clone)]
363pub struct CallSession {
364 endpoint: Arc<SipEndpoint>,
365 dialog: Arc<Mutex<Dialog>>,
366 peer: SocketAddr,
367}
368
369impl SessionDialogOps for CallSession {
370 async fn refresh(
371 &self,
372 mut headers: Vec<Header>,
373 body: Option<Vec<u8>>,
374 ) -> Result<Option<rsip::Response>, BoxError> {
375 let body = body.unwrap_or_default();
376 if !body.is_empty() {
377 headers.push(Header::ContentType("application/sdp".into()));
378 }
379 let mut dialog = self.dialog.lock().await;
380 Ok(self
381 .endpoint
382 .ua()
383 .reinvite(self.peer, &mut dialog, headers, body)
384 .await)
385 }
386
387 async fn send_bye(&self) -> Result<(), BoxError> {
388 let mut dialog = self.dialog.lock().await;
389 if self.endpoint.ua().hangup(self.peer, &mut dialog).await {
390 Ok(())
391 } else {
392 Err("BYE was not acknowledged".into())
393 }
394 }
395}
396
397/// Stateless helper bound to an account + endpoint.
398pub struct Caller {
399 account: SipAccount,
400 endpoint: Arc<SipEndpoint>,
401}
402
403impl Caller {
404 /// Construct a `Caller` for the given account and shared endpoint.
405 pub fn new(account: SipAccount, endpoint: Arc<SipEndpoint>) -> Self {
406 Self { account, endpoint }
407 }
408
409 /// Place an outbound call to `target` and wait for it to be answered.
410 ///
411 /// Binds a local RTP socket, offers G.711 SDP, sends the INVITE to the
412 /// account's resolved server, follows provisional responses, and answers a
413 /// single `401`/`407` challenge. Returns the [`Call`] on a 2xx, or an error
414 /// if the call was rejected, timed out, or had no usable SDP answer.
415 pub async fn dial(&self, target: Uri) -> Result<Call, BoxError> {
416 self.dial_inner(target, &CancellationToken::new(), None)
417 .await
418 }
419
420 /// Like [`Caller::dial`], but `cancel` aborts a still-ringing call with a
421 /// `CANCEL` (RFC 3261 §9). Firing the token once a provisional has arrived
422 /// tears the pending INVITE down; the returned error then reflects the
423 /// `487 Request Terminated`. Use `cancel.is_cancelled()` to tell a
424 /// cancellation apart from a callee rejection.
425 pub async fn dial_cancellable(
426 &self,
427 target: Uri,
428 cancel: &CancellationToken,
429 ) -> Result<Call, BoxError> {
430 self.dial_inner(target, cancel, None).await
431 }
432
433 /// Like [`Caller::dial_cancellable`], and additionally forwards each
434 /// provisional response status (e.g. [`rsip::StatusCode::Ringing`]) to
435 /// `progress` as it arrives — for a "ringing" UI. The channel closes when
436 /// the call reaches a final response.
437 pub async fn dial_with_progress(
438 &self,
439 target: Uri,
440 cancel: &CancellationToken,
441 progress: mpsc::Sender<rsip::StatusCode>,
442 ) -> Result<Call, BoxError> {
443 self.dial_inner(target, cancel, Some(progress)).await
444 }
445
446 async fn dial_inner(
447 &self,
448 target: Uri,
449 cancel: &CancellationToken,
450 progress: Option<mpsc::Sender<rsip::StatusCode>>,
451 ) -> Result<Call, BoxError> {
452 let rtp_socket = UdpSocket::bind("0.0.0.0:0").await?;
453 let local_rtp_addr = rtp_socket.local_addr()?;
454 let local_ip = self.endpoint.local_ip();
455 info!(%local_ip, rtp_port = local_rtp_addr.port(), "bound RTP socket for outbound dial");
456
457 let offer = build_sdp(local_ip, local_rtp_addr.port());
458 debug!("SDP offer:\n{}", String::from_utf8_lossy(&offer));
459
460 let from: Uri =
461 format!("sip:{}@{}", self.account.username, self.account.domain).try_into()?;
462 let contact: Uri = format!(
463 "sip:{}@{}",
464 self.account.username,
465 self.endpoint.local_addr()
466 )
467 .try_into()?;
468
469 // Advertise RFC 4028 session-timer support so the answerer can pin a
470 // refresh interval in its 2xx (negotiated below).
471 let cfg = CallConfig {
472 target,
473 from,
474 contact,
475 from_tag: gen_tag(),
476 call_id: format!("{}@wavekat.com", gen_tag()),
477 sdp: offer,
478 extra_headers: vec![
479 supported_timer_header(),
480 SessionExpires {
481 interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
482 refresher: None,
483 }
484 .header(),
485 ],
486 username: self.account.auth_username().to_string(),
487 password: self.account.password.clone(),
488 };
489
490 match self
491 .endpoint
492 .ua()
493 .call_cancellable(&cfg, self.endpoint.server(), 1, cancel, progress.as_ref())
494 .await
495 {
496 CallOutcome::Answered { dialog, response } => {
497 let remote_media = parse_sdp(&response.body)?;
498 let session_timer = negotiate_uac(&response.headers);
499 info!(
500 remote_addr = %remote_media.addr,
501 remote_port = remote_media.port,
502 payload_type = remote_media.payload_type,
503 ?session_timer,
504 "call answered; parsed SDP answer",
505 );
506 Ok(Call::new(
507 self.endpoint.clone(),
508 *dialog,
509 self.endpoint.server(),
510 session_timer,
511 remote_media,
512 Arc::new(rtp_socket),
513 local_rtp_addr,
514 ))
515 }
516 CallOutcome::Rejected(status) => Err(format!("call rejected: {status}").into()),
517 CallOutcome::Unauthorized => Err("call rejected: authentication failed".into()),
518 CallOutcome::TimedOut => Err("call timed out with no final response".into()),
519 CallOutcome::EngineStopped => Err("engine stopped".into()),
520 }
521 }
522}
523
#[cfg(test)]
mod tests {
    use super::*;
    use crate::account::Transport;

    /// Account fixture with no explicit `auth_username`.
    fn test_account() -> SipAccount {
        SipAccount {
            display_name: "Office".to_string(),
            username: "1001".to_string(),
            password: "secret".to_string(),
            domain: "sip.example.com".to_string(),
            auth_username: None,
            server: Some("pbx.example.com".to_string()),
            port: Some(5080),
            transport: Transport::Udp,
        }
    }

    #[test]
    fn caller_holds_account_and_endpoint_inputs() {
        // The call path itself is exercised by the stack's loopback tests
        // (`stack::ua`). What is asserted here is the account input that
        // `dial_inner` reads for authentication: with `auth_username` unset,
        // `auth_username()` is expected to yield the plain username.
        assert_eq!(test_account().auth_username(), "1001");
    }
}