freeswitch_esl_rs/
blocking_client.rs1extern crate queues;
2
3use std::{io, fmt, error};
4use std::io::{Read, Write, BufReader};
5use queues::{CircularBuffer, Queue, IsQueue};
6use crate::data::*;
7
8const EVENT_QUEUE_SIZE: usize = 100_000;
9
10pub trait Connectioner: Write + Read {
11}
12
13pub struct Connection<C: Connectioner> {
14 reader: BufReader<C>
15}
16
17impl<C: Connectioner> Connection<C> {
18 pub fn new(connection: C) -> Self {
19 let reader = BufReader::new(connection);
20
21 Self {
22 reader: reader
23 }
24 }
25
26 fn reader(&mut self) -> &mut BufReader<impl Read> {
27 &mut self.reader
28 }
29
30 fn writer(&mut self) -> &mut impl Write {
31 self.reader.get_mut()
32 }
33}
34
35#[derive(Debug)]
36pub enum ClientError {
37 ConnectionClose,
38 IOError(io::Error),
39 ParseError(ParseError)
40}
41
42impl fmt::Display for ClientError {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 write!(f, "{:?}", self)
45 }
46}
47
48impl error::Error for ClientError {
49 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
50 match self {
51 ClientError::ParseError(e) => Some(e),
52 ClientError::IOError(e) => Some(e),
53 ClientError::ConnectionClose => Some(self)
54 }
55 }
56}
57
58impl From<ParseError> for ClientError {
59 fn from(e: ParseError) -> Self {
60 ClientError::ParseError(e)
61 }
62}
63
64impl From<io::Error> for ClientError {
65 fn from(e: io::Error) -> Self {
66 ClientError::IOError(e)
67 }
68}
69
70pub struct Client<C: Connectioner> {
72 connection: Connection<C>,
73 api_response: Queue<Pdu>,
74 command_reply: Queue<Pdu>,
75 events: CircularBuffer<Pdu>
76}
77
78impl<C: Connectioner> Client<C> {
79 pub fn new(connection: Connection<C>) -> Self {
80 Self{
81 connection: connection,
82 api_response: Queue::new(),
83 command_reply: Queue::new(),
84 events: CircularBuffer::new(EVENT_QUEUE_SIZE)
85 }
86 }
87
88 pub fn pull_event(&mut self) -> Result<Event, ClientError> {
89 loop {
90 self.pull_and_process_pdu()?;
91
92 if let Ok(pdu) = self.events.remove() {
93 let event: Event = pdu.parse()?;
94 return Ok(event);
95 }
96 }
97 }
98
99 pub fn event(&mut self, event: &str) -> Result<(), ClientError> {
100 self.send_command(format_args!("event plain {}", event))?;
101
102 self.wait_for_command_reply()?;
103
104 Ok(())
105 }
106
107 pub fn api(&mut self, cmd: &str, arg: &str) -> Result<String, ClientError> {
108 self.send_command(format_args!("api {} {}", cmd, arg))?;
109
110 let pdu = self.wait_for_api_response()?;
111 let response: String = pdu.parse()?;
112 Ok(response)
113 }
114
115 pub fn auth(&mut self, pass: &str) -> Result<(), &'static str> {
116 let pdu = PduParser::parse(self.connection.reader()).unwrap();
117
118 if pdu.header("Content-Type") == "auth/request" {
119 self.send_command(format_args!("auth {}", pass)).unwrap();
120
121 let pdu = self.wait_for_command_reply().unwrap();
122
123 if pdu.header("Reply-Text") == "+OK accepted" {
124 Ok(())
125 } else {
126 Err("fails to authenticate")
127 }
128 } else {
129 Err("fails to authenticate")
130 }
131 }
132
133 fn wait_for_api_response(&mut self) -> Result<Pdu, ClientError> {
134 loop {
135 self.pull_and_process_pdu()?;
136
137 if let Ok(pdu) = self.api_response.remove() {
138 return Ok(pdu)
139 }
140 }
141 }
142
143 fn wait_for_command_reply(&mut self) -> Result<Pdu, ClientError> {
144 loop {
145 self.pull_and_process_pdu()?;
146
147 if let Ok(pdu) = self.command_reply.remove() {
148 return Ok(pdu)
149 }
150 }
151 }
152
153 fn send_command(&mut self, cmd: std::fmt::Arguments) -> io::Result<()> {
154 write!(self.connection.writer(), "{}\n\n", cmd)?;
155 self.connection.writer().flush()?;
156 Ok(())
157 }
158
159 fn pull_and_process_pdu(&mut self) -> Result<(), ClientError> {
160 let pdu = PduParser::parse(self.connection.reader()).expect("fails to read pdu");
161 let content_type = pdu.header("Content-Type");
162
163 if content_type == "api/response" {
164 self.api_response.add(pdu)
165 .expect("fails to add to api response");
166 Ok(())
167 } else if content_type == "text/disconnect-notice" {
168 Err(ClientError::ConnectionClose)
169 } else if content_type == "text/event-plain" {
170 self.events.add(pdu)
171 .expect("fails to add event");
172
173 Ok(())
174 } else if content_type == "command/reply" {
175 self.command_reply.add(pdu)
176 .expect("fails to add pdu to command_reply");
177 Ok(())
178 } else if pdu.is_empty() {
179 Err(ClientError::ConnectionClose)
180 } else {
181 panic!("missing handler for {:?}", pdu);
182 }
183 }
184}
185
186impl Connectioner for std::net::TcpStream {
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192
193 impl Connectioner for std::io::Cursor<Vec<u8>> {
194 }
195
196 #[test]
197 fn it_authenticate() -> Result<(), &'static str> {
198 use std::io::Cursor;
199 let mut protocol = Cursor::new(vec![0; 512]);
200 write!(protocol, "Content-Type: auth/request\n\n").unwrap();
201 write!(protocol, "Content-Type: command/reply\nReply-Text: +OK accepted\n\n").unwrap();
202 protocol.set_position(0);
203
204 let conn = Connection::new(protocol);
205 let mut client = Client::new(conn);
206
207 client.auth("test")?;
208 Ok(())
209 }
210
211 #[test]
212 fn it_invalid_authentication() {
213 use std::io::Cursor;
214 let mut protocol = Cursor::new(vec![0; 512]);
215 write!(protocol, "Content-Type: auth/request\n\n").unwrap();
216 write!(protocol, "Content-Type: command/reply\nReply-Text: -ERR invalid\n\n").unwrap();
217 protocol.set_position(0);
218 let conn = Connection::new(protocol);
219 let mut client = Client::new(conn);
220
221 assert_eq!("fails to authenticate", client.auth("test").unwrap_err());
222 }
223
224 #[test]
225 fn it_call_api() -> Result<(), ClientError> {
226 use std::io::Cursor;
227 let mut protocol = Cursor::new(vec![0; 512]);
228 write!(protocol, "api uptime \n\n").unwrap();
229 write!(protocol,
230 concat!(
231 "Content-Type: api/response\n",
232 "Content-Length: 6\n\n",
233 "999666"
234 )
235 ).unwrap();
236 protocol.set_position(0);
237 let conn = Connection::new(protocol);
238 let mut client = Client::new(conn);
239
240 let pdu = client.api("uptime", "")?;
241
242 let response: String = pdu.parse().unwrap();
243 assert_eq!("999666", response);
244
245 Ok(())
246 }
247
248 #[test]
249 fn it_pull_event() -> Result<(), ClientError> {
250 use std::io::Cursor;
251 let mut protocol = Cursor::new(vec![0; 512]);
252 write!(protocol, "Content-Length: 526
253Content-Type: text/event-plain
254
255Event-Name: API
256Core-UUID: 2379c0b2-d1a9-465b-bdc6-ca55275e591b
257FreeSWITCH-Hostname: dafa872b4e1a
258FreeSWITCH-Switchname: 16.20.0.9
259FreeSWITCH-IPv4: 16.20.0.9
260FreeSWITCH-IPv6: %3A%3A1
261Event-Date-Local: 2022-10-15%2015%3A32%3A56
262Event-Date-GMT: Sat,%2015%20Oct%202022%2015%3A32%3A56%20GMT
263Event-Date-Timestamp: 1665847976799920
264Event-Calling-File: switch_loadable_module.c
265Event-Calling-Function: switch_api_execute
266Event-Calling-Line-Number: 2949
267Event-Sequence: 5578
268API-Command: show
269API-Command-Argument: calls%20as%20json
270
271").unwrap();
272 protocol.set_position(0);
273
274 let conn = Connection::new(protocol);
275 let mut client = Client::new(conn);
276
277 let event: Event = client.pull_event().unwrap();
278
279 assert_eq!("API", event.get("Event-Name").unwrap());
280 assert_eq!("show", event.get("API-Command").unwrap());
281
282 Ok(())
283 }
284
285 #[test]
286 fn it_pull_event_with_urldecoded_values() -> Result<(), ClientError> {
287 use std::io::Cursor;
288 let mut protocol = Cursor::new(vec![0; 512]);
289 write!(protocol, "Content-Length: 526
290Content-Type: text/event-plain
291
292Event-Name: API
293Core-UUID: 2379c0b2-d1a9-465b-bdc6-ca55275e591b
294FreeSWITCH-Hostname: dafa872b4e1a
295FreeSWITCH-Switchname: 16.20.0.9
296FreeSWITCH-IPv4: 16.20.0.9
297FreeSWITCH-IPv6: %3A%3A1
298Event-Date-Local: 2022-10-15%2015%3A32%3A56
299Event-Date-GMT: Sat,%2015%20Oct%202022%2015%3A32%3A56%20GMT
300Event-Date-Timestamp: 1665847976799920
301Event-Calling-File: switch_loadable_module.c
302Event-Calling-Function: switch_api_execute
303Event-Calling-Line-Number: 2949
304Event-Sequence: 5578
305API-Command: show
306API-Command-Argument: calls%20as%20json
307
308").unwrap();
309 protocol.set_position(0);
310
311 let conn = Connection::new(protocol);
312 let mut client = Client::new(conn);
313
314 let event: Event = client.pull_event()?;
315
316 assert_eq!("calls as json", event.get("API-Command-Argument").unwrap());
317
318 Ok(())
319 }
320
321 #[test]
322 fn it_handle_event_disconnection_from_server() {
323 use std::io::Cursor;
324 let mut protocol = Cursor::new(vec![0; 512]);
325 write!(protocol, "Content-Type: text/disconnect-notice
326Content-Length: 67
327
328Disconnected, goodbye.
329See you at ClueCon! http://www.cluecon.com/").unwrap();
330 protocol.set_position(0);
331
332 let conn = Connection::new(protocol);
333 let mut client = Client::new(conn);
334
335 let event = client.pull_event();
336 assert_eq!(true, event.is_err());
337 }
338
339 #[test]
340 fn it_handle_socket_disconnection_from_server() {
341 use std::io::Cursor;
342 let mut protocol = Cursor::new(vec![0; 512]);
343 write!(protocol, "").unwrap();
344 protocol.set_position(0);
345
346 let conn = Connection::new(protocol);
347 let mut client = Client::new(conn);
348
349 let event = client.pull_event();
350 assert_eq!(true, event.is_err());
351 }
352}