asterisk_manager/
lib.rs

1//! # Asterisk Manager Library
2//!
3//! A modern, strongly-typed, stream-based library for integration with the Asterisk Manager Interface (AMI).
4//!
5//! - **Typed AMI messages**: Actions, Events, and Responses as Rust enums/structs.
6//! - **Stream-based API**: Consume events via `tokio_stream`.
7//! - **Asynchronous operations**: Fully based on Tokio.
8//!
9//! ## Usage Example
10//!
11//! ```rust,no_run
12//! use asterisk_manager::{Manager, ManagerOptions, AmiAction};
13//! use tokio_stream::StreamExt;
14//!
15//! #[tokio::main]
16//! async fn main() {
17//!     let options = ManagerOptions {
18//!         port: 5038,
19//!         host: "127.0.0.1".to_string(),
20//!         username: "admin".to_string(),
21//!         password: "password".to_string(),
22//!         events: true,
23//!     };
24//!     let manager = Manager::new(options);
25//!     manager.connect_and_login().await.unwrap();
26//!
27//!     let mut events = manager.all_events_stream().await;
28//!     tokio::spawn(async move {
29//!         while let Some(Ok(ev)) = events.next().await {
30//!             println!("Event: {:?}", ev);
31//!         }
32//!     });
33//!
34//!     let resp = manager.send_action(AmiAction::Ping { action_id: None }).await.unwrap();
35//!     println!("Ping response: {:?}", resp);
36//!     manager.disconnect().await.unwrap();
37//! }
38//! ```
39//!
40//! ## Features
41//!
42//! - Login/logout, sending actions, and receiving AMI events.
43//! - Support for common events (`Newchannel`, `Hangup`, `PeerStatus`) and fallback for unknown events.
44//! - Detailed error handling via the `AmiError` enum.
45//!
46//! ## Requirements
47//!
48//! - Rust 1.70+
49//! - Tokio (async runtime)
50//!
51//! ## License
52//!
53//! MIT
54
55use serde::de::Deserializer;
56use serde::{Deserialize, Serialize};
57use std::collections::HashMap;
58use std::sync::Arc;
59use thiserror::Error;
60use tokio::io::{AsyncReadExt, AsyncWriteExt};
61use tokio::net::TcpStream;
62use tokio::sync::{broadcast, oneshot, Mutex};
63use tokio::time::{timeout, Duration};
64use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
65use tokio_stream::wrappers::BroadcastStream;
66use tokio_stream::Stream;
67use uuid::Uuid;
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct AmiResponse {
71    #[serde(rename = "Response")]
72    pub response: String,
73    #[serde(rename = "ActionID")]
74    pub action_id: Option<String>,
75    #[serde(rename = "Message")]
76    pub message: Option<String>,
77    #[serde(flatten)]
78    pub fields: HashMap<String, String>,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(tag = "Action", rename_all = "PascalCase")]
83pub enum AmiAction {
84    Login {
85        username: String,
86        secret: String,
87        #[serde(rename = "Events")]
88        events: Option<String>,
89        #[serde(rename = "ActionID")]
90        action_id: Option<String>,
91    },
92    Logoff {
93        #[serde(rename = "ActionID")]
94        action_id: Option<String>,
95    },
96    Ping {
97        #[serde(rename = "ActionID")]
98        action_id: Option<String>,
99    },
100    Command {
101        command: String,
102        #[serde(rename = "ActionID")]
103        action_id: Option<String>,
104    },
105    Custom {
106        action: String,
107        #[serde(flatten)]
108        params: HashMap<String, String>,
109        #[serde(rename = "ActionID")]
110        action_id: Option<String>,
111    },
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct NewchannelEventData {
116    #[serde(rename = "Channel")]
117    pub channel: String,
118    #[serde(rename = "Uniqueid")]
119    pub uniqueid: String,
120    #[serde(rename = "ChannelState")]
121    pub channel_state: Option<String>,
122    #[serde(rename = "ChannelStateDesc")]
123    pub channel_state_desc: Option<String>,
124    #[serde(rename = "CallerIDNum")]
125    pub caller_id_num: Option<String>,
126    #[serde(rename = "CallerIDName")]
127    pub caller_id_name: Option<String>,
128    #[serde(flatten)]
129    pub other: HashMap<String, String>,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct HangupEventData {
134    #[serde(rename = "Channel")]
135    pub channel: String,
136    #[serde(rename = "Uniqueid")]
137    pub uniqueid: String,
138    #[serde(rename = "Cause")]
139    pub cause: Option<String>,
140    #[serde(rename = "Cause-txt")]
141    pub cause_txt: Option<String>,
142    #[serde(flatten)]
143    pub other: HashMap<String, String>,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct PeerStatusEventData {
148    #[serde(rename = "Peer")]
149    pub peer: String,
150    #[serde(rename = "PeerStatus")]
151    pub peer_status: String,
152    #[serde(flatten)]
153    pub other: HashMap<String, String>,
154}
155
156#[derive(Debug, Clone, Serialize)]
157pub enum AmiEvent {
158    Newchannel(NewchannelEventData),
159    Hangup(HangupEventData),
160    PeerStatus(PeerStatusEventData),
161    UnknownEvent {
162        event_type: String,
163        fields: HashMap<String, String>,
164    },
165    InternalConnectionLost {
166        error: String,
167    },
168}
169
170impl<'de> Deserialize<'de> for AmiEvent {
171    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
172    where
173        D: Deserializer<'de>,
174    {
175        let value = serde_json::Value::deserialize(deserializer)?;
176        let map_obj = value
177            .as_object()
178            .ok_or_else(|| serde::de::Error::custom("AmiEvent: Expected a JSON object/map"))?;
179
180        if let Some(event_type_val) = map_obj.get("Event") {
181            let event_type_str = event_type_val.as_str().ok_or_else(|| {
182                serde::de::Error::custom("AmiEvent: 'Event' field is not a string")
183            })?;
184
185            match event_type_str {
186                "Newchannel" => Ok(AmiEvent::Newchannel(
187                    NewchannelEventData::deserialize(value.clone())
188                        .map_err(serde::de::Error::custom)?,
189                )),
190                "Hangup" => Ok(AmiEvent::Hangup(
191                    HangupEventData::deserialize(value.clone())
192                        .map_err(serde::de::Error::custom)?,
193                )),
194                "PeerStatus" => Ok(AmiEvent::PeerStatus(
195                    PeerStatusEventData::deserialize(value.clone())
196                        .map_err(serde::de::Error::custom)?,
197                )),
198                _ => {
199                    let fields: HashMap<String, String> = map_obj
200                        .iter()
201                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
202                        .collect();
203                    Ok(AmiEvent::UnknownEvent {
204                        event_type: event_type_str.to_string(),
205                        fields,
206                    })
207                }
208            }
209        } else {
210            let fields: HashMap<String, String> = map_obj
211                .iter()
212                .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
213                .collect();
214            Ok(AmiEvent::UnknownEvent {
215                event_type: "UnknownOrMalformed".to_string(),
216                fields,
217            })
218        }
219    }
220}
221
222#[derive(Debug, Error)]
223pub enum AmiError {
224    #[error("IO error: {0}")]
225    Io(#[from] std::io::Error),
226    #[error("Parse error: {0}")]
227    ParseError(String),
228    #[error("Serialize error: {0}")]
229    SerializeError(String),
230    #[error("Authentication failed: {0}")]
231    AuthenticationFailed(String),
232    #[error("Action failed: {response:?}")]
233    ActionFailed { response: AmiResponse },
234    #[error("Connection closed")]
235    ConnectionClosed,
236    #[error("Operation timed out")]
237    Timeout,
238    #[error("Login required")]
239    LoginRequired,
240    #[error("Internal channel error: {0}")]
241    ChannelError(String),
242    #[error("Event stream lagged: {0}")]
243    EventStreamLagged(#[from] tokio::sync::broadcast::error::RecvError),
244    #[error("Not connected to AMI server")]
245    NotConnected,
246    #[error("Other error: {0}")]
247    Other(String),
248}
249
250#[derive(Serialize, Deserialize, Debug, Clone)]
251pub struct ManagerOptions {
252    pub port: u16,
253    pub host: String,
254    pub username: String,
255    pub password: String,
256    pub events: bool,
257}
258
259struct InnerManager {
260    options: ManagerOptions,
261    connection: Option<TcpStream>,
262    authenticated: bool,
263    event_broadcaster: broadcast::Sender<AmiEvent>,
264    pending_responses: HashMap<String, oneshot::Sender<Result<AmiResponse, AmiError>>>,
265}
266
267#[derive(Clone)]
268pub struct Manager {
269    inner: Arc<Mutex<InnerManager>>,
270}
271
272impl Manager {
273    pub fn new(options: ManagerOptions) -> Self {
274        let (event_tx, _) = broadcast::channel(1024);
275        Self {
276            inner: Arc::new(Mutex::new(InnerManager {
277                options,
278                connection: None,
279                authenticated: false,
280                event_broadcaster: event_tx,
281                pending_responses: HashMap::new(),
282            })),
283        }
284    }
285
286    pub async fn connect_and_login(&self) -> Result<(), AmiError> {
287        {
288            let mut inner = self.inner.lock().await;
289            inner.connect().await?;
290            inner.authenticate().await?;
291        }
292        let this = self.clone();
293        tokio::spawn(async move {
294            let _ = this.read_loop().await;
295        });
296        Ok(())
297    }
298
299    pub async fn send_action(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
300        let action_id = get_or_set_action_id(&mut action);
301
302        let (tx, rx) = oneshot::channel();
303        {
304            let mut inner = self.inner.lock().await;
305            if !inner.authenticated && !matches!(action, AmiAction::Login { .. }) {
306                return Err(AmiError::LoginRequired);
307            }
308            if inner.connection.is_none() {
309                return Err(AmiError::NotConnected);
310            }
311
312            inner.pending_responses.insert(action_id.clone(), tx);
313            let action_str = serialize_ami_action(&action)?;
314            let conn = inner.connection.as_mut().ok_or(AmiError::NotConnected)?;
315
316            conn.write_all(action_str.as_bytes())
317                .await
318                .map_err(AmiError::Io)?;
319            conn.flush().await.map_err(AmiError::Io)?;
320        }
321        match timeout(Duration::from_secs(10), rx).await {
322            Ok(Ok(Ok(resp))) => {
323                if resp.response.eq_ignore_ascii_case("Error") {
324                    Err(AmiError::ActionFailed { response: resp })
325                } else {
326                    Ok(resp)
327                }
328            }
329            Ok(Ok(Err(e))) => Err(e),
330            Ok(Err(_)) => Err(AmiError::ChannelError("Responder dropped".to_string())),
331            Err(_) => Err(AmiError::Timeout),
332        }
333    }
334
335    async fn read_loop(&self) -> Result<(), AmiError> {
336        loop {
337            let processing_result: Result<(), AmiError> = async {
338                loop {
339                    let raw_data: String;
340                    {
341                        let mut inner = self.inner.lock().await;
342                        raw_data = inner.read_ami_message_raw().await?;
343                    }
344                    let parsed_messages = parse_ami_protocol_message(&raw_data)?;
345                    {
346                        let mut inner = self.inner.lock().await;
347                        for value_msg in parsed_messages {
348                            if value_msg.get("Event").is_some() {
349                                match serde_json::from_value::<AmiEvent>(value_msg.clone())
350                                    .map_err(|e| AmiError::ParseError(format!("AmiEvent: {}", e)))
351                                {
352                                    Ok(event) => {
353                                        let _ = inner.event_broadcaster.send(event);
354                                    }
355                                    Err(_) => {
356                                        let mut fallback = HashMap::new();
357                                        if let Some(obj) = value_msg.as_object() {
358                                            for (k, v) in obj {
359                                                if let Some(s) = v.as_str() {
360                                                    fallback.insert(k.clone(), s.to_string());
361                                                }
362                                            }
363                                        }
364                                        let _ =
365                                            inner.event_broadcaster.send(AmiEvent::UnknownEvent {
366                                                event_type: "ParseError".to_string(),
367                                                fields: fallback,
368                                            });
369                                    }
370                                }
371                            } else if value_msg.get("Response").is_some() {
372                                match serde_json::from_value::<AmiResponse>(value_msg.clone())
373                                    .map_err(|e| {
374                                        AmiError::ParseError(format!("AmiResponse: {}", e))
375                                    }) {
376                                    Ok(resp) => {
377                                        if let Some(action_id) = &resp.action_id {
378                                            if let Some(responder) =
379                                                inner.pending_responses.remove(action_id)
380                                            {
381                                                let _ = responder.send(Ok(resp));
382                                            }
383                                        }
384                                    }
385                                    Err(parse_err) => {
386                                        if let Some(action_id) =
387                                            value_msg.get("ActionID").and_then(|v| v.as_str())
388                                        {
389                                            if let Some(responder) =
390                                                inner.pending_responses.remove(action_id)
391                                            {
392                                                let _ = responder.send(Err(parse_err));
393                                            }
394                                        }
395                                    }
396                                }
397                            }
398                        }
399                    }
400                }
401            }
402            .await;
403
404            match processing_result {
405                Ok(()) => return Ok(()),
406                Err(err) => {
407                    {
408                        let mut inner = self.inner.lock().await;
409                        inner.authenticated = false;
410                        inner.connection = None;
411                        for (_, responder) in inner.pending_responses.drain() {
412                            let _ = responder.send(Err(AmiError::ConnectionClosed));
413                        }
414                        let _ = inner
415                            .event_broadcaster
416                            .send(AmiEvent::InternalConnectionLost {
417                                error: format!("{}", err),
418                            });
419                    }
420                    return Err(err);
421                }
422            }
423        }
424    }
425
426    pub async fn disconnect(&self) -> Result<(), AmiError> {
427        let mut inner = self.inner.lock().await;
428        if let Some(mut connection) = inner.connection.take() {
429            let logoff_action = AmiAction::Logoff {
430                action_id: Some("rust-ami-logoff".to_string()),
431            };
432            let action_str = serialize_ami_action(&logoff_action)?;
433            let _ = connection.write_all(action_str.as_bytes()).await;
434            let _ = connection.shutdown().await;
435        }
436        inner.authenticated = false;
437        Ok(())
438    }
439
440    pub async fn is_authenticated(&self) -> bool {
441        let inner = self.inner.lock().await;
442        inner.authenticated
443    }
444
445    pub async fn all_events_stream(
446        &self,
447    ) -> impl Stream<Item = Result<AmiEvent, BroadcastStreamRecvError>> + Send + Unpin {
448        let inner = self.inner.lock().await;
449        BroadcastStream::new(inner.event_broadcaster.subscribe())
450    }
451}
452
453impl InnerManager {
454    async fn connect(&mut self) -> Result<(), AmiError> {
455        let stream = timeout(
456            Duration::from_secs(10),
457            TcpStream::connect((self.options.host.as_str(), self.options.port)),
458        )
459        .await
460        .map_err(|_| AmiError::Timeout)?
461        .map_err(AmiError::Io)?;
462        self.connection = Some(stream);
463        let mut temp_buf = [0; 1024];
464        if let Some(conn) = self.connection.as_mut() {
465            let _ = conn.read(&mut temp_buf).await;
466        }
467        Ok(())
468    }
469
470    async fn authenticate(&mut self) -> Result<(), AmiError> {
471        let login_action = AmiAction::Login {
472            username: self.options.username.clone(),
473            secret: self.options.password.clone(),
474            events: Some(if self.options.events { "on" } else { "off" }.to_string()),
475            action_id: Some("rust-ami-login".to_string()),
476        };
477        let action_str = serialize_ami_action(&login_action)?;
478        let conn = self.connection.as_mut().ok_or(AmiError::NotConnected)?;
479        conn.write_all(action_str.as_bytes())
480            .await
481            .map_err(AmiError::Io)?;
482        let response_data = self.read_ami_message_raw().await?;
483        let parsed = parse_ami_protocol_message(&response_data)?;
484        for value_msg in parsed {
485            if let Ok(resp) = serde_json::from_value::<AmiResponse>(value_msg) {
486                if resp.response.eq_ignore_ascii_case("Success") {
487                    self.authenticated = true;
488                    return Ok(());
489                } else if resp.response.eq_ignore_ascii_case("Error") {
490                    return Err(AmiError::AuthenticationFailed(
491                        resp.message.unwrap_or_default(),
492                    ));
493                }
494            }
495        }
496        Err(AmiError::AuthenticationFailed(
497            "No valid success response received for login".to_string(),
498        ))
499    }
500
501    async fn read_ami_message_raw(&mut self) -> Result<String, AmiError> {
502        let mut buffer = vec![0; 8192];
503        let mut complete_data = String::new();
504
505        let (_local_addr_str, _peer_addr_str) = {
506            let conn_ref = self.connection.as_ref().ok_or(AmiError::NotConnected)?;
507            let local_addr = conn_ref.local_addr().map_err(AmiError::Io)?;
508            let peer_addr = conn_ref.peer_addr().map_err(AmiError::Io)?;
509            (local_addr.to_string(), peer_addr.to_string())
510        };
511
512        let connection = self.connection.as_mut().ok_or(AmiError::NotConnected)?;
513        loop {
514            let n = connection
515                .read(&mut buffer)
516                .await
517                .map_err(|e| AmiError::Io(e))?;
518            if n == 0 {
519                return Err(AmiError::ConnectionClosed);
520            }
521            let data_chunk_str = String::from_utf8_lossy(&buffer[..n]);
522            complete_data.push_str(&data_chunk_str);
523            if complete_data.ends_with("\r\n\r\n") {
524                break;
525            }
526        }
527        Ok(complete_data)
528    }
529}
530
531fn parse_ami_protocol_message(raw_data: &str) -> Result<Vec<serde_json::Value>, AmiError> {
532    let mut messages = Vec::new();
533    for block in raw_data.trim().split("\r\n\r\n") {
534        if block.is_empty() {
535            continue;
536        }
537        let mut map = serde_json::Map::new();
538        for line in block.lines() {
539            if let Some((key, value)) = line.split_once(": ") {
540                map.insert(
541                    key.trim().to_string(),
542                    serde_json::Value::String(value.trim().to_string()),
543                );
544            }
545        }
546        if !map.is_empty() {
547            messages.push(serde_json::Value::Object(map));
548        }
549    }
550    Ok(messages)
551}
552
553fn serialize_ami_action(action: &AmiAction) -> Result<String, AmiError> {
554    let mut s = String::new();
555    match action {
556        AmiAction::Login {
557            username,
558            secret,
559            events,
560            action_id,
561        } => {
562            s.push_str("Action: Login\r\n");
563            s.push_str(&format!("Username: {}\r\n", username));
564            s.push_str(&format!("Secret: {}\r\n", secret));
565            if let Some(ev) = events {
566                s.push_str(&format!("Events: {}\r\n", ev));
567            }
568            if let Some(id) = action_id {
569                s.push_str(&format!("ActionID: {}\r\n", id));
570            }
571        }
572        AmiAction::Logoff { action_id } => {
573            s.push_str("Action: Logoff\r\n");
574            if let Some(id) = action_id {
575                s.push_str(&format!("ActionID: {}\r\n", id));
576            }
577        }
578        AmiAction::Ping { action_id } => {
579            s.push_str("Action: Ping\r\n");
580            if let Some(id) = action_id {
581                s.push_str(&format!("ActionID: {}\r\n", id));
582            }
583        }
584        AmiAction::Command { command, action_id } => {
585            s.push_str("Action: Command\r\n");
586            s.push_str(&format!("Command: {}\r\n", command));
587            if let Some(id) = action_id {
588                s.push_str(&format!("ActionID: {}\r\n", id));
589            }
590        }
591        AmiAction::Custom {
592            action: action_name,
593            params,
594            action_id,
595        } => {
596            s.push_str(&format!("Action: {}\r\n", action_name));
597            for (k, v) in params {
598                s.push_str(&format!("{}: {}\r\n", k, v));
599            }
600            if let Some(id) = action_id {
601                s.push_str(&format!("ActionID: {}\r\n", id));
602            }
603        }
604    }
605    s.push_str("\r\n");
606    Ok(s)
607}
608
609fn get_or_set_action_id(action: &mut AmiAction) -> String {
610    match action {
611        AmiAction::Login { action_id, .. }
612        | AmiAction::Logoff { action_id }
613        | AmiAction::Ping { action_id }
614        | AmiAction::Command { action_id, .. }
615        | AmiAction::Custom { action_id, .. } => {
616            if let Some(id) = action_id {
617                id.clone()
618            } else {
619                let new_id = Uuid::new_v4().to_string();
620                *action_id = Some(new_id.clone());
621                new_id
622            }
623        }
624    }
625}
626
627#[cfg(test)]
628mod tests {
629    use super::*;
630    use tokio_stream::StreamExt;
631
632    #[test]
633    fn test_serialize_login_action() {
634        let action = AmiAction::Login {
635            username: "user".to_string(),
636            secret: "pass".to_string(),
637            events: Some("on".to_string()),
638            action_id: Some("abc123".to_string()),
639        };
640        let s = serialize_ami_action(&action).unwrap();
641        assert!(s.contains("Action: Login"));
642        assert!(s.contains("Username: user"));
643        assert!(s.contains("Secret: pass"));
644        assert!(s.contains("Events: on"));
645        assert!(s.contains("ActionID: abc123"));
646        assert!(s.ends_with("\r\n\r\n"));
647    }
648
649    #[test]
650    fn test_serialize_command_action() {
651        let action = AmiAction::Command {
652            command: "sip show peers".to_string(),
653            action_id: None,
654        };
655        let s = serialize_ami_action(&action).unwrap();
656        assert!(s.contains("Action: Command"));
657        assert!(s.contains("Command: sip show peers"));
658    }
659
660    #[test]
661    fn test_parse_ami_protocol_message() {
662        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
663        let parsed = parse_ami_protocol_message(raw).unwrap();
664        assert_eq!(parsed.len(), 1);
665        let obj = &parsed[0];
666        assert_eq!(obj["Response"], "Success");
667        assert_eq!(obj["ActionID"], "123");
668        assert_eq!(obj["Message"], "Authentication accepted");
669    }
670
671    #[test]
672    fn test_deserialize_ami_response() {
673        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
674        let parsed = parse_ami_protocol_message(raw).unwrap();
675        let resp: AmiResponse = serde_json::from_value(parsed[0].clone()).unwrap();
676        assert_eq!(resp.response, "Success");
677        assert_eq!(resp.action_id.as_deref(), Some("123"));
678        assert_eq!(resp.message.as_deref(), Some("Authentication accepted"));
679    }
680
681    #[test]
682    fn test_deserialize_newchannel_event() {
683        let raw = "Event: Newchannel\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nChannelState: 4\r\nChannelStateDesc: Ring\r\nCallerIDNum: 100\r\nCallerIDName: Alice\r\n\r\n";
684        let parsed = parse_ami_protocol_message(raw).unwrap();
685        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
686        match event {
687            AmiEvent::Newchannel(data) => {
688                assert_eq!(data.channel, "SIP/100-00000001");
689                assert_eq!(data.uniqueid, "1234");
690                assert_eq!(data.channel_state.as_deref(), Some("4"));
691                assert_eq!(data.channel_state_desc.as_deref(), Some("Ring"));
692                assert_eq!(data.caller_id_num.as_deref(), Some("100"));
693                assert_eq!(data.caller_id_name.as_deref(), Some("Alice"));
694            }
695            _ => panic!("Expected AmiEvent::Newchannel"),
696        }
697    }
698
699    #[test]
700    fn test_deserialize_hangup_event() {
701        let raw = "Event: Hangup\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nCause: 16\r\nCause-txt: Normal Clearing\r\n\r\n";
702        let parsed = parse_ami_protocol_message(raw).unwrap();
703        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
704        match event {
705            AmiEvent::Hangup(data) => {
706                assert_eq!(data.channel, "SIP/100-00000001");
707                assert_eq!(data.uniqueid, "1234");
708                assert_eq!(data.cause.as_deref(), Some("16"));
709                assert_eq!(data.cause_txt.as_deref(), Some("Normal Clearing"));
710            }
711            _ => panic!("Expected AmiEvent::Hangup"),
712        }
713    }
714
715    #[test]
716    fn test_deserialize_peerstatus_event() {
717        let raw = "Event: PeerStatus\r\nPeer: SIP/100\r\nPeerStatus: Registered\r\n\r\n";
718        let parsed = parse_ami_protocol_message(raw).unwrap();
719        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
720        match event {
721            AmiEvent::PeerStatus(data) => {
722                assert_eq!(data.peer, "SIP/100");
723                assert_eq!(data.peer_status, "Registered");
724            }
725            _ => panic!("Expected AmiEvent::PeerStatus"),
726        }
727    }
728
729    #[test]
730    fn test_deserialize_unknown_event() {
731        let raw = "Event: FooBar\r\nSomeField: Value\r\n\r\n";
732        let parsed = parse_ami_protocol_message(raw).unwrap();
733        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
734        match event {
735            AmiEvent::UnknownEvent { event_type, fields } => {
736                assert_eq!(event_type, "FooBar");
737                assert_eq!(fields.get("SomeField").map(|s| s.as_str()), Some("Value"));
738            }
739            _ => panic!("Expected AmiEvent::UnknownEvent"),
740        }
741    }
742
743    #[tokio::test]
744    async fn test_manager_options_clone() {
745        let opts = ManagerOptions {
746            port: 5038,
747            host: "localhost".to_string(),
748            username: "admin".to_string(),
749            password: "pwd".to_string(),
750            events: true,
751        };
752        let opts2 = opts.clone();
753        assert_eq!(opts.port, opts2.port);
754        assert_eq!(opts.host, opts2.host);
755        assert_eq!(opts.username, opts2.username);
756        assert_eq!(opts.password, opts2.password);
757        assert_eq!(opts.events, opts2.events);
758    }
759
760    #[tokio::test]
761    async fn test_manager_new_and_auth_flag() {
762        let opts = ManagerOptions {
763            port: 5038,
764            host: "localhost".to_string(),
765            username: "admin".to_string(),
766            password: "pwd".to_string(),
767            events: false,
768        };
769        let manager = Manager::new(opts);
770        assert!(!manager.is_authenticated().await);
771    }
772
773    #[tokio::test]
774    async fn test_event_internal_connection_lost() {
775        let opts = ManagerOptions {
776            port: 5038,
777            host: "localhost".to_string(),
778            username: "admin".to_string(),
779            password: "pwd".to_string(),
780            events: true,
781        };
782        let manager = Manager::new(opts);
783        let mut stream = manager.all_events_stream().await;
784        {
785            let inner = manager.inner.lock().await;
786            let _ = inner
787                .event_broadcaster
788                .send(AmiEvent::InternalConnectionLost {
789                    error: "simulated".to_string(),
790                });
791        }
792        let ev = stream.next().await.unwrap().unwrap();
793        match ev {
794            AmiEvent::InternalConnectionLost { error } => {
795                assert_eq!(error, "simulated");
796            }
797            _ => panic!("Expected InternalConnectionLost"),
798        }
799    }
800
801    #[tokio::test]
802    async fn test_manager_options_default() {
803        let opts = ManagerOptions {
804            port: 5038,
805            host: "localhost".to_string(),
806            username: "admin".to_string(),
807            password: "pwd".to_string(),
808            events: true,
809        };
810        assert_eq!(opts.events, true);
811    }
812}