freeswitch_esl/
connection.rs

1use crate::code::{Code, ParseCode};
2use crate::error::EslError;
3use crate::esl::EslConnectionType;
4use crate::event::Event;
5use crate::io::EslCodec;
6use futures::SinkExt;
7use serde_json::Value;
8use std::collections::{HashMap, VecDeque};
9use std::sync::atomic::Ordering;
10use std::sync::{atomic::AtomicBool, Arc};
11use tokio::io::WriteHalf;
12use tokio::net::{TcpStream, ToSocketAddrs};
13use tokio::sync::{
14    oneshot::{channel, Sender},
15    Mutex,
16};
17use tokio_stream::StreamExt;
18use tokio_util::codec::{FramedRead, FramedWrite};
19use tracing::trace;
20#[derive(Debug)]
21/// contains Esl connection with freeswitch
22pub struct EslConnection {
23    password: String,
24    commands: Arc<Mutex<VecDeque<Sender<Event>>>>,
25    transport_tx: Arc<Mutex<FramedWrite<WriteHalf<TcpStream>, EslCodec>>>,
26    background_jobs: Arc<Mutex<HashMap<String, Sender<Event>>>>,
27    connected: AtomicBool,
28    pub(crate) call_uuid: Option<String>,
29    connection_info: Option<HashMap<String, Value>>,
30}
31
32impl EslConnection {
33    /// returns call uuid in outbound mode
34    pub async fn call_uuid(&self) -> Option<String> {
35        self.call_uuid.clone()
36    }
37    /// disconnects from freeswitch
38    pub async fn disconnect(self) -> Result<(), EslError> {
39        self.send_recv(b"exit").await?;
40        self.connected.store(false, Ordering::Relaxed);
41        Ok(())
42    }
43    /// returns status of esl connection
44    pub fn connected(&self) -> bool {
45        self.connected.load(Ordering::Relaxed)
46    }
47    pub(crate) async fn send(&self, item: &[u8]) -> Result<(), EslError> {
48        let mut transport = self.transport_tx.lock().await;
49        transport.send(item).await
50    }
51    /// sends raw message to freeswitch and receives reply
52    pub async fn send_recv(&self, item: &[u8]) -> Result<Event, EslError> {
53        self.send(item).await?;
54        let (tx, rx) = channel();
55        self.commands.lock().await.push_back(tx);
56        Ok(rx.await?)
57    }
58
59    pub(crate) async fn with_tcpstream(
60        stream: TcpStream,
61        password: impl ToString,
62        connection_type: EslConnectionType,
63    ) -> Result<Self, EslError> {
64        // let sender = Arc::new(sender);
65        let commands = Arc::new(Mutex::new(VecDeque::new()));
66        let inner_commands = Arc::clone(&commands);
67        let background_jobs = Arc::new(Mutex::new(HashMap::new()));
68        let inner_background_jobs = Arc::clone(&background_jobs);
69        let esl_codec = EslCodec {};
70        let (read_half, write_half) = tokio::io::split(stream);
71        let mut transport_rx = FramedRead::new(read_half, esl_codec.clone());
72        let transport_tx = Arc::new(Mutex::new(FramedWrite::new(write_half, esl_codec.clone())));
73        if connection_type == EslConnectionType::Inbound {
74            transport_rx.next().await;
75        }
76        let mut connection = Self {
77            password: password.to_string(),
78            commands,
79            background_jobs,
80            transport_tx,
81            connected: AtomicBool::new(false),
82            call_uuid: None,
83            connection_info: None,
84        };
85        tokio::spawn(async move {
86            loop {
87                if let Some(Ok(event)) = transport_rx.next().await {
88                    if let Some(event_type) = event.headers.get("Content-Type") {
89                        match event_type.as_str().unwrap() {
90                            "text/disconnect-notice" => {
91                                trace!("got disconnect notice");
92                                return;
93                            }
94                            "text/event-json" => {
95                                trace!("got event-json");
96                                let data = event
97                                    .body()
98                                    .clone()
99                                    .expect("Unable to get body of event-json");
100
101                                let event_body = parse_json_body(&data)
102                                    .expect("Unable to parse body of event-json");
103                                let job_uuid = event_body.get("Job-UUID");
104                                if let Some(job_uuid) = job_uuid {
105                                    let job_uuid = job_uuid.as_str().unwrap();
106                                    if let Some(tx) =
107                                        inner_background_jobs.lock().await.remove(job_uuid)
108                                    {
109                                        tx.send(event)
110                                            .expect("Unable to send channel message from bgapi");
111                                    }
112                                    trace!("continued");
113                                    continue;
114                                }
115                                if let Some(application_uuid) = event_body.get("Application-UUID") {
116                                    let job_uuid = application_uuid.as_str().unwrap();
117                                    if let Some(event_name) = event_body.get("Event-Name") {
118                                        if let Some(event_name) = event_name.as_str() {
119                                            if event_name == "CHANNEL_EXECUTE_COMPLETE" {
120                                                if let Some(tx) = inner_background_jobs
121                                                    .lock()
122                                                    .await
123                                                    .remove(job_uuid)
124                                                {
125                                                    tx.send(event).expect(
126                                                        "Unable to send channel message from bgapi",
127                                                    );
128                                                }
129                                                trace!("continued");
130                                                trace!("got channel execute complete");
131                                            }
132                                        }
133                                    }
134                                }
135                                continue;
136                            }
137                            _ => {
138                                trace!("got another event {:?}", event);
139                            }
140                        }
141                    }
142                    if let Some(tx) = inner_commands.lock().await.pop_front() {
143                        tx.send(event).expect("msg");
144                    }
145                }
146            }
147        });
148        match connection_type {
149            EslConnectionType::Inbound => {
150                let auth_response = connection.auth().await?;
151                trace!("auth_response {:?}", auth_response);
152                connection
153                    .subscribe(vec!["BACKGROUND_JOB", "CHANNEL_EXECUTE_COMPLETE"])
154                    .await?;
155            }
156            EslConnectionType::Outbound => {
157                let response = connection.send_recv(b"connect").await?;
158                trace!("{:?}", response);
159                connection.connection_info = Some(response.headers().clone());
160                let response = connection
161                    .subscribe(vec!["BACKGROUND_JOB", "CHANNEL_EXECUTE_COMPLETE"])
162                    .await?;
163                trace!("{:?}", response);
164                let response = connection.send_recv(b"myevents").await?;
165                trace!("{:?}", response);
166                let connection_info = connection.connection_info.as_ref().unwrap();
167
168                let channel_unique_id = connection_info
169                    .get("Channel-Unique-ID")
170                    .unwrap()
171                    .as_str()
172                    .unwrap();
173                connection.call_uuid = Some(channel_unique_id.to_string());
174            }
175        }
176        Ok(connection)
177    }
178
179    /// subscribes to given events
180    pub async fn subscribe(&self, events: Vec<&str>) -> Result<Event, EslError> {
181        let message = format!("event json {}", events.join(" "));
182        self.send_recv(message.as_bytes()).await
183    }
184
185    pub(crate) async fn new(
186        socket: impl ToSocketAddrs,
187        password: impl ToString,
188        connection_type: EslConnectionType,
189    ) -> Result<Self, EslError> {
190        let stream = TcpStream::connect(socket).await?;
191        Self::with_tcpstream(stream, password, connection_type).await
192    }
193    pub(crate) async fn auth(&self) -> Result<String, EslError> {
194        let auth_response = self
195            .send_recv(format!("auth {}", self.password).as_bytes())
196            .await?;
197        let auth_headers = auth_response.headers();
198        let reply_text = auth_headers.get("Reply-Text").ok_or_else(|| {
199            EslError::InternalError("Reply-Text in auth request was not found".into())
200        })?;
201        let reply_text = reply_text.as_str().unwrap();
202        let (code, text) = parse_api_response(reply_text)?;
203        match code {
204            Code::Ok => {
205                self.connected.store(true, Ordering::Relaxed);
206                Ok(text)
207            }
208            Code::Err => Err(EslError::AuthFailed),
209            Code::Unknown => Err(EslError::InternalError(
210                "Got unknown code in auth request".into(),
211            )),
212        }
213    }
214
215    /// For hanging up call in outbound mode
216    pub async fn hangup(&self, reason: &str) -> Result<Event, EslError> {
217        self.execute("hangup", reason).await
218    }
219
220    /// executes application in freeswitch
221    pub async fn execute(&self, app_name: &str, app_args: &str) -> Result<Event, EslError> {
222        let event_uuid = uuid::Uuid::new_v4().to_string();
223        let (tx, rx) = channel();
224        self.background_jobs
225            .lock()
226            .await
227            .insert(event_uuid.clone(), tx);
228        let call_uuid = self.call_uuid.as_ref().unwrap().clone();
229        let command  = format!("sendmsg {}\nexecute-app-name: {}\nexecute-app-arg: {}\ncall-command: execute\nEvent-UUID: {}",call_uuid,app_name,app_args,event_uuid);
230        let response = self.send_recv(command.as_bytes()).await?;
231        trace!("inside execute {:?}", response);
232        let resp = rx.await?;
233        trace!("got response from channel {:?}", resp);
234        Ok(resp)
235    }
236
237    /// answers call in outbound mode
238    pub async fn answer(&self) -> Result<Event, EslError> {
239        self.execute("answer", "").await
240    }
241
242    /// sends api command to freeswitch
243    pub async fn api(&self, command: &str) -> Result<String, EslError> {
244        let response = self.send_recv(format!("api {}", command).as_bytes()).await;
245        let event = response?;
246        let body = event
247            .body
248            .ok_or_else(|| EslError::InternalError("Didnt get body in api response".into()))?;
249
250        let (code, text) = parse_api_response(&body)?;
251        match code {
252            Code::Ok => Ok(text),
253            Code::Err => Err(EslError::ApiError(text)),
254            Code::Unknown => Ok(body),
255        }
256    }
257
258    /// sends bgapi commands to freeswitch
259    pub async fn bgapi(&self, command: &str) -> Result<String, EslError> {
260        trace!("Send bgapi {}", command);
261        let job_uuid = uuid::Uuid::new_v4().to_string();
262        let (tx, rx) = channel();
263        self.background_jobs
264            .lock()
265            .await
266            .insert(job_uuid.clone(), tx);
267
268        self.send_recv(format!("bgapi {}\nJob-UUID: {}", command, job_uuid).as_bytes())
269            .await?;
270
271        let resp = rx.await?;
272        let body = resp
273            .body()
274            .clone()
275            .ok_or_else(|| EslError::InternalError("body was not found in event/json".into()))?;
276
277        let body_hashmap = parse_json_body(&body)?;
278
279        let mut hsmp = resp.headers().clone();
280        hsmp.extend(body_hashmap);
281        let body = hsmp
282            .get("_body")
283            .ok_or_else(|| EslError::InternalError("body was not found in event/json".into()))?;
284        let body = body.as_str().unwrap();
285        let (code, text) = parse_api_response(body)?;
286        match code {
287            Code::Ok => Ok(text),
288            Code::Err => Err(EslError::ApiError(text)),
289            Code::Unknown => Ok(body.to_string()),
290        }
291    }
292}
293fn parse_api_response(body: &str) -> Result<(Code, String), EslError> {
294    let space_index = body
295        .find(char::is_whitespace)
296        .ok_or_else(|| EslError::InternalError("Unable to find space index".into()))?;
297    let code = &body[..space_index];
298    let text_start = space_index + 1;
299    let body_length = body.len();
300    let text = if text_start < body_length {
301        body[text_start..(body_length - 1)].to_string()
302    } else {
303        String::new()
304    };
305    let code = code.parse_code()?;
306    Ok((code, text))
307}
308fn parse_json_body(body: &str) -> Result<HashMap<String, Value>, EslError> {
309    Ok(serde_json::from_str(body)?)
310}