asterisk_manager/
lib.rs

1//! # Asterisk Manager Library
2//!
3//! This library provides an implementation to manage connections and authentication with an Asterisk Call Manager (AMI) server.
4//!
5//! It offers a robust and efficient way to interact with the AMI server, handling connection retries, authentication, and event processing seamlessly. The library is built using Tokio for asynchronous operations, ensuring high performance and scalability. Additionally, it processes and formats data into JSON objects for easy manipulation and integration.
6//!
7//! ## Features
8//!
9//! - **Reconnection Logic**: Automatically handles reconnections to the AMI server in case of connection drops.
10//! - **Event Handling**: Processes various AMI events and provides a mechanism to handle them.
11//! - **Asynchronous Operations**: Utilizes Tokio for non-blocking, asynchronous operations.
12//! - **Participant Management**: Manages call participants and their states efficiently.
13//! - **JSON Data Handling**: Formats and processes data into JSON objects for easy manipulation.
14//! - **Event Callbacks**: Allows registration of callbacks for specific events, all events, raw events, and response events.
15//!
16//! ## Usage Example
17//!
18//! ```rust,no_run
19//! use asterisk_manager::{ManagerBuilder, ManagerOptions};
20//! use tokio::runtime::Runtime;
21//! use serde_json::Value;
22//!
23//! fn main() {
24//!     let rt = Runtime::new().unwrap();
25//!     rt.block_on(async {
26//!         let options = ManagerOptions {
27//!             port: 5038,
28//!             host: "example.com".to_string(),
29//!             username: "admin".to_string(),
30//!             password: "password".to_string(),
31//!             events: false,
32//!         };
33//!
34//!         let callback1 = Box::new(|event: Value| {
35//!             println!("Callback1 received event: {}", event);
36//!         });
37//!
38//!         let callback2 = Box::new(|event: Value| {
39//!             println!("Callback2 received event: {}", event);
40//!         });
41//!
42//!         let global_callback = Box::new(|event: Value| {
43//!             println!("Global callback received event: {}", event);
44//!         });
45//!
46//!         let raw_event_callback = Box::new(|event: Value| {
47//!             println!("Raw event callback received event: {}", event);
48//!         });
49//!
50//!         let response_event_callback = Box::new(|event: Value| {
51//!             println!("Response event callback received event: {}", event);
52//!         });
53//!
54//!         let mut manager = ManagerBuilder::new(options)
55//!             .on_event("Newchannel", callback1)
56//!             .on_event("Hangup", callback2)
57//!             .on_all_events(global_callback)
58//!             .on_all_raw_events(raw_event_callback)
59//!             .on_all_response_events(response_event_callback)
60//!             .build();
61//!
62//!         manager.connect_with_retries().await;
63//!
64//!         if !manager.is_authenticated() {
65//!             println!("Authentication failed");
66//!             return;
67//!         }
68//!
69//!         let action = serde_json::json!({
70//!             "action": "QueueStatus",
71//!         });
72//!         if let Err(err) = manager.send_action(action).await {
73//!             println!("Error sending action: {}", err);
74//!             return;
75//!         }
76//!
77//!         manager.read_data_with_retries().await;
78//!
79//!         manager.disconnect().await;
80//!     });
81//! }
82//! ```
83//!
84//! ## Participant Structure
85//!
86//! The `Participant` structure represents a participant in a call.
87//!
88//! ```rust
89//! #[derive(Debug)]
90//! pub struct Participant {
91//!     name: String,
92//!     number: String,
93//!     with: Option<String>,
94//! }
95//! ```
96//!
97//! ## Manager Structure
98//!
99//! The `Manager` structure handles connections and options for the AMI server.
100//!
101//! ```rust
102//! use asterisk_manager::ManagerOptions;
103//! use tokio::net::TcpStream;
104//! use std::collections::HashMap;
105//! use tokio::sync::mpsc;
106//! use asterisk_manager::Participant;
107//! use std::sync::{Arc, Mutex};
108//!
109//! pub struct Manager {
110//!     options: ManagerOptions,
111//!     connection: Option<TcpStream>,
112//!     authenticated: bool,
113//!     emitter: Arc<Mutex<Vec<String>>>,
114//!     event_sender: Option<mpsc::Sender<String>>,
115//!     participants: HashMap<String, Participant>,
116//!     event_callbacks: HashMap<String, Box<dyn Fn(serde_json::Value) + Send + Sync>>,
117//!     global_callback: Option<Box<dyn Fn(serde_json::Value) + Send + Sync>>,
118//!     raw_event_callback: Option<Box<dyn Fn(serde_json::Value) + Send + Sync>>,
119//!     response_event_callback: Option<Box<dyn Fn(serde_json::Value) + Send + Sync>>,
120//! }
121//! ```
122//!
123//! ## Main Methods
124//!
125//! ### `Manager::new`
126//! Creates a new instance of the Manager. The `event_sender` and `event_callback` parameters are optional.
127//!
128//! ### `Manager::connect`
129//! Connects to the AMI server.
130//!
131//! ### `Manager::connect_with_retries`
132//! Connects to the AMI server with reconnection logic.
133//!
134//! ### `Manager::authenticate`
135//! Authenticates with the AMI server.
136//!
137//! ### `Manager::send_action`
138//! Sends an action to the AMI server.
139//!
140//! ### `Manager::read_data`
141//! Reads data from the AMI server.
142//!
143//! ### `Manager::read_data_with_retries`
144//! Reads data from the AMI server with reconnection logic.
145//!
146//! ### `Manager::on_event`
147//! Processes an event received from the AMI server.
148//!
149//! ### `Manager::parse_event`
150//! Parses an event received from the AMI server.
151//!
152//! ### `Manager::disconnect`
153//! Disconnects from the AMI server.
154//!
155//! ### `Manager::is_authenticated`
156//! Returns whether the manager is authenticated.
157//!
158//! ## ManagerBuilder Structure
159//!
160//! The `ManagerBuilder` structure helps in building a `Manager` instance with various event callbacks.
161//!
162//! ### `ManagerBuilder::new`
163//! Creates a new instance of the `ManagerBuilder`.
164//!
165//! ### `ManagerBuilder::on_event`
166//! Registers a callback for a specific event.
167//!
168//! ### `ManagerBuilder::on_all_events`
169//! Registers a callback for all events.
170//!
171//! ### `ManagerBuilder::on_all_raw_events`
172//! Registers a callback for all raw events.
173//!
174//! ### `ManagerBuilder::on_all_response_events`
175//! Registers a callback for all response events.
176//!
177//! ### `ManagerBuilder::build`
178//! Builds and returns a `Manager` instance.
179
180use serde::{Deserialize, Serialize};
181use serde_json::Value;
182use std::collections::HashMap;
183use std::sync::{Arc, Mutex};
184use tokio::io::{AsyncReadExt, AsyncWriteExt};
185use tokio::net::TcpStream;
186use tokio::sync::mpsc;
187use tokio::time::{timeout, Duration};
188
189/// Structure that contains the Manager configuration options.
190/// Represents the configuration options for the manager.
191///
192/// # Fields
193///
194/// * `port` - The port to connect to Asterisk AMI.
195/// * `host` - The host to connect to Asterisk AMI.
196/// * `username` - The username to authenticate with.
197/// * `password` - The password to authenticate with.
198/// * `events` - Indicates whether events should be enabled.
199///
200/// # Example
201///
202/// ```rust
203/// use asterisk_manager::ManagerOptions;
204///
205/// let options = ManagerOptions {
206///     port: 5038,
207///     host: "example.com".to_string(),
208///     username: "admin".to_string(),
209///     password: "password".to_string(),
210///     events: false,
211/// };
212/// ```
213///
214/// # Example (JSON)
215///
216/// ```json
217/// {
218///     "port": 5038,
219///     "host": "example.com",
220///     "username": "admin",
221///     "password": "password",
222///     "events": false
223/// }
224/// ```
225///
226#[derive(Serialize, Deserialize, Debug)]
227pub struct ManagerOptions {
228    pub port: u16,
229    pub host: String,
230    pub username: String,
231    pub password: String,
232    pub events: bool,
233}
234
235/// Structure that represents the Manager.
236/// Represents a manager that handles connections and options.
237///
238/// # Fields
239///
240/// * `options` - Configuration options for the manager.
241/// * `connection` - An optional TCP stream representing the connection.
242/// * `authenticated` - Indicates whether the manager is authenticated. This field is currently unused.
243/// * `emitter` - A thread-safe vector of strings used for emitting events or messages.
244pub struct Manager {
245    options: ManagerOptions,
246    connection: Option<TcpStream>,
247    authenticated: bool,
248    emitter: Arc<Mutex<Vec<String>>>,
249    event_sender: Option<mpsc::Sender<String>>,
250    participants: HashMap<String, Participant>,
251    event_callbacks: HashMap<String, Box<dyn Fn(Value) + Send + Sync>>,
252    global_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
253    raw_event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
254    response_event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
255}
256
257pub struct ManagerBuilder {
258    options: ManagerOptions,
259    #[allow(dead_code)]
260    event_senders: HashMap<String, mpsc::Sender<String>>,
261    event_callbacks: HashMap<String, Box<dyn Fn(Value) + Send + Sync>>,
262    global_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
263    raw_event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
264    response_event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
265}
266
267/// A builder for creating a `Manager` instance with various event callbacks.
268///
269/// # Example
270///
271/// ```
272/// use asterisk_manager::ManagerBuilder;
273/// use asterisk_manager::ManagerOptions;
274///
275/// let options = ManagerOptions {
276///     port: 5038,
277///     host: "example.com".to_string(),
278///     username: "admin".to_string(),
279///     password: "password".to_string(),
280///     events: false,
281/// };
282///
283/// let manager = ManagerBuilder::new(options)
284///     .on_event("event_name", Box::new(|value| {
285///         // handle event
286///     }))
287///     .on_all_events(Box::new(|value| {
288///         // handle all events
289///     }))
290///     .on_all_raw_events(Box::new(|value| {
291///         // handle all raw events
292///     }))
293///     .on_all_response_events(Box::new(|value| {
294///         // handle all response events
295///     }))
296///     .build();
297/// ```
298///
299/// # Methods
300///
301/// - `new(options: ManagerOptions) -> Self`: Creates a new `ManagerBuilder` with the specified options.
302/// - `on_event(mut self, event: &str, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self`: Registers a callback for a specific event.
303/// - `on_all_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self`: Registers a callback for all events.
304/// - `on_all_raw_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self`: Registers a callback for all raw events.
305/// - `on_all_response_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self`: Registers a callback for all response events.
306/// - `build(self) -> Manager`: Consumes the builder and returns a `Manager` instance.
307impl ManagerBuilder {
308    /// Creates a new instance of the `ManagerBuilder`.
309    /// Initializes the builder with the specified options.
310    pub fn new(options: ManagerOptions) -> Self {
311        ManagerBuilder {
312            options,
313            event_senders: HashMap::new(),
314            event_callbacks: HashMap::new(),
315            global_callback: None,
316            raw_event_callback: None,
317            response_event_callback: None,
318        }
319    }
320
321    /// Registers a callback for a specific event.
322    pub fn on_event(mut self, event: &str, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self {
323        self.event_callbacks.insert(event.to_string(), callback);
324        self
325    }
326
327    /// Registers a callback for all events.
328    pub fn on_all_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self {
329        self.global_callback = Some(callback);
330        self
331    }
332
333    /// Registers a callback for all raw events.
334    pub fn on_all_raw_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self {
335        self.raw_event_callback = Some(callback);
336        self
337    }
338
339    /// Registers a callback for all response events by actions sended to AMI Asterisk.
340    pub fn on_all_response_events(mut self, callback: Box<dyn Fn(Value) + Send + Sync>) -> Self {
341        self.response_event_callback = Some(callback);
342        self
343    }
344
345    /// Consumes the builder and returns a `Manager` instance.
346    pub fn build(self) -> Manager {
347        Manager {
348            options: self.options,
349            connection: None,
350            authenticated: false,
351            emitter: Arc::new(Mutex::new(vec![])),
352            event_sender: None,
353            participants: HashMap::new(),
354            event_callbacks: self.event_callbacks,
355            global_callback: self.global_callback,
356            raw_event_callback: self.raw_event_callback,
357            response_event_callback: self.response_event_callback,
358        }
359    }
360}
361
362#[derive(Debug)]
363#[allow(dead_code)]
364pub struct Participant {
365    name: String,
366    number: String,
367    with: Option<String>,
368}
369
370impl Manager {
371    /// Creates a new instance of the Manager.
372    pub fn new(
373        options: ManagerOptions,
374        event_sender: Option<mpsc::Sender<String>>,
375        _event_callback: Option<Box<dyn Fn(Value) + Send + Sync>>,
376    ) -> Self {
377        Manager {
378            options,
379            connection: None,
380            authenticated: false,
381            emitter: Arc::new(Mutex::new(vec![])),
382            event_sender,
383            participants: HashMap::new(),
384            event_callbacks: HashMap::new(),
385            global_callback: None,
386            raw_event_callback: None,
387            response_event_callback: None,
388        }
389    }
390
391    /// Connects to the AMI server.
392    pub async fn connect(&mut self) -> Result<(), String> {
393        log::debug!("Connecting to {}:{}", self.options.host, self.options.port);
394        let connection = timeout(
395            Duration::from_secs(10),
396            TcpStream::connect((self.options.host.as_str(), self.options.port)),
397        )
398        .await
399        .map_err(|_| "Connection timed out".to_string())?
400        .map_err(|e| e.to_string())?;
401
402        self.connection = Some(connection);
403        if let Some(sender) = &self.event_sender {
404            sender.send("serverconnect".to_string()).await.unwrap();
405        }
406        self.authenticate().await
407    }
408
409    /// Connects to the AMI server with reconnection logic.
410    pub async fn connect_with_retries(&mut self) {
411        let mut attempts = 0;
412        loop {
413            match self.connect().await {
414                Ok(_) => {
415                    log::info!("Successfully connected to AMI server");
416                    break;
417                }
418                Err(err) => {
419                    log::error!("Failed to connect: {}", err);
420                    attempts += 1;
421                    let wait_time = if attempts > 10 {
422                        Duration::from_secs(60)
423                    } else {
424                        Duration::from_secs(5)
425                    };
426                    log::info!("Reconnecting in {:?}", wait_time);
427                    tokio::time::sleep(wait_time).await;
428                }
429            }
430        }
431    }
432
433    /// Authenticates with the AMI server.
434    async fn authenticate(&mut self) -> Result<(), String> {
435        log::debug!("Authenticating with username: {}", self.options.username);
436        let login_action = serde_json::json!({
437            "Action": "Login",
438            "event": if self.options.events { "on" } else { "off" },
439            "secret": self.options.password,
440            "username": self.options.username
441        });
442
443        self.send_action(login_action).await?;
444
445        let response = self.read_data().await?;
446
447        if response.contains("Response: Success") {
448            self.authenticated = true;
449            Ok(())
450        } else {
451            Err("Authentication failed".to_string())
452        }
453    }
454
455    /// Sends an action to the AMI server.
456    pub async fn send_action(&mut self, action: serde_json::Value) -> Result<(), String> {
457        let mut action_str = String::new();
458        for (key, value) in action.as_object().unwrap().iter() {
459            action_str.push_str(&format!("{}: {}\r\n", key, value.as_str().unwrap_or("")));
460        }
461        action_str.push_str("\r\n");
462
463        let connection = self
464            .connection
465            .as_mut()
466            .ok_or("No connection established")?;
467        connection
468            .write_all(action_str.as_bytes())
469            .await
470            .map_err(|e| e.to_string())?;
471
472        Ok(())
473    }
474
475    /// Reads data from the AMI server.
476    pub async fn read_data(&mut self) -> Result<String, String> {
477        let mut buffer = vec![0; 8192];
478        let mut complete_data = String::new();
479        let mut event_list_started = false;
480        let mut is_response_event = false;
481
482        let connection = self
483            .connection
484            .as_mut()
485            .ok_or("No connection established")?;
486
487        loop {
488            let n = match connection.read(&mut buffer).await {
489                Ok(n) if n == 0 => {
490                    return Err("Connection lost".to_string());
491                }
492                Ok(n) => n,
493                Err(e) => {
494                    return Err(e.to_string());
495                }
496            };
497
498            let data = String::from_utf8_lossy(&buffer[..n]);
499            complete_data.push_str(&data);
500
501            if data.contains("Response:") {
502                is_response_event = true;
503            }
504
505            if data.contains("EventList: start") {
506                event_list_started = true;
507            }
508
509            if event_list_started && data.contains("EventList: Complete") {
510                break;
511            }
512
513            if !event_list_started && data.contains("\r\n\r\n") {
514                break;
515            }
516        }
517
518        if is_response_event {
519            self.on_event(complete_data.clone(), "responseEvent").await;
520        } else {
521            for event in complete_data.split("\r\n\r\n") {
522                if !event.trim().is_empty() {
523                    self.on_event(event.to_string(), "rawEvent").await;
524                }
525            }
526        }
527
528        Ok(complete_data)
529    }
530
531    /// Reads data from the AMI server with reconnection logic.
532    pub async fn read_data_with_retries(&mut self) {
533        loop {
534            match self.read_data().await {
535                Ok(_) => {}
536                Err(err) => {
537                    log::error!("Error reading data: {}", err);
538                    self.connect_with_retries().await;
539                }
540            }
541        }
542    }
543
544    /// Processes an event received from the AMI server.
545    pub async fn on_event(&mut self, event: String, event_type: &str) {
546        let json_event = self.parse_event(&event).unwrap();
547        let event_name = json_event["Event"].as_str().unwrap_or("");
548
549        if let Some(callback) = self.event_callbacks.get(event_name) {
550            callback(json_event.clone());
551        }
552
553        if let Some(global_callback) = &self.global_callback {
554            global_callback(json_event.clone());
555        }
556
557        match event_type {
558            "rawEvent" => {
559                if let Some(raw_event_callback) = &self.raw_event_callback {
560                    raw_event_callback(json_event.clone());
561                }
562            }
563            "responseEvent" => {
564                if let Some(response_event_callback) = &self.response_event_callback {
565                    response_event_callback(json_event.clone());
566                }
567            }
568            _ => {}
569        }
570
571        let get_case_insensitive = |obj: &serde_json::Value, key: &str| {
572            obj.as_object()
573                .and_then(|map| {
574                    map.iter()
575                        .find(|(k, _)| k.eq_ignore_ascii_case(key))
576                        .map(|(_, v)| v.clone())
577                })
578                .unwrap_or_else(|| serde_json::Value::Null)
579        };
580
581        if let Some(sender) = &self.event_sender {
582            match get_case_insensitive(&json_event, "Event")
583                .as_str()
584                .unwrap_or("")
585            {
586                "Newchannel" => {
587                    let uniqueid = get_case_insensitive(&json_event, "UniqueID")
588                        .as_str()
589                        .unwrap_or("")
590                        .to_string();
591                    let participant = Participant {
592                        name: get_case_insensitive(&json_event, "calleridname")
593                            .as_str()
594                            .unwrap_or("")
595                            .to_string(),
596                        number: get_case_insensitive(&json_event, "calleridnum")
597                            .as_str()
598                            .unwrap_or("")
599                            .to_string(),
600                        with: None,
601                    };
602                    self.participants.insert(uniqueid, participant);
603                }
604                "Newcallerid" => {
605                    let uniqueid = get_case_insensitive(&json_event, "UniqueID")
606                        .as_str()
607                        .unwrap_or("")
608                        .to_string();
609                    if let Some(participant) = self.participants.get_mut(&uniqueid) {
610                        if participant.number.is_empty() {
611                            participant.number = get_case_insensitive(&json_event, "callerid")
612                                .as_str()
613                                .unwrap_or("")
614                                .to_string();
615                        }
616                        if !get_case_insensitive(&json_event, "calleridname")
617                            .as_str()
618                            .unwrap_or("")
619                            .starts_with('<')
620                        {
621                            participant.name = get_case_insensitive(&json_event, "calleridname")
622                                .as_str()
623                                .unwrap_or("")
624                                .to_string();
625                        }
626                    }
627                }
628                "Dial" => {
629                    let src_uniqueid = get_case_insensitive(&json_event, "srcuniqueid")
630                        .as_str()
631                        .unwrap()
632                        .to_string();
633                    let dest_uniqueid = get_case_insensitive(&json_event, "destuniqueid")
634                        .as_str()
635                        .unwrap()
636                        .to_string();
637                    if let Some(src_participant) = self.participants.get_mut(&src_uniqueid) {
638                        src_participant.with = Some(dest_uniqueid.clone());
639                    }
640                    if let Some(dest_participant) = self.participants.get_mut(&dest_uniqueid) {
641                        dest_participant.with = Some(src_uniqueid.clone());
642                    }
643                    sender.send("dialing".to_string()).await.unwrap();
644                }
645                "Link" => {
646                    let _uniqueid1 = get_case_insensitive(&json_event, "uniqueid1")
647                        .as_str()
648                        .unwrap()
649                        .to_string();
650                    let _uniqueid2 = get_case_insensitive(&json_event, "uniqueid2")
651                        .as_str()
652                        .unwrap()
653                        .to_string();
654                    sender.send("callconnected".to_string()).await.unwrap();
655                }
656                "Unlink" => {
657                    let _uniqueid1 = get_case_insensitive(&json_event, "uniqueid1")
658                        .as_str()
659                        .unwrap()
660                        .to_string();
661                    let _uniqueid2 = get_case_insensitive(&json_event, "uniqueid2")
662                        .as_str()
663                        .unwrap()
664                        .to_string();
665                    sender.send("calldisconnected".to_string()).await.unwrap();
666                }
667                "Hold" => {
668                    let _uniqueid = get_case_insensitive(&json_event, "uniqueid")
669                        .as_str()
670                        .unwrap()
671                        .to_string();
672                    sender.send("hold".to_string()).await.unwrap();
673                }
674                "Unhold" => {
675                    let _uniqueid = get_case_insensitive(&json_event, "uniqueid")
676                        .as_str()
677                        .unwrap()
678                        .to_string();
679                    sender.send("unhold".to_string()).await.unwrap();
680                }
681                "Hangup" => {
682                    let uniqueid = get_case_insensitive(&json_event, "uniqueid")
683                        .as_str()
684                        .unwrap_or("")
685                        .to_string();
686                    self.participants.remove(&uniqueid);
687                    sender.send("hangup".to_string()).await.unwrap();
688                }
689                "Cdr" => {
690                    let id_caller = get_case_insensitive(&json_event, "uniqueid")
691                        .as_str()
692                        .unwrap()
693                        .to_string();
694                    let id_callee = self
695                        .participants
696                        .get(&id_caller)
697                        .and_then(|p| p.with.clone())
698                        .unwrap_or_default();
699                    let _status = get_case_insensitive(&json_event, "disposition")
700                        .as_str()
701                        .unwrap_or("")
702                        .to_lowercase();
703                    sender.send("callreport".to_string()).await.unwrap();
704                    self.participants.remove(&id_caller);
705                    self.participants.remove(&id_callee);
706                }
707                _ => {}
708            }
709        }
710
711        let mut emitter = self.emitter.lock().unwrap();
712        emitter.push(event.clone());
713    }
714
715    /// Parses an event received from the AMI server.
716    pub fn parse_event(&self, data: &str) -> Result<serde_json::Value, String> {
717        let mut map = serde_json::Map::new();
718        let mut events_map: std::collections::HashMap<String, Vec<serde_json::Value>> =
719            std::collections::HashMap::new();
720        let mut current_event = serde_json::Map::new();
721        let mut in_event = false;
722        let mut event_type = String::new();
723
724        for line in data.lines() {
725            if line.trim().is_empty() {
726                if in_event {
727                    events_map
728                        .entry(event_type.clone())
729                        .or_default()
730                        .push(serde_json::Value::Object(current_event.clone()));
731                    current_event.clear();
732                    in_event = false;
733                }
734                continue;
735            }
736
737            if let Some((key, value)) = line.split_once(": ") {
738                let key = key.trim().to_string();
739                let value = value.trim().to_string();
740
741                if key == "Event" {
742                    if in_event {
743                        events_map
744                            .entry(event_type.clone())
745                            .or_default()
746                            .push(serde_json::Value::Object(current_event.clone()));
747                        current_event.clear();
748                    }
749                    event_type = value.clone();
750                    in_event = true;
751                }
752
753                if in_event {
754                    current_event.insert(key, serde_json::Value::String(value));
755                } else {
756                    map.insert(key, serde_json::Value::String(value));
757                }
758            }
759        }
760
761        if in_event {
762            events_map
763                .entry(event_type.clone())
764                .or_default()
765                .push(serde_json::Value::Object(current_event));
766        }
767
768        for (event, events) in events_map {
769            if events.len() == 1 {
770                map.extend(events[0].as_object().unwrap().clone());
771            } else {
772                map.insert(event, serde_json::Value::Array(events));
773            }
774        }
775
776        Ok(serde_json::Value::Object(map))
777    }
778
779    /// Disconnects from the AMI server.
780    pub async fn disconnect(&mut self) {
781        log::debug!("Disconnecting");
782        if let Some(mut connection) = self.connection.take() {
783            let _ = connection.shutdown().await;
784        }
785        if let Some(sender) = &self.event_sender {
786            sender.send("serverdisconnect".to_string()).await.unwrap();
787        }
788    }
789
790    /// Returns whether the manager is authenticated.
791    pub fn is_authenticated(&self) -> bool {
792        self.authenticated
793    }
794}
795
796#[cfg(test)]
797mod tests {
798    use super::*;
799
800    #[test]
801    fn test_parse_event_single() {
802        let manager = Manager::new(
803            ManagerOptions {
804                port: 5038,
805                host: "example.com".to_string(),
806                username: "admin".to_string(),
807                password: "password".to_string(),
808                events: false,
809            },
810            None,
811            None,
812        );
813
814        let data = "Event: TestEvent\r\nKey: Value\r\n\r\n";
815        let parsed = manager.parse_event(data).unwrap();
816
817        assert_eq!(parsed["Key"], "Value");
818    }
819
820    #[test]
821    fn test_parse_event_multiple() {
822        let manager = Manager::new(
823            ManagerOptions {
824                port: 5038,
825                host: "example.com".to_string(),
826                username: "admin".to_string(),
827                password: "password".to_string(),
828                events: false,
829            },
830            None,
831            None,
832        );
833
834        let data =
835            "Event: TestEvent\r\nKey1: Value1\r\n\r\nEvent: TestEvent\r\nKey2: Value2\r\n\r\n";
836        let parsed = manager.parse_event(data).unwrap();
837
838        assert_eq!(parsed["TestEvent"][0]["Key1"], "Value1");
839        assert_eq!(parsed["TestEvent"][1]["Key2"], "Value2");
840    }
841
842    #[test]
843    fn test_parse_event_no_event() {
844        let manager = Manager::new(
845            ManagerOptions {
846                port: 5038,
847                host: "example.com".to_string(),
848                username: "admin".to_string(),
849                password: "password".to_string(),
850                events: false,
851            },
852            None,
853            None,
854        );
855
856        let data = "Key: Value\r\n\r\n";
857        let parsed = manager.parse_event(data).unwrap();
858
859        assert_eq!(parsed["Key"], "Value");
860    }
861}