simple_sip_rs/call/
mod.rs1pub mod incoming_call;
2pub mod outgoing_call;
3mod call_handler;
4mod session_parameters;
5mod rtp_session;
6
7use std::cmp::PartialEq;
8use anyhow::{Context, Result};
9use futures_util::future::Either;
10use rsip::Uri;
11use log::debug;
12use tokio::task::JoinHandle;
13
14use crate::call::session_parameters::SessionParameters;
15use crate::call::call_handler::call_task;
16use crate::call::rtp_session::rtp_task;
17use crate::connection::call_connection::CallConnection;
18use crate::media::telephone_events::TelephoneEvent;
19use crate::utils::{create_mpsc_bidirectional_unbounded, BidirectionalChannel};
20
21#[derive(Debug)]
22pub enum Media {
23 Audio(Vec<f32>),
24 TelephoneEvent((TelephoneEvent, bool)),
25 OutputEmpty,
26}
27
/// A control message exchanged between a [`Call`] and its call task over the call channel.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CallControl {
    /// Request to terminate the call; this is what `Call::hangup` sends.
    Hangup,
    /// NOTE(review): not produced or consumed in this module — presumably signals
    /// that outgoing audio has drained; confirm against the call task.
    AudioOutEmpty,
    /// The call is over; this is what `Call::block_for_finished` waits for.
    Finished,
}
34
35pub struct Call {
37 call_handle: JoinHandle<Result<()>>,
38 rtp_handle: JoinHandle<Result<()>>,
39 remote_uri: Uri,
40
41 call_channel: BidirectionalChannel<CallControl>,
42 media_channel: BidirectionalChannel<Media>,
43}
44
45impl Call {
46 async fn new(call_connection: CallConnection, call_session_params: SessionParameters) -> Result<Self>
47 {
48 let (call_channel_local, call_channel_remote) = create_mpsc_bidirectional_unbounded();
49 let (media_channel_local, media_channel_remote) = create_mpsc_bidirectional_unbounded();
50
51 let remote_uri = call_session_params.remote.uri.clone();
52
53 let cloned_call_session_params = call_session_params.clone();
54 let call_handle = tokio::task::spawn(async move {
55 let res = call_task(
56 call_channel_remote,
57 call_connection,
58 cloned_call_session_params
59 ).await;
60 debug!("Call task finished with {:?}", res);
61 res
62 });
63
64 let rtp_handle = tokio::task::spawn(async move {
65 let res = rtp_task(media_channel_remote, call_session_params).await;
66 debug!("RTP task finished with {:?}", res);
67 res
68 });
69
70 Ok(Call {
71 call_handle,
72 rtp_handle,
73 remote_uri,
74 call_channel: call_channel_local,
75 media_channel: media_channel_local,
76 })
77 }
78
79 pub async fn block_for_finished(&mut self) {
81 loop {
82 match self.call_channel.recv().await {
83 None => (),
84 Some(control) => {
85 if control == CallControl::Finished {
86 return;
87 }
88 }
89 }
90 }
91 }
92
93 pub async fn block_for_output_empty(&mut self) {
98 loop {
99 tokio::select! {
100 call_message = self.call_channel.receiver.recv() => {
101 if let Some(control) = call_message {
102 if control == CallControl::Finished {
103 return;
104 }
105 }
106 return;
107 }
108 media = self.media_channel.receiver.recv() => {
109 if let Some(media) = media {
110 if let Media::OutputEmpty = media {
111 return;
112 }
113 }
114 }
115 }
116 }
117 }
118
119 pub fn send_audio(&self, audio: Vec<f32>) -> Result<()>
128 {
129 self.media_channel.sender.send(Media::Audio(audio)).context("Failed to send audio to call. Call might be over.")
130 }
131
132 pub fn hangup(&self) -> Result<()>
134 {
135 self.call_channel.sender.send(CallControl::Hangup).context("Failed to send hangup to call. Call might be over.")
136 }
137
138 pub async fn recv(&mut self) -> Option<CallControl>
140 {
141 self.call_channel.receiver.recv().await
142 }
143
144 pub async fn recv_media(&mut self) -> Option<Media> {
146 self.media_channel.receiver.recv().await
147 }
148
149 pub async fn recv_either(&mut self) -> Either<Option<CallControl>, Option<Media>> {
151 tokio::select! {
152 message = self.call_channel.receiver.recv() => {
153 Either::Left(message)
154 }
155 media = self.media_channel.receiver.recv() => {
156 Either::Right(media)
157 }
158 }
159
160 }
161
162 pub fn get_remote_uri(&self) -> &String
164 {
165 &self.remote_uri.auth.as_ref().unwrap().user
166 }
167
168 pub fn is_finished(&self) -> bool {
172 self.call_handle.is_finished() || self.rtp_handle.is_finished() || self.call_channel.one_sided() || self.media_channel.one_sided()
173 }
174}
175
176impl Drop for Call {
177 fn drop(&mut self) {
178 if !self.call_handle.is_finished() {
179 self.call_handle.abort();
180 }
181 if !self.rtp_handle.is_finished() {
182 self.rtp_handle.abort();
183 }
184 }
185}