Skip to main content

wavekat_sip/
caller.rs

1//! Outbound calls and the established-call handle.
2//!
3//! [`Caller::dial`] binds a local RTP socket, builds the SDP offer, places the
4//! INVITE through the engine (answering a digest challenge if the server
5//! demands one), and on a 2xx returns a [`Call`] — the negotiated remote media
6//! plus the bound RTP socket. Audio device I/O, codecs and recording stay with
7//! the consumer; the `rtp_socket` + `remote_media` + `local_rtp_addr` triple is
8//! the raw plumbing.
9
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13use rsip::{Header, Uri};
14use tokio::net::UdpSocket;
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, info};
18
19use crate::account::SipAccount;
20use crate::dtmf_info::{build_info_body, classify, content_type_header, InfoOutcome};
21use crate::endpoint::SipEndpoint;
22use crate::inbound::InboundRequest;
23use crate::rtp::dtmf::DtmfDigit;
24use crate::sdp::{build_sdp, build_sdp_with, parse_sdp, MediaDirection, RemoteMedia};
25use crate::session_timer::{
26    negotiate_uac, supported_timer_header, SessionDialogOps, SessionExpires, SessionTimer,
27    DEFAULT_SESSION_EXPIRES_SECS,
28};
29use crate::stack::call::{CallConfig, CallOutcome};
30use crate::stack::dialog::{Dialog, DialogId};
31use crate::stack::transaction::gen_tag;
32
33type BoxError = Box<dyn std::error::Error + Send + Sync>;
34
35/// An established call: negotiated remote media plus the local RTP socket.
36///
37/// The same handle is produced by [`Caller::dial`] (outbound) and
38/// [`crate::IncomingCall::accept`] (inbound), so call control is uniform.
39pub struct Call {
40    endpoint: Arc<SipEndpoint>,
41    /// Shared so a background session-timer loop ([`Call::session_handle`]) can
42    /// send refresh re-INVITEs / BYE while the call owner drives audio. The
43    /// mutex serializes the dialog's CSeq across both.
44    dialog: Arc<Mutex<Dialog>>,
45    /// This dialog's identity, used to register for inbound in-dialog requests
46    /// and termination.
47    dialog_id: DialogId,
48    /// The peer's remote target (its `Contact`), captured at establishment. The
49    /// URI a third party should `INVITE` to reach this exact peer — used as the
50    /// `Refer-To` target of an attended transfer (RFC 5589 §7), paired with the
51    /// dialog's `Replaces`.
52    peer_uri: Uri,
53    /// Fired when the peer ends the call (an in-dialog `BYE`); surfaced via
54    /// [`Call::terminated`]. Registered with the endpoint at construction.
55    terminated: CancellationToken,
56    peer: SocketAddr,
57    /// `true` once we have put the peer on hold via a `sendonly` re-INVITE.
58    held: bool,
59    /// SDP `o=` version; bumped on every re-offer (RFC 3264 §5).
60    sdp_version: u32,
61    /// The RFC 4028 session timer negotiated at call setup, if any.
62    session_timer: Option<SessionTimer>,
63    /// Where the remote endpoint expects RTP (from the negotiated SDP).
64    pub remote_media: RemoteMedia,
65    /// Local RTP socket; share via `Arc` to send and receive concurrently.
66    pub rtp_socket: Arc<UdpSocket>,
67    /// Local RTP address advertised in our SDP.
68    pub local_rtp_addr: SocketAddr,
69}
70
71impl Call {
72    pub(crate) fn new(
73        endpoint: Arc<SipEndpoint>,
74        dialog: Dialog,
75        peer: SocketAddr,
76        session_timer: Option<SessionTimer>,
77        remote_media: RemoteMedia,
78        rtp_socket: Arc<UdpSocket>,
79        local_rtp_addr: SocketAddr,
80    ) -> Self {
81        let dialog_id = dialog.id();
82        let peer_uri = dialog.remote_target().clone();
83        // Register for the peer-BYE termination signal up front, so a remote
84        // hangup is observable via `Call::terminated` whether or not the call
85        // ever opts into `inbound_requests`.
86        let terminated = endpoint.register_termination(dialog_id.clone());
87        Self {
88            endpoint,
89            dialog: Arc::new(Mutex::new(dialog)),
90            dialog_id,
91            peer_uri,
92            terminated,
93            peer,
94            held: false,
95            // The initial offer/answer was o= version 0.
96            sdp_version: 0,
97            session_timer,
98            remote_media,
99            rtp_socket,
100            local_rtp_addr,
101        }
102    }
103
104    /// Put the peer on hold (`on = true`, `a=sendonly`) or resume the call
105    /// (`on = false`, `a=sendrecv`) by sending an in-dialog re-INVITE with a
106    /// fresh SDP re-offer (RFC 3264 §8.4).
107    ///
108    /// The local hold state only flips once the peer accepts the re-INVITE with
109    /// a 2xx; a non-2xx final surfaces the server's reason and leaves the call
110    /// unchanged. The `o=` version is bumped for each re-offer regardless, as
111    /// RFC 3264 requires.
112    pub async fn set_hold(&mut self, on: bool) -> Result<(), BoxError> {
113        let direction = if on {
114            MediaDirection::SendOnly
115        } else {
116            MediaDirection::SendRecv
117        };
118        self.sdp_version += 1;
119        let offer = build_sdp_with(
120            self.endpoint.local_ip(),
121            self.local_rtp_addr.port(),
122            direction,
123            self.sdp_version,
124        );
125        let headers = vec![Header::ContentType("application/sdp".into())];
126        let response = {
127            let mut dialog = self.dialog.lock().await;
128            self.endpoint
129                .ua()
130                .reinvite(self.peer, &mut dialog, headers, offer)
131                .await
132        };
133        match response {
134            Some(r) if (200..300).contains(&r.status_code.code()) => {
135                self.held = on;
136                info!(on, "hold state updated via re-INVITE");
137                Ok(())
138            }
139            Some(r) => Err(format!("re-INVITE rejected: {}", r.status_code).into()),
140            None => Err("re-INVITE timed out with no final response".into()),
141        }
142    }
143
144    /// `true` if the call is currently on hold (we sent a `sendonly` re-INVITE
145    /// the peer accepted).
146    pub fn is_held(&self) -> bool {
147        self.held
148    }
149
150    /// The RFC 4028 session timer negotiated when the call was set up, or
151    /// `None` if neither side asked for one. Drive it with
152    /// [`crate::session_timer_loop`] against [`Call::session_handle`].
153    pub fn session_timer(&self) -> Option<SessionTimer> {
154        self.session_timer
155    }
156
157    /// A cloneable handle that sends refresh re-INVITEs / BYE on this call's
158    /// dialog, for running [`crate::session_timer_loop`] in a background task
159    /// alongside the audio path. Shares the dialog with the `Call`, so their
160    /// in-dialog requests serialize correctly.
161    pub fn session_handle(&self) -> CallSession {
162        CallSession {
163            endpoint: self.endpoint.clone(),
164            dialog: self.dialog.clone(),
165            peer: self.peer,
166        }
167    }
168
169    /// Opt in to handle this call's inbound in-dialog requests — the peer's
170    /// re-`INVITE`s (e.g. an RFC 4028 session refresh, or a peer-initiated
171    /// hold) and `INFO`s (e.g. SIP-INFO DTMF) — instead of having the endpoint
172    /// auto-answer them `200 OK`.
173    ///
174    /// Returns a stream; each [`InboundRequest`] must be answered (with
175    /// [`InboundRequest::respond`] / [`InboundRequest::ok`]). While the returned
176    /// [`InboundRequests`] is alive, those requests route here; drop it to
177    /// revert to auto-answering. `BYE` / `OPTIONS` are always auto-answered.
178    /// Call this once per [`Call`].
179    pub fn inbound_requests(&self) -> InboundRequests {
180        let rx = self.endpoint.register_dialog(self.dialog_id.clone());
181        InboundRequests {
182            endpoint: self.endpoint.clone(),
183            dialog_id: self.dialog_id.clone(),
184            rx,
185        }
186    }
187
188    /// A token that fires when the peer ends the call by sending an in-dialog
189    /// `BYE`. The endpoint auto-answers the BYE `200 OK`; this is purely the
190    /// notification. Clone it and `await` [`CancellationToken::cancelled`] in a
191    /// task to drive call teardown (stop audio, finalize a recording). It does
192    /// **not** fire for a local [`Call::hangup`] — the caller already knows.
193    pub fn terminated(&self) -> CancellationToken {
194        self.terminated.clone()
195    }
196
197    /// The peer's remote-target URI (its `Contact`) — the address a third party
198    /// should `INVITE` to reach this exact peer. Used as the `Refer-To` target
199    /// of an attended transfer (read off the *consultation* call), paired with
200    /// [`Call::dialog_triplet`] for the `Replaces`.
201    pub fn peer_uri(&self) -> &Uri {
202        &self.peer_uri
203    }
204
205    /// This call's dialog identity (Call-ID + our/remote tags), for naming it in
206    /// an attended transfer's `Replaces` (RFC 3891). Read it off the
207    /// *consultation* call (the leg we built to the transfer target) and pass it
208    /// to [`Call::attended_transfer`] on the call being transferred.
209    pub fn dialog_triplet(&self) -> crate::refer::DialogTriplet {
210        crate::refer::DialogTriplet {
211            call_id: self.dialog_id.call_id.clone(),
212            local_tag: self.dialog_id.local_tag.clone(),
213            remote_tag: self.dialog_id.remote_tag.clone(),
214        }
215    }
216
217    /// Send one DTMF press via SIP `INFO` (`application/dtmf-relay`).
218    ///
219    /// Use this only when the remote did not negotiate RFC 4733 — i.e.
220    /// [`RemoteMedia::dtmf_payload_type`] is `None`. When telephone-event is
221    /// available, prefer [`crate::send_dtmf_burst`] over RTP. A
222    /// [`InfoOutcome::UnsupportedMedia`] result means the remote rejects this
223    /// transport too; stop sending further presses on this dialog.
224    pub async fn send_dtmf_info(&mut self, digit: DtmfDigit, duration_ms: u32) -> InfoOutcome {
225        let body = build_info_body(digit, duration_ms).into_bytes();
226        let response = {
227            let mut dialog = self.dialog.lock().await;
228            self.endpoint
229                .ua()
230                .info(self.peer, &mut dialog, vec![content_type_header()], body)
231                .await
232        };
233        classify(response)
234    }
235
236    /// Blind-transfer the call: ask the peer to place a fresh call to `target`
237    /// by sending an in-dialog `REFER` with a `Refer-To` (RFC 3515).
238    ///
239    /// Returns `Ok(())` once the peer accepts the `REFER` with a 2xx
240    /// (`202 Accepted`) — at which point the transfer is *in progress*, not yet
241    /// complete. The peer then reports the outcome as a series of in-dialog
242    /// `NOTIFY`s (a `message/sipfrag` status line) that arrive on
243    /// [`Call::inbound_requests`]; the consumer watches those (parsing each with
244    /// [`crate::parse_sipfrag_status`]) and tears its own leg down once the
245    /// target answers. A non-2xx final to the `REFER` surfaces the peer's reason
246    /// and leaves the call unchanged — the peer won't honour the transfer, so
247    /// the consumer should keep the call up.
248    ///
249    /// This is *blind* (unattended) transfer: we do not first call `target`
250    /// ourselves. Attended transfer (consult `target`, then `REFER` with
251    /// `Replaces`) is a separate method, not yet implemented.
252    pub async fn blind_transfer(&mut self, target: Uri) -> Result<(), BoxError> {
253        let headers = vec![crate::refer::refer_to_header(&target)];
254        let response = {
255            let mut dialog = self.dialog.lock().await;
256            self.endpoint
257                .ua()
258                .refer(self.peer, &mut dialog, headers)
259                .await
260        };
261        match response {
262            Some(r) if (200..300).contains(&r.status_code.code()) => {
263                info!(%target, "blind transfer accepted (REFER 2xx); awaiting NOTIFY");
264                Ok(())
265            }
266            Some(r) => Err(format!("REFER rejected: {}", r.status_code).into()),
267            None => Err("REFER timed out with no final response".into()),
268        }
269    }
270
271    /// Attended-transfer the call: ask the peer (the party we hold) to take over
272    /// the consultation dialog named by `replaces` by sending an in-dialog
273    /// `REFER` whose `Refer-To` carries `target` plus a `Replaces` header
274    /// (RFC 3515 + RFC 3891).
275    ///
276    /// `replaces` is the dialog identity of the *consultation* call — the leg we
277    /// already established to `target` ourselves — read via
278    /// [`Call::dialog_triplet`]. When the peer accepts (`202`), it `INVITE`s
279    /// `target` with that `Replaces`, so `target` replaces the consultation leg
280    /// rather than ringing afresh. The outcome arrives exactly as for a blind
281    /// transfer — `NOTIFY`/sipfrag on [`Call::inbound_requests`] — so the
282    /// consumer drives both the same way. A non-2xx final to the `REFER`
283    /// surfaces the peer's reason and leaves the call unchanged.
284    pub async fn attended_transfer(
285        &mut self,
286        target: Uri,
287        replaces: &crate::refer::DialogTriplet,
288    ) -> Result<(), BoxError> {
289        let headers = vec![crate::refer::refer_to_with_replaces(&target, replaces)];
290        let response = {
291            let mut dialog = self.dialog.lock().await;
292            self.endpoint
293                .ua()
294                .refer(self.peer, &mut dialog, headers)
295                .await
296        };
297        match response {
298            Some(r) if (200..300).contains(&r.status_code.code()) => {
299                info!(%target, "attended transfer accepted (REFER 2xx); awaiting NOTIFY");
300                Ok(())
301            }
302            Some(r) => Err(format!("REFER rejected: {}", r.status_code).into()),
303            None => Err("REFER timed out with no final response".into()),
304        }
305    }
306
307    /// Hang up by sending an in-dialog `BYE`. Returns once the peer 2xxs it
308    /// (or the transaction gives up).
309    pub async fn hangup(&mut self) -> Result<(), BoxError> {
310        let acked = {
311            let mut dialog = self.dialog.lock().await;
312            self.endpoint.ua().hangup(self.peer, &mut dialog).await
313        };
314        if acked {
315            info!("call hung up (BYE acknowledged)");
316            Ok(())
317        } else {
318            Err("BYE was not acknowledged".into())
319        }
320    }
321}
322
323impl Drop for Call {
324    fn drop(&mut self) {
325        // Release the termination registration so the endpoint's table doesn't
326        // grow for the life of the process. (`InboundRequests` similarly
327        // unregisters the dialog on its own drop.)
328        self.endpoint.unregister_termination(&self.dialog_id);
329    }
330}
331
332/// A stream of a [`Call`]'s inbound in-dialog requests (peer re-`INVITE` /
333/// `INFO`), produced by [`Call::inbound_requests`].
334///
335/// Dropping it unregisters the dialog, so its inbound requests revert to being
336/// auto-answered `200 OK` by the endpoint.
337pub struct InboundRequests {
338    endpoint: Arc<SipEndpoint>,
339    dialog_id: DialogId,
340    rx: mpsc::Receiver<InboundRequest>,
341}
342
343impl InboundRequests {
344    /// Await the next inbound request, or `None` once the call's endpoint shuts
345    /// down or this stream is being torn down.
346    pub async fn recv(&mut self) -> Option<InboundRequest> {
347        self.rx.recv().await
348    }
349}
350
351impl Drop for InboundRequests {
352    fn drop(&mut self) {
353        self.endpoint.unregister_dialog(&self.dialog_id);
354    }
355}
356
357/// A cloneable session-control handle over a [`Call`]'s dialog.
358///
359/// Produced by [`Call::session_handle`] and consumed by
360/// [`crate::session_timer_loop`]: it implements [`SessionDialogOps`] so the
361/// loop can send refresh re-INVITEs and the tear-down BYE on the shared dialog.
362#[derive(Clone)]
363pub struct CallSession {
364    endpoint: Arc<SipEndpoint>,
365    dialog: Arc<Mutex<Dialog>>,
366    peer: SocketAddr,
367}
368
369impl SessionDialogOps for CallSession {
370    async fn refresh(
371        &self,
372        mut headers: Vec<Header>,
373        body: Option<Vec<u8>>,
374    ) -> Result<Option<rsip::Response>, BoxError> {
375        let body = body.unwrap_or_default();
376        if !body.is_empty() {
377            headers.push(Header::ContentType("application/sdp".into()));
378        }
379        let mut dialog = self.dialog.lock().await;
380        Ok(self
381            .endpoint
382            .ua()
383            .reinvite(self.peer, &mut dialog, headers, body)
384            .await)
385    }
386
387    async fn send_bye(&self) -> Result<(), BoxError> {
388        let mut dialog = self.dialog.lock().await;
389        if self.endpoint.ua().hangup(self.peer, &mut dialog).await {
390            Ok(())
391        } else {
392            Err("BYE was not acknowledged".into())
393        }
394    }
395}
396
397/// Stateless helper bound to an account + endpoint.
398pub struct Caller {
399    account: SipAccount,
400    endpoint: Arc<SipEndpoint>,
401}
402
403impl Caller {
404    /// Construct a `Caller` for the given account and shared endpoint.
405    pub fn new(account: SipAccount, endpoint: Arc<SipEndpoint>) -> Self {
406        Self { account, endpoint }
407    }
408
409    /// Place an outbound call to `target` and wait for it to be answered.
410    ///
411    /// Binds a local RTP socket, offers G.711 SDP, sends the INVITE to the
412    /// account's resolved server, follows provisional responses, and answers a
413    /// single `401`/`407` challenge. Returns the [`Call`] on a 2xx, or an error
414    /// if the call was rejected, timed out, or had no usable SDP answer.
415    pub async fn dial(&self, target: Uri) -> Result<Call, BoxError> {
416        self.dial_inner(target, &CancellationToken::new(), None)
417            .await
418    }
419
420    /// Like [`Caller::dial`], but `cancel` aborts a still-ringing call with a
421    /// `CANCEL` (RFC 3261 §9). Firing the token once a provisional has arrived
422    /// tears the pending INVITE down; the returned error then reflects the
423    /// `487 Request Terminated`. Use `cancel.is_cancelled()` to tell a
424    /// cancellation apart from a callee rejection.
425    pub async fn dial_cancellable(
426        &self,
427        target: Uri,
428        cancel: &CancellationToken,
429    ) -> Result<Call, BoxError> {
430        self.dial_inner(target, cancel, None).await
431    }
432
433    /// Like [`Caller::dial_cancellable`], and additionally forwards each
434    /// provisional response status (e.g. [`rsip::StatusCode::Ringing`]) to
435    /// `progress` as it arrives — for a "ringing" UI. The channel closes when
436    /// the call reaches a final response.
437    pub async fn dial_with_progress(
438        &self,
439        target: Uri,
440        cancel: &CancellationToken,
441        progress: mpsc::Sender<rsip::StatusCode>,
442    ) -> Result<Call, BoxError> {
443        self.dial_inner(target, cancel, Some(progress)).await
444    }
445
446    async fn dial_inner(
447        &self,
448        target: Uri,
449        cancel: &CancellationToken,
450        progress: Option<mpsc::Sender<rsip::StatusCode>>,
451    ) -> Result<Call, BoxError> {
452        let rtp_socket = UdpSocket::bind("0.0.0.0:0").await?;
453        let local_rtp_addr = rtp_socket.local_addr()?;
454        let local_ip = self.endpoint.local_ip();
455        info!(%local_ip, rtp_port = local_rtp_addr.port(), "bound RTP socket for outbound dial");
456
457        let offer = build_sdp(local_ip, local_rtp_addr.port());
458        debug!("SDP offer:\n{}", String::from_utf8_lossy(&offer));
459
460        let from: Uri =
461            format!("sip:{}@{}", self.account.username, self.account.domain).try_into()?;
462        let contact: Uri = format!(
463            "sip:{}@{}",
464            self.account.username,
465            self.endpoint.local_addr()
466        )
467        .try_into()?;
468
469        // Advertise RFC 4028 session-timer support so the answerer can pin a
470        // refresh interval in its 2xx (negotiated below).
471        let cfg = CallConfig {
472            target,
473            from,
474            contact,
475            from_tag: gen_tag(),
476            call_id: format!("{}@wavekat.com", gen_tag()),
477            sdp: offer,
478            extra_headers: vec![
479                supported_timer_header(),
480                SessionExpires {
481                    interval_secs: DEFAULT_SESSION_EXPIRES_SECS,
482                    refresher: None,
483                }
484                .header(),
485            ],
486            username: self.account.auth_username().to_string(),
487            password: self.account.password.clone(),
488        };
489
490        match self
491            .endpoint
492            .ua()
493            .call_cancellable(&cfg, self.endpoint.server(), 1, cancel, progress.as_ref())
494            .await
495        {
496            CallOutcome::Answered { dialog, response } => {
497                let remote_media = parse_sdp(&response.body)?;
498                let session_timer = negotiate_uac(&response.headers);
499                info!(
500                    remote_addr = %remote_media.addr,
501                    remote_port = remote_media.port,
502                    payload_type = remote_media.payload_type,
503                    ?session_timer,
504                    "call answered; parsed SDP answer",
505                );
506                Ok(Call::new(
507                    self.endpoint.clone(),
508                    *dialog,
509                    self.endpoint.server(),
510                    session_timer,
511                    remote_media,
512                    Arc::new(rtp_socket),
513                    local_rtp_addr,
514                ))
515            }
516            CallOutcome::Rejected(status) => Err(format!("call rejected: {status}").into()),
517            CallOutcome::Unauthorized => Err("call rejected: authentication failed".into()),
518            CallOutcome::TimedOut => Err("call timed out with no final response".into()),
519            CallOutcome::EngineStopped => Err("engine stopped".into()),
520        }
521    }
522}
523
524#[cfg(test)]
525mod tests {
526    use super::*;
527    use crate::account::Transport;
528
529    fn test_account() -> SipAccount {
530        SipAccount {
531            display_name: "Office".to_string(),
532            username: "1001".to_string(),
533            password: "secret".to_string(),
534            domain: "sip.example.com".to_string(),
535            auth_username: None,
536            server: Some("pbx.example.com".to_string()),
537            port: Some(5080),
538            transport: Transport::Udp,
539        }
540    }
541
542    #[test]
543    fn caller_holds_account_and_endpoint_inputs() {
544        // Construction is pure; the call path is covered by the stack's
545        // loopback tests (`stack::ua`). Here we just check `new` wiring.
546        let acct = test_account();
547        assert_eq!(acct.auth_username(), "1001");
548    }
549}