1use crate::code::{Code, ParseCode};
2use crate::error::EslError;
3use crate::esl::EslConnectionType;
4use crate::event::Event;
5use crate::io::EslCodec;
6use futures::SinkExt;
7use serde_json::Value;
8use std::collections::{HashMap, VecDeque};
9use std::sync::atomic::Ordering;
10use std::sync::{atomic::AtomicBool, Arc};
11use tokio::io::WriteHalf;
12use tokio::net::{TcpStream, ToSocketAddrs};
13use tokio::sync::{
14 oneshot::{channel, Sender},
15 Mutex,
16};
17use tokio_stream::StreamExt;
18use tokio_util::codec::{FramedRead, FramedWrite};
19use tracing::trace;
20#[derive(Debug)]
21pub struct EslConnection {
23 password: String,
24 commands: Arc<Mutex<VecDeque<Sender<Event>>>>,
25 transport_tx: Arc<Mutex<FramedWrite<WriteHalf<TcpStream>, EslCodec>>>,
26 background_jobs: Arc<Mutex<HashMap<String, Sender<Event>>>>,
27 connected: AtomicBool,
28 pub(crate) call_uuid: Option<String>,
29 connection_info: Option<HashMap<String, Value>>,
30}
31
32impl EslConnection {
33 pub async fn call_uuid(&self) -> Option<String> {
35 self.call_uuid.clone()
36 }
37 pub async fn disconnect(self) -> Result<(), EslError> {
39 self.send_recv(b"exit").await?;
40 self.connected.store(false, Ordering::Relaxed);
41 Ok(())
42 }
43 pub fn connected(&self) -> bool {
45 self.connected.load(Ordering::Relaxed)
46 }
47 pub(crate) async fn send(&self, item: &[u8]) -> Result<(), EslError> {
48 let mut transport = self.transport_tx.lock().await;
49 transport.send(item).await
50 }
51 pub async fn send_recv(&self, item: &[u8]) -> Result<Event, EslError> {
53 self.send(item).await?;
54 let (tx, rx) = channel();
55 self.commands.lock().await.push_back(tx);
56 Ok(rx.await?)
57 }
58
59 pub(crate) async fn with_tcpstream(
60 stream: TcpStream,
61 password: impl ToString,
62 connection_type: EslConnectionType,
63 ) -> Result<Self, EslError> {
64 let commands = Arc::new(Mutex::new(VecDeque::new()));
66 let inner_commands = Arc::clone(&commands);
67 let background_jobs = Arc::new(Mutex::new(HashMap::new()));
68 let inner_background_jobs = Arc::clone(&background_jobs);
69 let esl_codec = EslCodec {};
70 let (read_half, write_half) = tokio::io::split(stream);
71 let mut transport_rx = FramedRead::new(read_half, esl_codec.clone());
72 let transport_tx = Arc::new(Mutex::new(FramedWrite::new(write_half, esl_codec.clone())));
73 if connection_type == EslConnectionType::Inbound {
74 transport_rx.next().await;
75 }
76 let mut connection = Self {
77 password: password.to_string(),
78 commands,
79 background_jobs,
80 transport_tx,
81 connected: AtomicBool::new(false),
82 call_uuid: None,
83 connection_info: None,
84 };
85 tokio::spawn(async move {
86 loop {
87 if let Some(Ok(event)) = transport_rx.next().await {
88 if let Some(event_type) = event.headers.get("Content-Type") {
89 match event_type.as_str().unwrap() {
90 "text/disconnect-notice" => {
91 trace!("got disconnect notice");
92 return;
93 }
94 "text/event-json" => {
95 trace!("got event-json");
96 let data = event
97 .body()
98 .clone()
99 .expect("Unable to get body of event-json");
100
101 let event_body = parse_json_body(&data)
102 .expect("Unable to parse body of event-json");
103 let job_uuid = event_body.get("Job-UUID");
104 if let Some(job_uuid) = job_uuid {
105 let job_uuid = job_uuid.as_str().unwrap();
106 if let Some(tx) =
107 inner_background_jobs.lock().await.remove(job_uuid)
108 {
109 tx.send(event)
110 .expect("Unable to send channel message from bgapi");
111 }
112 trace!("continued");
113 continue;
114 }
115 if let Some(application_uuid) = event_body.get("Application-UUID") {
116 let job_uuid = application_uuid.as_str().unwrap();
117 if let Some(event_name) = event_body.get("Event-Name") {
118 if let Some(event_name) = event_name.as_str() {
119 if event_name == "CHANNEL_EXECUTE_COMPLETE" {
120 if let Some(tx) = inner_background_jobs
121 .lock()
122 .await
123 .remove(job_uuid)
124 {
125 tx.send(event).expect(
126 "Unable to send channel message from bgapi",
127 );
128 }
129 trace!("continued");
130 trace!("got channel execute complete");
131 }
132 }
133 }
134 }
135 continue;
136 }
137 _ => {
138 trace!("got another event {:?}", event);
139 }
140 }
141 }
142 if let Some(tx) = inner_commands.lock().await.pop_front() {
143 tx.send(event).expect("msg");
144 }
145 }
146 }
147 });
148 match connection_type {
149 EslConnectionType::Inbound => {
150 let auth_response = connection.auth().await?;
151 trace!("auth_response {:?}", auth_response);
152 connection
153 .subscribe(vec!["BACKGROUND_JOB", "CHANNEL_EXECUTE_COMPLETE"])
154 .await?;
155 }
156 EslConnectionType::Outbound => {
157 let response = connection.send_recv(b"connect").await?;
158 trace!("{:?}", response);
159 connection.connection_info = Some(response.headers().clone());
160 let response = connection
161 .subscribe(vec!["BACKGROUND_JOB", "CHANNEL_EXECUTE_COMPLETE"])
162 .await?;
163 trace!("{:?}", response);
164 let response = connection.send_recv(b"myevents").await?;
165 trace!("{:?}", response);
166 let connection_info = connection.connection_info.as_ref().unwrap();
167
168 let channel_unique_id = connection_info
169 .get("Channel-Unique-ID")
170 .unwrap()
171 .as_str()
172 .unwrap();
173 connection.call_uuid = Some(channel_unique_id.to_string());
174 }
175 }
176 Ok(connection)
177 }
178
179 pub async fn subscribe(&self, events: Vec<&str>) -> Result<Event, EslError> {
181 let message = format!("event json {}", events.join(" "));
182 self.send_recv(message.as_bytes()).await
183 }
184
185 pub(crate) async fn new(
186 socket: impl ToSocketAddrs,
187 password: impl ToString,
188 connection_type: EslConnectionType,
189 ) -> Result<Self, EslError> {
190 let stream = TcpStream::connect(socket).await?;
191 Self::with_tcpstream(stream, password, connection_type).await
192 }
193 pub(crate) async fn auth(&self) -> Result<String, EslError> {
194 let auth_response = self
195 .send_recv(format!("auth {}", self.password).as_bytes())
196 .await?;
197 let auth_headers = auth_response.headers();
198 let reply_text = auth_headers.get("Reply-Text").ok_or_else(|| {
199 EslError::InternalError("Reply-Text in auth request was not found".into())
200 })?;
201 let reply_text = reply_text.as_str().unwrap();
202 let (code, text) = parse_api_response(reply_text)?;
203 match code {
204 Code::Ok => {
205 self.connected.store(true, Ordering::Relaxed);
206 Ok(text)
207 }
208 Code::Err => Err(EslError::AuthFailed),
209 Code::Unknown => Err(EslError::InternalError(
210 "Got unknown code in auth request".into(),
211 )),
212 }
213 }
214
215 pub async fn hangup(&self, reason: &str) -> Result<Event, EslError> {
217 self.execute("hangup", reason).await
218 }
219
220 pub async fn execute(&self, app_name: &str, app_args: &str) -> Result<Event, EslError> {
222 let event_uuid = uuid::Uuid::new_v4().to_string();
223 let (tx, rx) = channel();
224 self.background_jobs
225 .lock()
226 .await
227 .insert(event_uuid.clone(), tx);
228 let call_uuid = self.call_uuid.as_ref().unwrap().clone();
229 let command = format!("sendmsg {}\nexecute-app-name: {}\nexecute-app-arg: {}\ncall-command: execute\nEvent-UUID: {}",call_uuid,app_name,app_args,event_uuid);
230 let response = self.send_recv(command.as_bytes()).await?;
231 trace!("inside execute {:?}", response);
232 let resp = rx.await?;
233 trace!("got response from channel {:?}", resp);
234 Ok(resp)
235 }
236
237 pub async fn answer(&self) -> Result<Event, EslError> {
239 self.execute("answer", "").await
240 }
241
242 pub async fn api(&self, command: &str) -> Result<String, EslError> {
244 let response = self.send_recv(format!("api {}", command).as_bytes()).await;
245 let event = response?;
246 let body = event
247 .body
248 .ok_or_else(|| EslError::InternalError("Didnt get body in api response".into()))?;
249
250 let (code, text) = parse_api_response(&body)?;
251 match code {
252 Code::Ok => Ok(text),
253 Code::Err => Err(EslError::ApiError(text)),
254 Code::Unknown => Ok(body),
255 }
256 }
257
258 pub async fn bgapi(&self, command: &str) -> Result<String, EslError> {
260 trace!("Send bgapi {}", command);
261 let job_uuid = uuid::Uuid::new_v4().to_string();
262 let (tx, rx) = channel();
263 self.background_jobs
264 .lock()
265 .await
266 .insert(job_uuid.clone(), tx);
267
268 self.send_recv(format!("bgapi {}\nJob-UUID: {}", command, job_uuid).as_bytes())
269 .await?;
270
271 let resp = rx.await?;
272 let body = resp
273 .body()
274 .clone()
275 .ok_or_else(|| EslError::InternalError("body was not found in event/json".into()))?;
276
277 let body_hashmap = parse_json_body(&body)?;
278
279 let mut hsmp = resp.headers().clone();
280 hsmp.extend(body_hashmap);
281 let body = hsmp
282 .get("_body")
283 .ok_or_else(|| EslError::InternalError("body was not found in event/json".into()))?;
284 let body = body.as_str().unwrap();
285 let (code, text) = parse_api_response(body)?;
286 match code {
287 Code::Ok => Ok(text),
288 Code::Err => Err(EslError::ApiError(text)),
289 Code::Unknown => Ok(body.to_string()),
290 }
291 }
292}
293fn parse_api_response(body: &str) -> Result<(Code, String), EslError> {
294 let space_index = body
295 .find(char::is_whitespace)
296 .ok_or_else(|| EslError::InternalError("Unable to find space index".into()))?;
297 let code = &body[..space_index];
298 let text_start = space_index + 1;
299 let body_length = body.len();
300 let text = if text_start < body_length {
301 body[text_start..(body_length - 1)].to_string()
302 } else {
303 String::new()
304 };
305 let code = code.parse_code()?;
306 Ok((code, text))
307}
308fn parse_json_body(body: &str) -> Result<HashMap<String, Value>, EslError> {
309 Ok(serde_json::from_str(body)?)
310}