freeswitch_esl_rs/
blocking_client.rs

1extern crate queues;
2
3use std::{io, fmt, error};
4use std::io::{Read, Write, BufReader};
5use queues::{CircularBuffer, Queue, IsQueue};
6use crate::data::*;
7
/// Capacity handed to the `CircularBuffer` that stores received events
/// until they are pulled by the caller.
// NOTE(review): the `queues` crate's circular buffer presumably evicts its
// oldest element once full, so a backlog beyond this size would lose events —
// confirm against the crate documentation.
const EVENT_QUEUE_SIZE: usize = 100_000;
9
/// Marker trait for a bidirectional byte stream.
///
/// It adds no methods of its own: a type opts in by being both [`Read`] and
/// [`Write`] and providing an (empty) implementation.
pub trait Connectioner: Read + Write {}
12
13pub struct Connection<C: Connectioner> {
14    reader: BufReader<C>
15}
16
17impl<C: Connectioner> Connection<C> {
18    pub fn new(connection: C) -> Self {
19        let reader = BufReader::new(connection);
20        
21        Self {
22            reader: reader
23        }
24    }
25
26    fn reader(&mut self) -> &mut BufReader<impl Read> {
27        &mut self.reader
28    }
29
30    fn writer(&mut self) -> &mut impl Write {
31        self.reader.get_mut()
32    }
33}
34
35#[derive(Debug)]
36pub enum ClientError {
37    ConnectionClose,
38    IOError(io::Error),
39    ParseError(ParseError)
40}
41
42impl fmt::Display for ClientError {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        write!(f, "{:?}", self)
45    }
46}
47
48impl error::Error for ClientError {
49    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
50        match self {
51            ClientError::ParseError(e) => Some(e),
52            ClientError::IOError(e) => Some(e),
53            ClientError::ConnectionClose => Some(self)
54        }
55    }
56}
57
58impl From<ParseError> for ClientError {
59    fn from(e: ParseError) -> Self {
60        ClientError::ParseError(e)
61    }
62}
63
64impl From<io::Error> for ClientError {
65    fn from(e: io::Error) -> Self {
66        ClientError::IOError(e)
67    }
68}
69
70// Implement protocol Freeswitch mod_event_socket
71pub struct Client<C: Connectioner> {
72    connection: Connection<C>,
73    api_response: Queue<Pdu>,
74    command_reply: Queue<Pdu>,
75    events: CircularBuffer<Pdu>
76}
77
78impl<C: Connectioner> Client<C> {
79    pub fn new(connection: Connection<C>) -> Self {
80        Self{
81            connection: connection,
82            api_response: Queue::new(),
83            command_reply: Queue::new(),
84            events: CircularBuffer::new(EVENT_QUEUE_SIZE)
85        }
86    }
87
88    pub fn pull_event(&mut self) -> Result<Event, ClientError> {
89        loop {
90            self.pull_and_process_pdu()?;
91
92            if let Ok(pdu) = self.events.remove() {
93                let event: Event = pdu.parse()?;
94                return Ok(event);
95            }
96        }
97    }
98
99    pub fn event(&mut self, event: &str) -> Result<(), ClientError> {
100        self.send_command(format_args!("event plain {}", event))?;
101
102        self.wait_for_command_reply()?;
103        
104        Ok(())
105    }
106    
107    pub fn api(&mut self, cmd: &str, arg: &str) -> Result<String, ClientError> {
108        self.send_command(format_args!("api {} {}", cmd, arg))?;
109
110        let pdu = self.wait_for_api_response()?;
111        let response: String = pdu.parse()?;
112        Ok(response)
113    }
114
115    pub fn auth(&mut self, pass: &str) -> Result<(), &'static str> {
116        let pdu = PduParser::parse(self.connection.reader()).unwrap();
117        
118        if pdu.header("Content-Type") == "auth/request" {
119            self.send_command(format_args!("auth {}", pass)).unwrap();
120
121            let pdu = self.wait_for_command_reply().unwrap();
122
123            if pdu.header("Reply-Text") == "+OK accepted" {
124                Ok(())
125            } else {
126                Err("fails to authenticate")
127            }
128        } else {
129            Err("fails to authenticate")
130        }
131    }
132
133    fn wait_for_api_response(&mut self) -> Result<Pdu, ClientError> {
134        loop {
135            self.pull_and_process_pdu()?;
136
137            if let Ok(pdu) = self.api_response.remove() {
138                return Ok(pdu)
139            }
140        }
141    }
142
143    fn wait_for_command_reply(&mut self) -> Result<Pdu, ClientError> {
144        loop {
145            self.pull_and_process_pdu()?;
146
147            if let Ok(pdu) = self.command_reply.remove() {
148                return Ok(pdu)
149            }
150        }
151    }
152
153    fn send_command(&mut self, cmd: std::fmt::Arguments) -> io::Result<()> {
154        write!(self.connection.writer(), "{}\n\n", cmd)?;
155        self.connection.writer().flush()?;
156        Ok(())
157    }
158
159    fn pull_and_process_pdu(&mut self) -> Result<(), ClientError> {
160        let pdu = PduParser::parse(self.connection.reader()).expect("fails to read pdu");
161        let content_type = pdu.header("Content-Type");
162
163        if content_type == "api/response" {
164            self.api_response.add(pdu)
165                .expect("fails to add to api response");
166            Ok(())
167        } else if content_type == "text/disconnect-notice" {
168            Err(ClientError::ConnectionClose)
169        } else if content_type == "text/event-plain" {
170            self.events.add(pdu)
171                .expect("fails to add event");
172
173            Ok(())
174        } else if content_type == "command/reply" {
175            self.command_reply.add(pdu)
176                .expect("fails to add pdu to command_reply");
177            Ok(())
178        } else if pdu.is_empty() {
179            Err(ClientError::ConnectionClose)
180        } else {
181            panic!("missing handler for {:?}", pdu);
182        }
183    }
184}
185
186impl Connectioner for std::net::TcpStream {
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    impl Connectioner for Cursor<Vec<u8>> {}

    /// `text/event-plain` PDU shared by the event tests. Its body is exactly
    /// the 526 bytes announced by `Content-Length`.
    const API_EVENT_PDU: &str = concat!(
        "Content-Length: 526\n",
        "Content-Type: text/event-plain\n",
        "\n",
        "Event-Name: API\n",
        "Core-UUID: 2379c0b2-d1a9-465b-bdc6-ca55275e591b\n",
        "FreeSWITCH-Hostname: dafa872b4e1a\n",
        "FreeSWITCH-Switchname: 16.20.0.9\n",
        "FreeSWITCH-IPv4: 16.20.0.9\n",
        "FreeSWITCH-IPv6: %3A%3A1\n",
        "Event-Date-Local: 2022-10-15%2015%3A32%3A56\n",
        "Event-Date-GMT: Sat,%2015%20Oct%202022%2015%3A32%3A56%20GMT\n",
        "Event-Date-Timestamp: 1665847976799920\n",
        "Event-Calling-File: switch_loadable_module.c\n",
        "Event-Calling-Function: switch_api_execute\n",
        "Event-Calling-Line-Number: 2949\n",
        "Event-Sequence: 5578\n",
        "API-Command: show\n",
        "API-Command-Argument: calls%20as%20json\n",
        "\n",
    );

    /// Builds a client over an in-memory stream: `script` is written at the
    /// start of a zero-filled 512-byte buffer and the cursor is rewound.
    fn client_for(script: &str) -> Client<Cursor<Vec<u8>>> {
        let mut stream = Cursor::new(vec![0u8; 512]);
        write!(stream, "{}", script).unwrap();
        stream.set_position(0);
        Client::new(Connection::new(stream))
    }

    #[test]
    fn it_authenticate() -> Result<(), &'static str> {
        let mut client = client_for(concat!(
            "Content-Type: auth/request\n\n",
            "Content-Type: command/reply\nReply-Text: +OK accepted\n\n",
        ));

        client.auth("test")
    }

    #[test]
    fn it_invalid_authentication() {
        let mut client = client_for(concat!(
            "Content-Type: auth/request\n\n",
            "Content-Type: command/reply\nReply-Text: -ERR invalid\n\n",
        ));

        assert_eq!("fails to authenticate", client.auth("test").unwrap_err());
    }

    #[test]
    fn it_call_api() -> Result<(), ClientError> {
        // The leading command text is a placeholder for the bytes the client
        // writes first: reads and writes share the cursor position, so the
        // response has to start right after it.
        let mut client = client_for(concat!(
            "api uptime \n\n",
            "Content-Type: api/response\n",
            "Content-Length: 6\n\n",
            "999666",
        ));

        let response = client.api("uptime", "")?;
        assert_eq!("999666", response);

        Ok(())
    }

    #[test]
    fn it_pull_event() -> Result<(), ClientError> {
        let mut client = client_for(API_EVENT_PDU);

        let event: Event = client.pull_event().unwrap();

        assert_eq!("API", event.get("Event-Name").unwrap());
        assert_eq!("show", event.get("API-Command").unwrap());

        Ok(())
    }

    #[test]
    fn it_pull_event_with_urldecoded_values() -> Result<(), ClientError> {
        let mut client = client_for(API_EVENT_PDU);

        let event: Event = client.pull_event()?;

        assert_eq!("calls as json", event.get("API-Command-Argument").unwrap());

        Ok(())
    }

    #[test]
    fn it_handle_event_disconnection_from_server() {
        let mut client = client_for(concat!(
            "Content-Type: text/disconnect-notice\n",
            "Content-Length: 67\n",
            "\n",
            "Disconnected, goodbye.\n",
            "See you at ClueCon! http://www.cluecon.com/",
        ));

        assert!(client.pull_event().is_err());
    }

    #[test]
    fn it_handle_socket_disconnection_from_server() {
        // Nothing is scripted: the client only sees the zero padding.
        let mut client = client_for("");

        assert!(client.pull_event().is_err());
    }
}