simple_sip_rs/call/
mod.rs

1pub mod incoming_call;
2pub mod outgoing_call;
3mod call_handler;
4mod session_parameters;
5mod rtp_session;
6
7use std::cmp::PartialEq;
8use anyhow::{Context, Result};
9use futures_util::future::Either;
10use rsip::Uri;
11use log::debug;
12use tokio::task::JoinHandle;
13
14use crate::call::session_parameters::SessionParameters;
15use crate::call::call_handler::call_task;
16use crate::call::rtp_session::rtp_task;
17use crate::connection::call_connection::CallConnection;
18use crate::media::telephone_events::TelephoneEvent;
19use crate::utils::{create_mpsc_bidirectional_unbounded, BidirectionalChannel};
20
21#[derive(Debug)]
22pub enum Media {
23    Audio(Vec<f32>),
24    TelephoneEvent((TelephoneEvent, bool)),
25    OutputEmpty,
26}
27
/// Control messages travelling over the control channel between a `Call`
/// and its call-handling task.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CallControl {
    /// Request to terminate the call; sent by `Call::hangup`.
    Hangup,
    /// NOTE(review): presumably reports that the outgoing audio buffer has
    /// drained — it is not produced or consumed in this file; confirm.
    AudioOutEmpty,
    /// The call is over; `Call::block_for_finished` waits for this message.
    Finished,
}
34
35/// Represents an ongoing (as been answered) call.
36pub struct Call {
37    call_handle: JoinHandle<Result<()>>,
38    rtp_handle: JoinHandle<Result<()>>,
39    remote_uri: Uri,
40
41    call_channel: BidirectionalChannel<CallControl>,
42    media_channel: BidirectionalChannel<Media>,
43}
44
45impl Call {
46    async fn new(call_connection: CallConnection, call_session_params: SessionParameters) -> Result<Self>
47    {
48        let (call_channel_local, call_channel_remote) = create_mpsc_bidirectional_unbounded();
49        let (media_channel_local, media_channel_remote) = create_mpsc_bidirectional_unbounded();
50
51        let remote_uri = call_session_params.remote.uri.clone();
52
53        let cloned_call_session_params = call_session_params.clone();
54        let call_handle = tokio::task::spawn(async move {
55            let res = call_task(
56                call_channel_remote,
57                call_connection,
58                cloned_call_session_params
59            ).await;
60            debug!("Call task finished with {:?}", res);
61            res
62        });
63
64        let rtp_handle = tokio::task::spawn(async move {
65            let res = rtp_task(media_channel_remote, call_session_params).await;
66            debug!("RTP task finished with {:?}", res);
67            res
68        });
69
70        Ok(Call {
71            call_handle,
72            rtp_handle,
73            remote_uri,
74            call_channel: call_channel_local,
75            media_channel: media_channel_local,
76        })
77    }
78
79    /// Blocks until the call has finished (hang up and terminated the worker thread)
80    pub async fn block_for_finished(&mut self) {
81        loop {
82            match self.call_channel.recv().await {
83                None => (),
84                Some(control) => {
85                    if control == CallControl::Finished {
86                        return;
87                    }
88                }
89            }
90        }
91    }
92
93    /// Blocks until the output buffer is empty
94    ///
95    /// This is typically useful when sending already recorded sound,
96    /// and you want to make sure the playback is finished before proceeding.
97    pub async fn block_for_output_empty(&mut self) {
98        loop {
99            tokio::select! {
100                call_message = self.call_channel.receiver.recv() => {
101                    if let Some(control) = call_message {
102                        if control == CallControl::Finished {
103                            return;
104                        }
105                    }
106                    return;
107                }
108                media = self.media_channel.receiver.recv() => {
109                    if let Some(media) = media {
110                        if let Media::OutputEmpty = media {
111                            return;
112                        }
113                    }
114                }
115            }
116        }
117    }
118
119    /// Adds the given samples to the output audio buffer.
120    ///
121    /// # Arguments
122    ///
123    /// * `audio`: Interleaved stereo `f32` samples @ 48000Hz.
124    ///
125    /// # Errors
126    /// Errors when failing to send the audio to the call. Most likely because the call has already ended.
127    pub fn send_audio(&self, audio: Vec<f32>) -> Result<()>
128    {
129        self.media_channel.sender.send(Media::Audio(audio)).context("Failed to send audio to call. Call might be over.")
130    }
131
132    /// Tries to hang up the call. Might fail if the call is already over.
133    pub fn hangup(&self) -> Result<()>
134    {
135        self.call_channel.sender.send(CallControl::Hangup).context("Failed to send hangup to call. Call might be over.")
136    }
137
138    /// Receive the next control message from the call. Blocking until a message arrives.
139    pub async fn recv(&mut self) -> Option<CallControl>
140    {
141        self.call_channel.receiver.recv().await
142    }
143
144    /// Receive the next media message from the call. Blocking until a message arrives.
145    pub async fn recv_media(&mut self) -> Option<Media> {
146        self.media_channel.receiver.recv().await
147    }
148
149    /// Receive either the next control message or the next media message.
150    pub async fn recv_either(&mut self) -> Either<Option<CallControl>, Option<Media>> {
151        tokio::select! {
152            message = self.call_channel.receiver.recv() => {
153                Either::Left(message)
154            }
155            media = self.media_channel.receiver.recv() => {
156                Either::Right(media)
157            }
158        }
159
160    }
161
162    /// Returns the remote URI
163    pub fn get_remote_uri(&self) -> &String
164    {
165        &self.remote_uri.auth.as_ref().unwrap().user
166    }
167
168    /// Returns the state of the underlying worker
169    ///
170    /// `true` if the underlying worker as finished.
171    pub fn is_finished(&self) -> bool {
172        self.call_handle.is_finished() || self.rtp_handle.is_finished() || self.call_channel.one_sided() || self.media_channel.one_sided()
173    }
174}
175
176impl Drop for Call {
177    fn drop(&mut self) {
178        if !self.call_handle.is_finished() {
179            self.call_handle.abort();
180        }
181        if !self.rtp_handle.is_finished() {
182            self.rtp_handle.abort();
183        }
184    }
185}