Skip to main content

wavekat_sip/
caller.rs

1//! Outbound calls and the established-call handle.
2//!
3//! [`Caller::dial`] binds a local RTP socket, builds the SDP offer, places the
4//! INVITE through the engine (answering a digest challenge if the server
5//! demands one), and on a 2xx returns a [`Call`] — the negotiated remote media
6//! plus the bound RTP socket. Audio device I/O, codecs and recording stay with
7//! the consumer; the `rtp_socket` + `remote_media` + `local_rtp_addr` triple is
8//! the raw plumbing.
9
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13use rsip::{Header, Uri};
14use tokio::net::UdpSocket;
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, info};
18
19use crate::account::SipAccount;
20use crate::dtmf_info::{build_info_body, classify, content_type_header, InfoOutcome};
21use crate::endpoint::SipEndpoint;
22use crate::inbound::InboundRequest;
23use crate::rtp::dtmf::DtmfDigit;
24use crate::sdp::{build_sdp, build_sdp_with, parse_sdp, MediaDirection, RemoteMedia};
25use crate::session_timer::{
26    negotiate_uac, supported_timer_header, SessionDialogOps, SessionExpires, SessionTimer,
27    DEFAULT_SESSION_EXPIRES_SECS,
28};
29use crate::stack::call::{CallConfig, CallOutcome};
30use crate::stack::dialog::{Dialog, DialogId};
31use crate::stack::transaction::gen_tag;
32
/// Boxed, thread-safe error used as the error type of this module's fallible
/// call-control APIs.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
34
35/// An established call: negotiated remote media plus the local RTP socket.
36///
37/// The same handle is produced by [`Caller::dial`] (outbound) and
38/// [`crate::IncomingCall::accept`] (inbound), so call control is uniform.
39pub struct Call {
40    endpoint: Arc<SipEndpoint>,
41    /// Shared so a background session-timer loop ([`Call::session_handle`]) can
42    /// send refresh re-INVITEs / BYE while the call owner drives audio. The
43    /// mutex serializes the dialog's CSeq across both.
44    dialog: Arc<Mutex<Dialog>>,
45    /// This dialog's identity, used to register for inbound in-dialog requests
46    /// and termination.
47    dialog_id: DialogId,
48    /// Fired when the peer ends the call (an in-dialog `BYE`); surfaced via
49    /// [`Call::terminated`]. Registered with the endpoint at construction.
50    terminated: CancellationToken,
51    peer: SocketAddr,
52    /// `true` once we have put the peer on hold via a `sendonly` re-INVITE.
53    held: bool,
54    /// SDP `o=` version; bumped on every re-offer (RFC 3264 §5).
55    sdp_version: u32,
56    /// The RFC 4028 session timer negotiated at call setup, if any.
57    session_timer: Option<SessionTimer>,
58    /// Where the remote endpoint expects RTP (from the negotiated SDP).
59    pub remote_media: RemoteMedia,
60    /// Local RTP socket; share via `Arc` to send and receive concurrently.
61    pub rtp_socket: Arc<UdpSocket>,
62    /// Local RTP address advertised in our SDP.
63    pub local_rtp_addr: SocketAddr,
64}
65
66impl Call {
67    pub(crate) fn new(
68        endpoint: Arc<SipEndpoint>,
69        dialog: Dialog,
70        peer: SocketAddr,
71        session_timer: Option<SessionTimer>,
72        remote_media: RemoteMedia,
73        rtp_socket: Arc<UdpSocket>,
74        local_rtp_addr: SocketAddr,
75    ) -> Self {
76        let dialog_id = dialog.id();
77        // Register for the peer-BYE termination signal up front, so a remote
78        // hangup is observable via `Call::terminated` whether or not the call
79        // ever opts into `inbound_requests`.
80        let terminated = endpoint.register_termination(dialog_id.clone());
81        Self {
82            endpoint,
83            dialog: Arc::new(Mutex::new(dialog)),
84            dialog_id,
85            terminated,
86            peer,
87            held: false,
88            // The initial offer/answer was o= version 0.
89            sdp_version: 0,
90            session_timer,
91            remote_media,
92            rtp_socket,
93            local_rtp_addr,
94        }
95    }
96
97    /// Put the peer on hold (`on = true`, `a=sendonly`) or resume the call
98    /// (`on = false`, `a=sendrecv`) by sending an in-dialog re-INVITE with a
99    /// fresh SDP re-offer (RFC 3264 §8.4).
100    ///
101    /// The local hold state only flips once the peer accepts the re-INVITE with
102    /// a 2xx; a non-2xx final surfaces the server's reason and leaves the call
103    /// unchanged. The `o=` version is bumped for each re-offer regardless, as
104    /// RFC 3264 requires.
105    pub async fn set_hold(&mut self, on: bool) -> Result<(), BoxError> {
106        let direction = if on {
107            MediaDirection::SendOnly
108        } else {
109            MediaDirection::SendRecv
110        };
111        self.sdp_version += 1;
112        let offer = build_sdp_with(
113            self.endpoint.local_ip(),
114            self.local_rtp_addr.port(),
115            direction,
116            self.sdp_version,
117        );
118        let headers = vec![Header::ContentType("application/sdp".into())];
119        let response = {
120            let mut dialog = self.dialog.lock().await;
121            self.endpoint
122                .ua()
123                .reinvite(self.peer, &mut dialog, headers, offer)
124                .await
125        };
126        match response {
127            Some(r) if (200..300).contains(&r.status_code.code()) => {
128                self.held = on;
129                info!(on, "hold state updated via re-INVITE");
130                Ok(())
131            }
132            Some(r) => Err(format!("re-INVITE rejected: {}", r.status_code).into()),
133            None => Err("re-INVITE timed out with no final response".into()),
134        }
135    }
136
137    /// `true` if the call is currently on hold (we sent a `sendonly` re-INVITE
138    /// the peer accepted).
139    pub fn is_held(&self) -> bool {
140        self.held
141    }
142
143    /// The RFC 4028 session timer negotiated when the call was set up, or
144    /// `None` if neither side asked for one. Drive it with
145    /// [`crate::session_timer_loop`] against [`Call::session_handle`].
146    pub fn session_timer(&self) -> Option<SessionTimer> {
147        self.session_timer
148    }
149
150    /// A cloneable handle that sends refresh re-INVITEs / BYE on this call's
151    /// dialog, for running [`crate::session_timer_loop`] in a background task
152    /// alongside the audio path. Shares the dialog with the `Call`, so their
153    /// in-dialog requests serialize correctly.
154    pub fn session_handle(&self) -> CallSession {
155        CallSession {
156            endpoint: self.endpoint.clone(),
157            dialog: self.dialog.clone(),
158            peer: self.peer,
159        }
160    }
161
162    /// Opt in to handle this call's inbound in-dialog requests — the peer's
163    /// re-`INVITE`s (e.g. an RFC 4028 session refresh, or a peer-initiated
164    /// hold) and `INFO`s (e.g. SIP-INFO DTMF) — instead of having the endpoint
165    /// auto-answer them `200 OK`.
166    ///
167    /// Returns a stream; each [`InboundRequest`] must be answered (with
168    /// [`InboundRequest::respond`] / [`InboundRequest::ok`]). While the returned
169    /// [`InboundRequests`] is alive, those requests route here; drop it to
170    /// revert to auto-answering. `BYE` / `OPTIONS` are always auto-answered.
171    /// Call this once per [`Call`].
172    pub fn inbound_requests(&self) -> InboundRequests {
173        let rx = self.endpoint.register_dialog(self.dialog_id.clone());
174        InboundRequests {
175            endpoint: self.endpoint.clone(),
176            dialog_id: self.dialog_id.clone(),
177            rx,
178        }
179    }
180
181    /// A token that fires when the peer ends the call by sending an in-dialog
182    /// `BYE`. The endpoint auto-answers the BYE `200 OK`; this is purely the
183    /// notification. Clone it and `await` [`CancellationToken::cancelled`] in a
184    /// task to drive call teardown (stop audio, finalize a recording). It does
185    /// **not** fire for a local [`Call::hangup`] — the caller already knows.
186    pub fn terminated(&self) -> CancellationToken {
187        self.terminated.clone()
188    }
189
190    /// Send one DTMF press via SIP `INFO` (`application/dtmf-relay`).
191    ///
192    /// Use this only when the remote did not negotiate RFC 4733 — i.e.
193    /// [`RemoteMedia::dtmf_payload_type`] is `None`. When telephone-event is
194    /// available, prefer [`crate::send_dtmf_burst`] over RTP. A
195    /// [`InfoOutcome::UnsupportedMedia`] result means the remote rejects this
196    /// transport too; stop sending further presses on this dialog.
197    pub async fn send_dtmf_info(&mut self, digit: DtmfDigit, duration_ms: u32) -> InfoOutcome {
198        let body = build_info_body(digit, duration_ms).into_bytes();
199        let response = {
200            let mut dialog = self.dialog.lock().await;
201            self.endpoint
202                .ua()
203                .info(self.peer, &mut dialog, vec![content_type_header()], body)
204                .await
205        };
206        classify(response)
207    }
208
209    /// Blind-transfer the call: ask the peer to place a fresh call to `target`
210    /// by sending an in-dialog `REFER` with a `Refer-To` (RFC 3515).
211    ///
212    /// Returns `Ok(())` once the peer accepts the `REFER` with a 2xx
213    /// (`202 Accepted`) — at which point the transfer is *in progress*, not yet
214    /// complete. The peer then reports the outcome as a series of in-dialog
215    /// `NOTIFY`s (a `message/sipfrag` status line) that arrive on
216    /// [`Call::inbound_requests`]; the consumer watches those (parsing each with
217    /// [`crate::parse_sipfrag_status`]) and tears its own leg down once the
218    /// target answers. A non-2xx final to the `REFER` surfaces the peer's reason
219    /// and leaves the call unchanged — the peer won't honour the transfer, so
220    /// the consumer should keep the call up.
221    ///
222    /// This is *blind* (unattended) transfer: we do not first call `target`
223    /// ourselves. Attended transfer (consult `target`, then `REFER` with
224    /// `Replaces`) is a separate method, not yet implemented.
225    pub async fn blind_transfer(&mut self, target: Uri) -> Result<(), BoxError> {
226        let headers = vec![crate::refer::refer_to_header(&target)];
227        let response = {
228            let mut dialog = self.dialog.lock().await;
229            self.endpoint
230                .ua()
231                .refer(self.peer, &mut dialog, headers)
232                .await
233        };
234        match response {
235            Some(r) if (200..300).contains(&r.status_code.code()) => {
236                info!(%target, "blind transfer accepted (REFER 2xx); awaiting NOTIFY");
237                Ok(())
238            }
239            Some(r) => Err(format!("REFER rejected: {}", r.status_code).into()),
240            None => Err("REFER timed out with no final response".into()),
241        }
242    }
243
244    /// Hang up by sending an in-dialog `BYE`. Returns once the peer 2xxs it
245    /// (or the transaction gives up).
246    pub async fn hangup(&mut self) -> Result<(), BoxError> {
247        let acked = {
248            let mut dialog = self.dialog.lock().await;
249            self.endpoint.ua().hangup(self.peer, &mut dialog).await
250        };
251        if acked {
252            info!("call hung up (BYE acknowledged)");
253            Ok(())
254        } else {
255            Err("BYE was not acknowledged".into())
256        }
257    }
258}
259
260impl Drop for Call {
261    fn drop(&mut self) {
262        // Release the termination registration so the endpoint's table doesn't
263        // grow for the life of the process. (`InboundRequests` similarly
264        // unregisters the dialog on its own drop.)
265        self.endpoint.unregister_termination(&self.dialog_id);
266    }
267}
268
269/// A stream of a [`Call`]'s inbound in-dialog requests (peer re-`INVITE` /
270/// `INFO`), produced by [`Call::inbound_requests`].
271///
272/// Dropping it unregisters the dialog, so its inbound requests revert to being
273/// auto-answered `200 OK` by the endpoint.
274pub struct InboundRequests {
275    endpoint: Arc<SipEndpoint>,
276    dialog_id: DialogId,
277    rx: mpsc::Receiver<InboundRequest>,
278}
279
280impl InboundRequests {
281    /// Await the next inbound request, or `None` once the call's endpoint shuts
282    /// down or this stream is being torn down.
283    pub async fn recv(&mut self) -> Option<InboundRequest> {
284        self.rx.recv().await
285    }
286}
287
288impl Drop for InboundRequests {
289    fn drop(&mut self) {
290        self.endpoint.unregister_dialog(&self.dialog_id);
291    }
292}
293
294/// A cloneable session-control handle over a [`Call`]'s dialog.
295///
296/// Produced by [`Call::session_handle`] and consumed by
297/// [`crate::session_timer_loop`]: it implements [`SessionDialogOps`] so the
298/// loop can send refresh re-INVITEs and the tear-down BYE on the shared dialog.
299#[derive(Clone)]
300pub struct CallSession {
301    endpoint: Arc<SipEndpoint>,
302    dialog: Arc<Mutex<Dialog>>,
303    peer: SocketAddr,
304}
305
306impl SessionDialogOps for CallSession {
307    async fn refresh(
308        &self,
309        mut headers: Vec<Header>,
310        body: Option<Vec<u8>>,
311    ) -> Result<Option<rsip::Response>, BoxError> {
312        let body = body.unwrap_or_default();
313        if !body.is_empty() {
314            headers.push(Header::ContentType("application/sdp".into()));
315        }
316        let mut dialog = self.dialog.lock().await;
317        Ok(self
318            .endpoint
319            .ua()
320            .reinvite(self.peer, &mut dialog, headers, body)
321            .await)
322    }
323
324    async fn send_bye(&self) -> Result<(), BoxError> {
325        let mut dialog = self.dialog.lock().await;
326        if self.endpoint.ua().hangup(self.peer, &mut dialog).await {
327            Ok(())
328        } else {
329            Err("BYE was not acknowledged".into())
330        }
331    }
332}
333
334/// Stateless helper bound to an account + endpoint.
335pub struct Caller {
336    account: SipAccount,
337    endpoint: Arc<SipEndpoint>,
338}
339
340impl Caller {
341    /// Construct a `Caller` for the given account and shared endpoint.
342    pub fn new(account: SipAccount, endpoint: Arc<SipEndpoint>) -> Self {
343        Self { account, endpoint }
344    }
345
346    /// Place an outbound call to `target` and wait for it to be answered.
347    ///
348    /// Binds a local RTP socket, offers G.711 SDP, sends the INVITE to the
349    /// account's resolved server, follows provisional responses, and answers a
350    /// single `401`/`407` challenge. Returns the [`Call`] on a 2xx, or an error
351    /// if the call was rejected, timed out, or had no usable SDP answer.
352    pub async fn dial(&self, target: Uri) -> Result<Call, BoxError> {
353        self.dial_inner(target, &CancellationToken::new(), None)
354            .await
355    }
356
357    /// Like [`Caller::dial`], but `cancel` aborts a still-ringing call with a
358    /// `CANCEL` (RFC 3261 §9). Firing the token once a provisional has arrived
359    /// tears the pending INVITE down; the returned error then reflects the
360    /// `487 Request Terminated`. Use `cancel.is_cancelled()` to tell a
361    /// cancellation apart from a callee rejection.
362    pub async fn dial_cancellable(
363        &self,
364        target: Uri,
365        cancel: &CancellationToken,
366    ) -> Result<Call, BoxError> {
367        self.dial_inner(target, cancel, None).await
368    }
369
370    /// Like [`Caller::dial_cancellable`], and additionally forwards each
371    /// provisional response status (e.g. [`rsip::StatusCode::Ringing`]) to
372    /// `progress` as it arrives — for a "ringing" UI. The channel closes when
373    /// the call reaches a final response.
374    pub async fn dial_with_progress(
375        &self,
376        target: Uri,
377        cancel: &CancellationToken,
378        progress: mpsc::Sender<rsip::StatusCode>,
379    ) -> Result<Call, BoxError> {
380        self.dial_inner(target, cancel, Some(progress)).await
381    }
382
383    async fn dial_inner(
384        &self,
385        target: Uri,
386        cancel: &CancellationToken,
387        progress: Option<mpsc::Sender<rsip::StatusCode>>,
388    ) -> Result<Call, BoxError> {
389        let rtp_socket = UdpSocket::bind("0.0.0.0:0").await?;
390        let local_rtp_addr = rtp_socket.local_addr()?;
391        let local_ip = self.endpoint.local_ip();
392        info!(%local_ip, rtp_port = local_rtp_addr.port(), "bound RTP socket for outbound dial");
393
394        let offer = build_sdp(local_ip, local_rtp_addr.port());
395        debug!("SDP offer:\n{}", String::from_utf8_lossy(&offer));
396
397        let from: Uri =
398            format!("sip:{}@{}", self.account.username, self.account.domain).try_into()?;
399        let contact: Uri = format!(
400            "sip:{}@{}",
401            self.account.username,
402            self.endpoint.local_addr()
403        )
404        .try_into()?;
405
406        // Advertise RFC 4028 session-timer support so the answerer can pin a
407        // refresh interval in its 2xx (negotiated below).
408        let cfg = CallConfig {
409            target,
410            from,
411            contact,
412            from_tag: gen_tag(),
413            call_id: format!("{}@wavekat.com", gen_tag()),
414            sdp: offer,
415            extra_headers: vec![
416                supported_timer_header(),
417                SessionExpires {
418                    interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
419                    refresher: None,
420                }
421                .header(),
422            ],
423            username: self.account.auth_username().to_string(),
424            password: self.account.password.clone(),
425        };
426
427        match self
428            .endpoint
429            .ua()
430            .call_cancellable(&cfg, self.endpoint.server(), 1, cancel, progress.as_ref())
431            .await
432        {
433            CallOutcome::Answered { dialog, response } => {
434                let remote_media = parse_sdp(&response.body)?;
435                let session_timer = negotiate_uac(&response.headers);
436                info!(
437                    remote_addr = %remote_media.addr,
438                    remote_port = remote_media.port,
439                    payload_type = remote_media.payload_type,
440                    ?session_timer,
441                    "call answered; parsed SDP answer",
442                );
443                Ok(Call::new(
444                    self.endpoint.clone(),
445                    *dialog,
446                    self.endpoint.server(),
447                    session_timer,
448                    remote_media,
449                    Arc::new(rtp_socket),
450                    local_rtp_addr,
451                ))
452            }
453            CallOutcome::Rejected(status) => Err(format!("call rejected: {status}").into()),
454            CallOutcome::Unauthorized => Err("call rejected: authentication failed".into()),
455            CallOutcome::TimedOut => Err("call timed out with no final response".into()),
456            CallOutcome::EngineStopped => Err("engine stopped".into()),
457        }
458    }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use crate::account::Transport;

    /// Fixture account with no explicit `auth_username`.
    fn test_account() -> SipAccount {
        SipAccount {
            display_name: "Office".to_string(),
            username: "1001".to_string(),
            password: "secret".to_string(),
            domain: "sip.example.com".to_string(),
            auth_username: None,
            server: Some("pbx.example.com".to_string()),
            port: Some(5080),
            transport: Transport::Udp,
        }
    }

    #[test]
    fn caller_holds_account_and_endpoint_inputs() {
        // The call path itself is exercised by the stack's loopback tests
        // (`stack::ua`). This test only pins the account input `dial` relies
        // on: with `auth_username` unset, `auth_username()` yields the plain
        // username.
        let account = test_account();
        let auth_user = account.auth_username();
        assert_eq!(auth_user, "1001");
    }
}