asterisk_manager/lib.rs
1//! # Asterisk Manager Library
2//!
//! This library provides an implementation to manage connections and authentication with an Asterisk Manager Interface (AMI) server.
4//!
5//! It offers a robust and efficient way to interact with the AMI server, handling connection retries, authentication, and event processing seamlessly. The library is built using Tokio for asynchronous operations, ensuring high performance and scalability. Additionally, it processes and formats data into JSON objects for easy manipulation and integration.
6//!
7//! ## Features
8//!
9//! - **Reconnection Logic**: Automatically handles reconnections to the AMI server in case of connection drops.
10//! - **Event Handling**: Processes various AMI events and provides a mechanism to handle them.
11//! - **Asynchronous Operations**: Utilizes Tokio for non-blocking, asynchronous operations.
12//! - **Participant Management**: Manages call participants and their states efficiently.
13//! - **JSON Data Handling**: Formats and processes data into JSON objects for easy manipulation.
14//! - **Event Callbacks**: Allows registration of callbacks for specific events, all events, raw events, and response events.
15//!
16//! ## Usage Example
17//!
18//! ```rust,no_run
19//! use asterisk_manager::{ManagerBuilder, ManagerOptions};
20//! use tokio::runtime::Runtime;
21//! use serde_json::Value;
22//!
23//! fn main() {
24//! let rt = Runtime::new().unwrap();
25//! rt.block_on(async {
26//! let options = ManagerOptions {
27//! port: 5038,
28//! host: "example.com".to_string(),
29//! username: "admin".to_string(),
30//! password: "password".to_string(),
31//! events: false,
32//! };
33//!
34//! let callback1 = Box::new(|event: Value| {
35//! println!("Callback1 received event: {}", event);
36//! });
37//!
38//! let callback2 = Box::new(|event: Value| {
39//! println!("Callback2 received event: {}", event);
40//! });
41//!
42//! let global_callback = Box::new(|event: Value| {
43//! println!("Global callback received event: {}", event);
44//! });
45//!
46//! let raw_event_callback = Box::new(|event: Value| {
47//! println!("Raw event callback received event: {}", event);
48//! });
49//!
50//! let response_event_callback = Box::new(|event: Value| {
51//! println!("Response event callback received event: {}", event);
52//! });
53//!
54//! let mut manager = ManagerBuilder::new(options)
55//! .on_event("Newchannel", callback1)
56//! .on_event("Hangup", callback2)
57//! .on_all_events(global_callback)
58//! .on_all_raw_events(raw_event_callback)
59//! .on_all_response_events(response_event_callback)
60//! .build();
61//!
62//! manager.connect_with_retries().await;
63//!
64//! if !manager.is_authenticated() {
65//! println!("Authentication failed");
66//! return;
67//! }
68//!
69//! let action = serde_json::json!({
70//! "action": "QueueStatus",
71//! });
72//! if let Err(err) = manager.send_action(action).await {
73//! println!("Error sending action: {}", err);
74//! return;
75//! }
76//!
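//! // Runs the event loop. This call loops forever (it reconnects on read errors),
//! // so statements placed after it are only illustrative and are not reached.
//! manager.read_data_with_retries().await;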
78//!
79//! manager.disconnect().await;
80//! });
81//! }
82//! ```
83//!
84//! ## Participant Structure
85//!
86//! The `Participant` structure represents a participant in a call.
87//!
88//! ```rust
89//! #[derive(Debug)]
90//! pub struct Participant {
91//! name: String,
92//! number: String,
93//! with: Option<String>,
94//! }
95//! ```
96//!
97//! ## Manager Structure
98//!
99//! The `Manager` structure handles connections and options for the AMI server.
100//!
101//! ```rust
102//! use asterisk_manager::ManagerOptions;
103//! use tokio::net::TcpStream;
104//! use std::collections::HashMap;
105//! use tokio::sync::mpsc;
106//! use asterisk_manager::Participant;
107//! use std::sync::{Arc, Mutex};
108//!
109//! pub struct Manager {
110//! options: ManagerOptions,
111//! connection: Option<TcpStream>,
112//! authenticated: bool,
113//! emitter: Arc<Mutex<Vec<String>>>,
114//! event_sender: Option<mpsc::Sender<String>>,
115//! participants: HashMap<String, Participant>,
116//! event_callbacks: HashMap<String, Box<dyn Fn(serde_json::Value) + Send + Sync>>,
117//! global_callback: Option<Box<dyn Fn(serde_json::Value) + Send + Sync>>,
118//! raw_event_callback: Option<Box<dyn Fn(serde_json::Value) + Send + Sync>>,
119//! response_event_callback: Option<Box<dyn Fn(serde_json::Value) + Send + Sync>>,
120//! }
121//! ```
122//!
123//! ## Main Methods
124//!
125//! ### `Manager::new`
126//! Creates a new instance of the Manager. The `event_sender` and `event_callback` parameters are optional.
127//!
128//! ### `Manager::connect`
129//! Connects to the AMI server.
130//!
131//! ### `Manager::connect_with_retries`
132//! Connects to the AMI server with reconnection logic.
133//!
134//! ### `Manager::authenticate`
135//! Authenticates with the AMI server.
136//!
137//! ### `Manager::send_action`
138//! Sends an action to the AMI server.
139//!
140//! ### `Manager::read_data`
141//! Reads data from the AMI server.
142//!
143//! ### `Manager::read_data_with_retries`
144//! Reads data from the AMI server with reconnection logic.
145//!
146//! ### `Manager::on_event`
147//! Processes an event received from the AMI server.
148//!
149//! ### `Manager::parse_event`
150//! Parses an event received from the AMI server.
151//!
152//! ### `Manager::disconnect`
153//! Disconnects from the AMI server.
154//!
155//! ### `Manager::is_authenticated`
156//! Returns whether the manager is authenticated.
157//!
158//! ## ManagerBuilder Structure
159//!
160//! The `ManagerBuilder` structure helps in building a `Manager` instance with various event callbacks.
161//!
162//! ### `ManagerBuilder::new`
163//! Creates a new instance of the `ManagerBuilder`.
164//!
165//! ### `ManagerBuilder::on_event`
166//! Registers a callback for a specific event.
167//!
168//! ### `ManagerBuilder::on_all_events`
169//! Registers a callback for all events.
170//!
171//! ### `ManagerBuilder::on_all_raw_events`
172//! Registers a callback for all raw events.
173//!
174//! ### `ManagerBuilder::on_all_response_events`
175//! Registers a callback for all response events.
176//!
177//! ### `ManagerBuilder::build`
178//! Builds and returns a `Manager` instance.
179
180use serde::{Deserialize, Serialize};
181use serde_json::Value;
182use std::collections::HashMap;
183use std::sync::{Arc, Mutex};
184use tokio::io::{AsyncReadExt, AsyncWriteExt};
185use tokio::net::TcpStream;
186use tokio::sync::mpsc;
187use tokio::time::{timeout, Duration};
188
189/// Structure that contains the Manager configuration options.
190/// Represents the configuration options for the manager.
191///
192/// # Fields
193///
194/// * `port` - The port to connect to Asterisk AMI.
195/// * `host` - The host to connect to Asterisk AMI.
196/// * `username` - The username to authenticate with.
197/// * `password` - The password to authenticate with.
198/// * `events` - Indicates whether events should be enabled.
199///
200/// # Example
201///
202/// ```rust
203/// use asterisk_manager::ManagerOptions;
204///
205/// let options = ManagerOptions {
206/// port: 5038,
207/// host: "example.com".to_string(),
208/// username: "admin".to_string(),
209/// password: "password".to_string(),
210/// events: false,
211/// };
212/// ```
213///
214/// # Example (JSON)
215///
216/// ```json
217/// {
218/// "port": 5038,
219/// "host": "example.com",
220/// "username": "admin",
221/// "password": "password",
222/// "events": false
223/// }
224/// ```
225///
226#[derive(Serialize, Deserialize, Debug)]
227pub struct ManagerOptions {
228 pub port: u16,
229 pub host: String,
230 pub username: String,
231 pub password: String,
232 pub events: bool,
233}
234
/// A client for one Asterisk AMI server: owns the TCP connection, the login
/// state, and the callbacks that are invoked for incoming events.
///
/// # Fields
///
/// * `options` - Host, port and credentials used by `connect` and `authenticate`.
/// * `connection` - The TCP stream to the server; `None` until `connect` succeeds
///   and again after `disconnect`.
/// * `authenticated` - Set once the login response contains `Response: Success`;
///   reported by `is_authenticated`.
/// * `emitter` - A mutex-protected vector to which `on_event` appends the raw text
///   of each processed event. Nothing in this file reads it back.
/// * `event_sender` - Optional channel that receives short status strings such as
///   `"serverconnect"`, `"dialing"` or `"hangup"`.
/// * `participants` - Call participants keyed by channel unique id; only maintained
///   while `event_sender` is set.
/// * `event_callbacks` - Callbacks keyed by event name, called with the parsed event.
/// * `global_callback` - Callback called for every processed event.
/// * `raw_event_callback` - Callback called for data processed as `"rawEvent"`.
/// * `response_event_callback` - Callback called for data processed as `"responseEvent"`.
pub struct Manager {
    options: ManagerOptions,
    connection: Option<TcpStream>,
    authenticated: bool,
    emitter: Arc<Mutex<Vec<String>>>,
    event_sender: Option<mpsc::Sender<String>>,
    participants: HashMap<String, Participant>,
    event_callbacks: HashMap<String, Box<dyn Fn(Value) + Send + Sync>>,
    global_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
    raw_event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
    response_event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
}
256
/// Collects connection options and event callbacks, then produces a `Manager`
/// through `build`.
///
/// # Fields
///
/// * `options` - Connection options handed to the built `Manager`.
/// * `event_senders` - Initialised empty by `new` and not forwarded by `build`.
/// * `event_callbacks` - Callbacks keyed by event name, registered with `on_event`.
/// * `global_callback` - Callback registered with `on_all_events`.
/// * `raw_event_callback` - Callback registered with `on_all_raw_events`.
/// * `response_event_callback` - Callback registered with `on_all_response_events`.
pub struct ManagerBuilder {
    options: ManagerOptions,
    // Never read in this file; the attribute silences the resulting unused-field warning.
    #[allow(dead_code)]
    event_senders: HashMap<String, mpsc::Sender<String>>,
    event_callbacks: HashMap<String, Box<dyn Fn(Value) + Send + Sync>>,
    global_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
    raw_event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
    response_event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
}
266
267/// A builder for creating a `Manager` instance with various event callbacks.
268///
269/// # Example
270///
271/// ```
272/// use asterisk_manager::ManagerBuilder;
273/// use asterisk_manager::ManagerOptions;
274///
275/// let options = ManagerOptions {
276/// port: 5038,
277/// host: "example.com".to_string(),
278/// username: "admin".to_string(),
279/// password: "password".to_string(),
280/// events: false,
281/// };
282///
283/// let manager = ManagerBuilder::new(options)
284/// .on_event("event_name", Box::new(|value| {
285/// // handle event
286/// }))
287/// .on_all_events(Box::new(|value| {
288/// // handle all events
289/// }))
290/// .on_all_raw_events(Box::new(|value| {
291/// // handle all raw events
292/// }))
293/// .on_all_response_events(Box::new(|value| {
294/// // handle all response events
295/// }))
296/// .build();
297/// ```
298///
299/// # Methods
300///
301/// - `new(options: ManagerOptions) -> Self`: Creates a new `ManagerBuilder` with the specified options.
302/// - `on_event(mut self, event: &str, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self`: Registers a callback for a specific event.
303/// - `on_all_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self`: Registers a callback for all events.
304/// - `on_all_raw_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self`: Registers a callback for all raw events.
305/// - `on_all_response_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self`: Registers a callback for all response events.
306/// - `build(self) -> Manager`: Consumes the builder and returns a `Manager` instance.
307impl ManagerBuilder {
308 /// Creates a new instance of the `ManagerBuilder`.
309 /// Initializes the builder with the specified options.
310 pub fn new(options: ManagerOptions) -> Self {
311 ManagerBuilder {
312 options,
313 event_senders: HashMap::new(),
314 event_callbacks: HashMap::new(),
315 global_callback: None,
316 raw_event_callback: None,
317 response_event_callback: None,
318 }
319 }
320
321 /// Registers a callback for a specific event.
322 pub fn on_event(mut self, event: &str, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self {
323 self.event_callbacks.insert(event.to_string(), callback);
324 self
325 }
326
327 /// Registers a callback for all events.
328 pub fn on_all_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self {
329 self.global_callback = Some(callback);
330 self
331 }
332
333 /// Registers a callback for all raw events.
334 pub fn on_all_raw_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self {
335 self.raw_event_callback = Some(callback);
336 self
337 }
338
339 /// Registers a callback for all response events by actions sended to AMI Asterisk.
340 pub fn on_all_response_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self {
341 self.response_event_callback = Some(callback);
342 self
343 }
344
345 /// Consumes the builder and returns a `Manager` instance.
346 pub fn build(self) -> Manager {
347 Manager {
348 options: self.options,
349 connection: None,
350 authenticated: false,
351 emitter: Arc::new(Mutex::new(vec![])),
352 event_sender: None,
353 participants: HashMap::new(),
354 event_callbacks: self.event_callbacks,
355 global_callback: self.global_callback,
356 raw_event_callback: self.raw_event_callback,
357 response_event_callback: self.response_event_callback,
358 }
359 }
360}
361
/// One side of a call, tracked by `Manager::on_event` under the channel's unique id.
///
/// * `name` - Caller id name, taken from the `calleridname` header.
/// * `number` - Caller id number, taken from `calleridnum` (or `callerid` on a
///   later `Newcallerid` event when it was empty).
/// * `with` - Unique id of the channel this participant was paired with by a
///   `Dial` event, if any.
#[derive(Debug)]
// `name` is only ever written in this file; presumably the attribute silences the
// unused-field warning that would otherwise result.
#[allow(dead_code)]
pub struct Participant {
    name: String,
    number: String,
    with: Option<String>,
}
369
370impl Manager {
371 /// Creates a new instance of the Manager.
372 pub fn new(
373 options: ManagerOptions,
374 event_sender: Option<mpsc::Sender<String>>,
375 _event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
376 ) -> Self {
377 Manager {
378 options,
379 connection: None,
380 authenticated: false,
381 emitter: Arc::new(Mutex::new(vec![])),
382 event_sender,
383 participants: HashMap::new(),
384 event_callbacks: HashMap::new(),
385 global_callback: None,
386 raw_event_callback: None,
387 response_event_callback: None,
388 }
389 }
390
391 /// Connects to the AMI server.
392 pub async fn connect(&mut self) -> Result<(), String> {
393 log::debug!("Connecting to {}:{}", self.options.host, self.options.port);
394 let connection = timeout(
395 Duration::from_secs(10),
396 TcpStream::connect((self.options.host.as_str(), self.options.port)),
397 )
398 .await
399 .map_err(|_| "Connection timed out".to_string())?
400 .map_err(|e| e.to_string())?;
401
402 self.connection = Some(connection);
403 if let Some(sender) = &self.event_sender {
404 sender.send("serverconnect".to_string()).await.unwrap();
405 }
406 self.authenticate().await
407 }
408
409 /// Connects to the AMI server with reconnection logic.
410 pub async fn connect_with_retries(&mut self) {
411 let mut attempts = 0;
412 loop {
413 match self.connect().await {
414 Ok(_) => {
415 log::info!("Successfully connected to AMI server");
416 break;
417 }
418 Err(err) => {
419 log::error!("Failed to connect: {}", err);
420 attempts += 1;
421 let wait_time = if attempts > 10 {
422 Duration::from_secs(60)
423 } else {
424 Duration::from_secs(5)
425 };
426 log::info!("Reconnecting in {:?}", wait_time);
427 tokio::time::sleep(wait_time).await;
428 }
429 }
430 }
431 }
432
433 /// Authenticates with the AMI server.
434 async fn authenticate(&mut self) -> Result<(), String> {
435 log::debug!("Authenticating with username: {}", self.options.username);
436 let login_action = serde_json::json!({
437 "Action": "Login",
438 "event": if self.options.events { "on" } else { "off" },
439 "secret": self.options.password,
440 "username": self.options.username
441 });
442
443 self.send_action(login_action).await?;
444
445 let response = self.read_data().await?;
446
447 if response.contains("Response: Success") {
448 self.authenticated = true;
449 Ok(())
450 } else {
451 Err("Authentication failed".to_string())
452 }
453 }
454
455 /// Sends an action to the AMI server.
456 pub async fn send_action(&mut self, action: serde_json::Value) -> Result<(), String> {
457 let mut action_str = String::new();
458 for (key, value) in action.as_object().unwrap().iter() {
459 action_str.push_str(&format!("{}: {}\r\n", key, value.as_str().unwrap_or("")));
460 }
461 action_str.push_str("\r\n");
462
463 let connection = self
464 .connection
465 .as_mut()
466 .ok_or("No connection established")?;
467 connection
468 .write_all(action_str.as_bytes())
469 .await
470 .map_err(|e| e.to_string())?;
471
472 Ok(())
473 }
474
475 /// Reads data from the AMI server.
476 pub async fn read_data(&mut self) -> Result<String, String> {
477 let mut buffer = vec![0; 8192];
478 let mut complete_data = String::new();
479 let mut event_list_started = false;
480 let mut is_response_event = false;
481
482 let connection = self
483 .connection
484 .as_mut()
485 .ok_or("No connection established")?;
486
487 loop {
488 let n = match connection.read(&mut buffer).await {
489 Ok(n) if n == 0 => {
490 return Err("Connection lost".to_string());
491 }
492 Ok(n) => n,
493 Err(e) => {
494 return Err(e.to_string());
495 }
496 };
497
498 let data = String::from_utf8_lossy(&buffer[..n]);
499 complete_data.push_str(&data);
500
501 if data.contains("Response:") {
502 is_response_event = true;
503 }
504
505 if data.contains("EventList: start") {
506 event_list_started = true;
507 }
508
509 if event_list_started && data.contains("EventList: Complete") {
510 break;
511 }
512
513 if !event_list_started && data.contains("\r\n\r\n") {
514 break;
515 }
516 }
517
518 if is_response_event {
519 self.on_event(complete_data.clone(), "responseEvent").await;
520 } else {
521 for event in complete_data.split("\r\n\r\n") {
522 if !event.trim().is_empty() {
523 self.on_event(event.to_string(), "rawEvent").await;
524 }
525 }
526 }
527
528 Ok(complete_data)
529 }
530
531 /// Reads data from the AMI server with reconnection logic.
532 pub async fn read_data_with_retries(&mut self) {
533 loop {
534 match self.read_data().await {
535 Ok(_) => {}
536 Err(err) => {
537 log::error!("Error reading data: {}", err);
538 self.connect_with_retries().await;
539 }
540 }
541 }
542 }
543
544 /// Processes an event received from the AMI server.
545 pub async fn on_event(&mut self, event: String, event_type: &str) {
546 let json_event = self.parse_event(&event).unwrap();
547 let event_name = json_event["Event"].as_str().unwrap_or("");
548
549 if let Some(callback) = self.event_callbacks.get(event_name) {
550 callback(json_event.clone());
551 }
552
553 if let Some(global_callback) = &self.global_callback {
554 global_callback(json_event.clone());
555 }
556
557 match event_type {
558 "rawEvent" => {
559 if let Some(raw_event_callback) = &self.raw_event_callback {
560 raw_event_callback(json_event.clone());
561 }
562 }
563 "responseEvent" => {
564 if let Some(response_event_callback) = &self.response_event_callback {
565 response_event_callback(json_event.clone());
566 }
567 }
568 _ => {}
569 }
570
571 let get_case_insensitive = |obj: &serde_json::Value, key: &str| {
572 obj.as_object()
573 .and_then(|map| {
574 map.iter()
575 .find(|(k, _)| k.eq_ignore_ascii_case(key))
576 .map(|(_, v)| v.clone())
577 })
578 .unwrap_or_else(|| serde_json::Value::Null)
579 };
580
581 if let Some(sender) = &self.event_sender {
582 match get_case_insensitive(&json_event, "Event")
583 .as_str()
584 .unwrap_or("")
585 {
586 "Newchannel" => {
587 let uniqueid = get_case_insensitive(&json_event, "UniqueID")
588 .as_str()
589 .unwrap_or("")
590 .to_string();
591 let participant = Participant {
592 name: get_case_insensitive(&json_event, "calleridname")
593 .as_str()
594 .unwrap_or("")
595 .to_string(),
596 number: get_case_insensitive(&json_event, "calleridnum")
597 .as_str()
598 .unwrap_or("")
599 .to_string(),
600 with: None,
601 };
602 self.participants.insert(uniqueid, participant);
603 }
604 "Newcallerid" => {
605 let uniqueid = get_case_insensitive(&json_event, "UniqueID")
606 .as_str()
607 .unwrap_or("")
608 .to_string();
609 if let Some(participant) = self.participants.get_mut(&uniqueid) {
610 if participant.number.is_empty() {
611 participant.number = get_case_insensitive(&json_event, "callerid")
612 .as_str()
613 .unwrap_or("")
614 .to_string();
615 }
616 if !get_case_insensitive(&json_event, "calleridname")
617 .as_str()
618 .unwrap_or("")
619 .starts_with('<')
620 {
621 participant.name = get_case_insensitive(&json_event, "calleridname")
622 .as_str()
623 .unwrap_or("")
624 .to_string();
625 }
626 }
627 }
628 "Dial" => {
629 let src_uniqueid = get_case_insensitive(&json_event, "srcuniqueid")
630 .as_str()
631 .unwrap()
632 .to_string();
633 let dest_uniqueid = get_case_insensitive(&json_event, "destuniqueid")
634 .as_str()
635 .unwrap()
636 .to_string();
637 if let Some(src_participant) = self.participants.get_mut(&src_uniqueid) {
638 src_participant.with = Some(dest_uniqueid.clone());
639 }
640 if let Some(dest_participant) = self.participants.get_mut(&dest_uniqueid) {
641 dest_participant.with = Some(src_uniqueid.clone());
642 }
643 sender.send("dialing".to_string()).await.unwrap();
644 }
645 "Link" => {
646 let _uniqueid1 = get_case_insensitive(&json_event, "uniqueid1")
647 .as_str()
648 .unwrap()
649 .to_string();
650 let _uniqueid2 = get_case_insensitive(&json_event, "uniqueid2")
651 .as_str()
652 .unwrap()
653 .to_string();
654 sender.send("callconnected".to_string()).await.unwrap();
655 }
656 "Unlink" => {
657 let _uniqueid1 = get_case_insensitive(&json_event, "uniqueid1")
658 .as_str()
659 .unwrap()
660 .to_string();
661 let _uniqueid2 = get_case_insensitive(&json_event, "uniqueid2")
662 .as_str()
663 .unwrap()
664 .to_string();
665 sender.send("calldisconnected".to_string()).await.unwrap();
666 }
667 "Hold" => {
668 let _uniqueid = get_case_insensitive(&json_event, "uniqueid")
669 .as_str()
670 .unwrap()
671 .to_string();
672 sender.send("hold".to_string()).await.unwrap();
673 }
674 "Unhold" => {
675 let _uniqueid = get_case_insensitive(&json_event, "uniqueid")
676 .as_str()
677 .unwrap()
678 .to_string();
679 sender.send("unhold".to_string()).await.unwrap();
680 }
681 "Hangup" => {
682 let uniqueid = get_case_insensitive(&json_event, "uniqueid")
683 .as_str()
684 .unwrap_or("")
685 .to_string();
686 self.participants.remove(&uniqueid);
687 sender.send("hangup".to_string()).await.unwrap();
688 }
689 "Cdr" => {
690 let id_caller = get_case_insensitive(&json_event, "uniqueid")
691 .as_str()
692 .unwrap()
693 .to_string();
694 let id_callee = self
695 .participants
696 .get(&id_caller)
697 .and_then(|p| p.with.clone())
698 .unwrap_or_default();
699 let _status = get_case_insensitive(&json_event, "disposition")
700 .as_str()
701 .unwrap_or("")
702 .to_lowercase();
703 sender.send("callreport".to_string()).await.unwrap();
704 self.participants.remove(&id_caller);
705 self.participants.remove(&id_callee);
706 }
707 _ => {}
708 }
709 }
710
711 let mut emitter = self.emitter.lock().unwrap();
712 emitter.push(event.clone());
713 }
714
715 /// Parses an event received from the AMI server.
716 pub fn parse_event(&self, data: &str) -> Result<serde_json::Value, String> {
717 let mut map = serde_json::Map::new();
718 let mut events_map: std::collections::HashMap<String, Vec<serde_json::Value>> =
719 std::collections::HashMap::new();
720 let mut current_event = serde_json::Map::new();
721 let mut in_event = false;
722 let mut event_type = String::new();
723
724 for line in data.lines() {
725 if line.trim().is_empty() {
726 if in_event {
727 events_map
728 .entry(event_type.clone())
729 .or_default()
730 .push(serde_json::Value::Object(current_event.clone()));
731 current_event.clear();
732 in_event = false;
733 }
734 continue;
735 }
736
737 if let Some((key, value)) = line.split_once(": ") {
738 let key = key.trim().to_string();
739 let value = value.trim().to_string();
740
741 if key == "Event" {
742 if in_event {
743 events_map
744 .entry(event_type.clone())
745 .or_default()
746 .push(serde_json::Value::Object(current_event.clone()));
747 current_event.clear();
748 }
749 event_type = value.clone();
750 in_event = true;
751 }
752
753 if in_event {
754 current_event.insert(key, serde_json::Value::String(value));
755 } else {
756 map.insert(key, serde_json::Value::String(value));
757 }
758 }
759 }
760
761 if in_event {
762 events_map
763 .entry(event_type.clone())
764 .or_default()
765 .push(serde_json::Value::Object(current_event));
766 }
767
768 for (event, events) in events_map {
769 if events.len() == 1 {
770 map.extend(events[0].as_object().unwrap().clone());
771 } else {
772 map.insert(event, serde_json::Value::Array(events));
773 }
774 }
775
776 Ok(serde_json::Value::Object(map))
777 }
778
779 /// Disconnects from the AMI server.
780 pub async fn disconnect(&mut self) {
781 log::debug!("Disconnecting");
782 if let Some(mut connection) = self.connection.take() {
783 let _ = connection.shutdown().await;
784 }
785 if let Some(sender) = &self.event_sender {
786 sender.send("serverdisconnect".to_string()).await.unwrap();
787 }
788 }
789
790 /// Returns whether the manager is authenticated.
791 pub fn is_authenticated(&self) -> bool {
792 self.authenticated
793 }
794}
795
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an unconnected manager with the placeholder options shared by all tests.
    fn sample_manager() -> Manager {
        let options = ManagerOptions {
            port: 5038,
            host: "example.com".to_string(),
            username: "admin".to_string(),
            password: "password".to_string(),
            events: false,
        };
        Manager::new(options, None, None)
    }

    /// A single event has its headers merged into the top-level object.
    #[test]
    fn test_parse_event_single() {
        let parsed = sample_manager()
            .parse_event("Event: TestEvent\r\nKey: Value\r\n\r\n")
            .unwrap();

        assert_eq!(parsed["Key"], "Value");
    }

    /// Repeated events of one type are collected into an array under the event name.
    #[test]
    fn test_parse_event_multiple() {
        let input =
            "Event: TestEvent\r\nKey1: Value1\r\n\r\nEvent: TestEvent\r\nKey2: Value2\r\n\r\n";
        let parsed = sample_manager().parse_event(input).unwrap();
        let group = &parsed["TestEvent"];

        assert_eq!(group[0]["Key1"], "Value1");
        assert_eq!(group[1]["Key2"], "Value2");
    }

    /// Headers without any `Event` header land directly in the top-level object.
    #[test]
    fn test_parse_event_no_event() {
        let parsed = sample_manager()
            .parse_event("Key: Value\r\n\r\n")
            .unwrap();

        assert_eq!(parsed["Key"], "Value");
    }
}
861}