asterisk_manager/
lib.rs

1//! # Asterisk Manager Library
2//!
3//! A modern, strongly-typed, stream-based library for integration with the Asterisk Manager Interface (AMI).
4//!
5//! - **Typed AMI messages**: Actions, Events, and Responses as Rust enums/structs.
6//! - **Stream-based API**: Consume events via `tokio_stream`.
7//! - **Asynchronous operations**: Fully based on Tokio.
8//! - **Resilient connections**: Optional resilient module with heartbeat and automatic reconnection.
9//!
10//! ## Usage Example
11//!
12//! ```rust,no_run
13//! use asterisk_manager::{Manager, ManagerOptions, AmiAction};
14//! use tokio_stream::StreamExt;
15//!
16//! #[tokio::main]
17//! async fn main() {
18//!     let options = ManagerOptions {
19//!         port: 5038,
20//!         host: "127.0.0.1".to_string(),
21//!         username: "admin".to_string(),
22//!         password: "password".to_string(),
23//!         events: true,
24//!     };
25//!     let mut manager = Manager::new();
26//!     manager.connect_and_login(options).await.unwrap();
27//!
28//!     let mut events = manager.all_events_stream().await;
29//!     tokio::spawn(async move {
30//!         while let Some(Ok(ev)) = events.next().await {
31//!             println!("Event: {:?}", ev);
32//!         }
33//!     });
34//!
35//!     let resp = manager.send_action(AmiAction::Ping { action_id: None }).await.unwrap();
36//!     println!("Ping response: {:?}", resp);
37//!     manager.disconnect().await.unwrap();
38//! }
39//! ```
40//!
41//! ## Resilient Connections
42//!
43//! For production applications that need automatic reconnection and heartbeat monitoring,
44//! use the `resilient` module:
45//!
46//! ```rust,no_run
47//! use asterisk_manager::resilient::{ResilientOptions, connect_resilient};
48//! use asterisk_manager::ManagerOptions;
49//!
50//! #[tokio::main]
51//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
52//!     let options = ResilientOptions {
53//!         manager_options: ManagerOptions {
54//!             port: 5038,
55//!             host: "127.0.0.1".to_string(),
56//!             username: "admin".to_string(),
57//!             password: "password".to_string(),
58//!             events: true,
59//!         },
60//!         buffer_size: 2048,
61//!         enable_heartbeat: true,
62//!         enable_watchdog: true,
63//!         heartbeat_interval: 30,
64//!         watchdog_interval: 1,
65//!         max_retries: 3,
66//!         metrics: None,
67//!         cumulative_attempts_counter: None,
68//!     };
69//!     
70//!     let manager = connect_resilient(options).await?;
71//!     // Manager now has heartbeat and automatic reconnection enabled
72//!     Ok(())
73//! }
74//! ```
75//!
76//! ## Features
77//!
78//! - Login/logout, sending actions, and receiving AMI events.
79//! - Support for common events (`Newchannel`, `Hangup`, `PeerStatus`) and fallback for unknown events.
80//! - Detailed error handling via the `AmiError` enum.
81//! - Configurable buffer sizes for high-throughput applications.
82//! - Heartbeat monitoring with configurable interval and automatic disconnection on failure.
83//! - Watchdog for automatic reconnection with configurable check interval when not authenticated.
84//! - Infinite event streams that handle lag and reconnection automatically.
85//!
86//! ## Requirements
87//!
88//! - Rust 1.70+
89//! - Tokio (asynchronous runtime)
90//!
91//! ## License
92//!
93//! MIT
94
95use serde::de::Deserializer;
96use serde::{Deserialize, Serialize};
97use serde_json::Value;
98use std::collections::HashMap;
99use std::sync::Arc;
100use thiserror::Error;
101use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
102use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
103use tokio::net::TcpStream;
104use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
105use tokio::time::{timeout, Duration};
106use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
107use tokio_stream::wrappers::BroadcastStream;
108use tokio_stream::Stream;
109use tokio_util::sync::CancellationToken;
110#[cfg(feature = "docs")]
111use utoipa::ToSchema;
112use uuid::Uuid;
113
114pub mod resilient;
115
116#[cfg_attr(feature = "docs", derive(ToSchema))]
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct AmiResponse {
119    #[serde(rename = "Response")]
120    pub response: String,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    #[serde(rename = "ActionID")]
123    pub action_id: Option<String>,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    #[serde(rename = "Message")]
126    pub message: Option<String>,
127    #[serde(flatten)]
128    #[cfg_attr(feature = "docs", schema(additional_properties = true))]
129    pub fields: HashMap<String, Value>,
130}
131
132#[cfg_attr(feature = "docs", derive(ToSchema))]
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(tag = "Action", rename_all = "PascalCase")]
135pub enum AmiAction {
136    Login {
137        username: String,
138        secret: String,
139        #[serde(rename = "Events")]
140        events: Option<String>,
141        #[serde(rename = "ActionID")]
142        action_id: Option<String>,
143    },
144    Logoff {
145        #[serde(rename = "ActionID")]
146        action_id: Option<String>,
147    },
148    Ping {
149        #[serde(rename = "ActionID")]
150        action_id: Option<String>,
151    },
152    Command {
153        command: String,
154        #[serde(rename = "ActionID")]
155        action_id: Option<String>,
156    },
157    Custom {
158        action: String,
159        #[serde(flatten)]
160        params: HashMap<String, String>,
161        #[serde(rename = "ActionID")]
162        action_id: Option<String>,
163    },
164}
165
166#[cfg_attr(feature = "docs", derive(ToSchema))]
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct NewchannelEventData {
169    #[serde(rename = "Channel")]
170    pub channel: String,
171    #[serde(rename = "Uniqueid")]
172    pub uniqueid: String,
173    #[serde(rename = "ChannelState")]
174    pub channel_state: Option<String>,
175    #[serde(rename = "ChannelStateDesc")]
176    pub channel_state_desc: Option<String>,
177    #[serde(rename = "CallerIDNum")]
178    pub caller_id_num: Option<String>,
179    #[serde(rename = "CallerIDName")]
180    pub caller_id_name: Option<String>,
181    #[serde(flatten)]
182    pub other: HashMap<String, String>,
183}
184
185#[cfg_attr(feature = "docs", derive(ToSchema))]
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct HangupEventData {
188    #[serde(rename = "Channel")]
189    pub channel: String,
190    #[serde(rename = "Uniqueid")]
191    pub uniqueid: String,
192    #[serde(rename = "Cause")]
193    pub cause: Option<String>,
194    #[serde(rename = "Cause-txt")]
195    pub cause_txt: Option<String>,
196    #[serde(flatten)]
197    pub other: HashMap<String, String>,
198}
199
200#[cfg_attr(feature = "docs", derive(ToSchema))]
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct PeerStatusEventData {
203    #[serde(rename = "Peer")]
204    pub peer: String,
205    #[serde(rename = "PeerStatus")]
206    pub peer_status: String,
207    #[serde(flatten)]
208    pub other: HashMap<String, String>,
209}
210
211#[cfg_attr(feature = "docs", derive(ToSchema))]
212#[derive(Debug, Clone, Serialize)]
213#[serde(untagged)]
214pub enum AmiEvent {
215    Newchannel(NewchannelEventData),
216    Hangup(HangupEventData),
217    PeerStatus(PeerStatusEventData),
218    UnknownEvent {
219        event_type: String,
220        fields: HashMap<String, String>,
221    },
222    InternalConnectionLost {
223        error: String,
224    },
225}
226
227impl<'de> Deserialize<'de> for AmiEvent {
228    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
229    where
230        D: Deserializer<'de>,
231    {
232        let value = serde_json::Value::deserialize(deserializer)?;
233        let map_obj = value
234            .as_object()
235            .ok_or_else(|| serde::de::Error::custom("AmiEvent: Expected a JSON object/map"))?;
236
237        if let Some(event_type_val) = map_obj.get("Event") {
238            let event_type_str = event_type_val.as_str().ok_or_else(|| {
239                serde::de::Error::custom("AmiEvent: 'Event' field is not a string")
240            })?;
241
242            match event_type_str {
243                "Newchannel" => Ok(AmiEvent::Newchannel(
244                    NewchannelEventData::deserialize(value.clone())
245                        .map_err(serde::de::Error::custom)?,
246                )),
247                "Hangup" => Ok(AmiEvent::Hangup(
248                    HangupEventData::deserialize(value.clone())
249                        .map_err(serde::de::Error::custom)?,
250                )),
251                "PeerStatus" => Ok(AmiEvent::PeerStatus(
252                    PeerStatusEventData::deserialize(value.clone())
253                        .map_err(serde::de::Error::custom)?,
254                )),
255                _ => {
256                    let fields: HashMap<String, String> = map_obj
257                        .iter()
258                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
259                        .collect();
260                    Ok(AmiEvent::UnknownEvent {
261                        event_type: event_type_str.to_string(),
262                        fields,
263                    })
264                }
265            }
266        } else {
267            let fields: HashMap<String, String> = map_obj
268                .iter()
269                .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
270                .collect();
271            Ok(AmiEvent::UnknownEvent {
272                event_type: "UnknownOrMalformed".to_string(),
273                fields,
274            })
275        }
276    }
277}
278
279#[derive(Debug, Error)]
280pub enum AmiError {
281    #[error("IO error: {0}")]
282    Io(#[from] std::io::Error),
283    #[error("Parse error: {0}")]
284    ParseError(String),
285    #[error("Serialize error: {0}")]
286    SerializeError(String),
287    #[error("JSON error: {0}")]
288    SerdeJson(#[from] serde_json::Error),
289    #[error("Authentication failed: {0}")]
290    AuthenticationFailed(String),
291    #[error("Action failed: {response:?}")]
292    ActionFailed { response: AmiResponse },
293    #[error("Connection closed")]
294    ConnectionClosed,
295    #[error("Operation timed out")]
296    Timeout,
297    #[error("Login required")]
298    LoginRequired,
299    #[error("Internal channel error: {0}")]
300    ChannelError(String),
301    #[error("Event stream lagged: {0}")]
302    EventStreamLagged(#[from] tokio::sync::broadcast::error::RecvError),
303    #[error("Not connected to AMI server")]
304    NotConnected,
305    #[error("Other error: {0}")]
306    Other(String),
307}
308
309#[derive(Serialize, Deserialize, Debug, Clone)]
310pub struct ManagerOptions {
311    pub port: u16,
312    pub host: String,
313    pub username: String,
314    pub password: String,
315    pub events: bool,
316}
317
318struct InnerManager {
319    authenticated: bool,
320    /// Channel for sending raw AMI messages
321    write_tx: Option<mpsc::Sender<String>>,
322    /// Channel for receiving raw AMI messages
323    event_broadcaster: broadcast::Sender<AmiEvent>,
324    /// Responders mapped for each action ID
325    pending_responses: HashMap<String, oneshot::Sender<Result<AmiResponse, AmiError>>>,
326    /// Heartbeat cancellation token
327    heartbeat_token: Option<CancellationToken>,
328    /// Watchdog cancellation token
329    watchdog_token: Option<CancellationToken>,
330}
331
332#[derive(Clone)]
333pub struct Manager {
334    pub(crate) inner: Arc<Mutex<InnerManager>>,
335}
336
337impl Default for Manager {
338    fn default() -> Self {
339        Self::new()
340    }
341}
342
343impl Manager {
344    pub fn new() -> Self {
345        Self::new_with_buffer(1024)
346    }
347
348    pub fn new_with_buffer(buffer_size: usize) -> Self {
349        let (event_tx, _) = broadcast::channel(buffer_size);
350        let inner = InnerManager {
351            authenticated: false,
352            write_tx: None,
353            event_broadcaster: event_tx,
354            pending_responses: HashMap::new(),
355            heartbeat_token: None,
356            watchdog_token: None,
357        };
358        Self {
359            inner: Arc::new(Mutex::new(inner)),
360        }
361    }
362
363    pub async fn connect_and_login(&mut self, options: ManagerOptions) -> Result<(), AmiError> {
364        let stream = timeout(
365            Duration::from_secs(10),
366            TcpStream::connect((options.host.as_str(), options.port)),
367        )
368        .await
369        .map_err(|_| AmiError::Timeout)?
370        .map_err(AmiError::Io)?;
371
372        let (reader, writer) = stream.into_split();
373
374        let (write_tx, write_rx) = mpsc::channel::<String>(100);
375        let (dispatch_tx, dispatch_rx) = mpsc::channel::<String>(1024);
376
377        let event_broadcaster = {
378            let inner = self.inner.lock().await;
379            inner.event_broadcaster.clone()
380        };
381
382        spawn_writer_task(writer, write_rx);
383        spawn_reader_task(reader, dispatch_tx, event_broadcaster);
384        spawn_dispatcher_task(self.inner.clone(), dispatch_rx);
385
386        self.inner.lock().await.write_tx = Some(write_tx);
387
388        let login_action = AmiAction::Login {
389            username: options.username.clone(),
390            secret: options.password.clone(),
391            events: Some("on".to_string()),
392            action_id: Some("rust-ami-login".to_string()),
393        };
394
395        match self.send_action(login_action).await {
396            Ok(resp) if resp.response.eq_ignore_ascii_case("Success") => {
397                self.inner.lock().await.authenticated = true;
398                Ok(())
399            }
400            Ok(resp) => Err(AmiError::AuthenticationFailed(
401                resp.message.unwrap_or_default(),
402            )),
403            Err(e) => Err(e),
404        }
405    }
406
407    pub async fn send_action(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
408        let action_id = get_or_set_action_id(&mut action);
409
410        let mut stream = self.all_events_stream().await;
411
412        let initial_response = self.send_initial_request(action.clone()).await?;
413
414        if initial_response
415            .fields
416            .get("EventList")
417            .and_then(|v| v.as_str())
418            == Some("start")
419        {
420            let mut collected_events = Vec::new();
421
422            let collection_result = tokio::time::timeout(Duration::from_secs(10), async {
423                use tokio_stream::StreamExt;
424                while let Some(Ok(event)) = stream.next().await {
425                    if let AmiEvent::UnknownEvent { event_type, fields } = &event {
426                        if fields.get("ActionID").map(|id| id.as_str()) == Some(&action_id) {
427                            if event_type.ends_with("Complete") {
428                                break;
429                            }
430                            collected_events.push(event.clone());
431                        }
432                    }
433                }
434            })
435            .await;
436
437            if collection_result.is_err() {
438                return Err(AmiError::Timeout);
439            }
440
441            let mut final_fields = initial_response.fields;
442            final_fields.insert(
443                "CollectedEvents".to_string(),
444                serde_json::to_value(&collected_events)?,
445            );
446
447            Ok(AmiResponse {
448                response: initial_response.response,
449                action_id: initial_response.action_id,
450                message: Some("Successfully collected events.".to_string()),
451                fields: final_fields,
452            })
453        } else {
454            Ok(initial_response)
455        }
456    }
457
458    async fn send_initial_request(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
459        let action_id = get_or_set_action_id(&mut action);
460        let (tx, rx) = oneshot::channel();
461        let action_str = serialize_ami_action(&action)?;
462
463        {
464            let mut inner = self.inner.lock().await;
465            if inner.write_tx.is_none() {
466                return Err(AmiError::NotConnected);
467            }
468            inner.pending_responses.insert(action_id.clone(), tx);
469            let writer = inner.write_tx.as_ref().unwrap();
470            if writer.send(action_str).await.is_err() {
471                inner.pending_responses.remove(&action_id);
472                return Err(AmiError::ConnectionClosed);
473            }
474        }
475
476        match timeout(Duration::from_secs(10), rx).await {
477            Ok(Ok(Ok(resp))) => Ok(resp),
478            Ok(Ok(Err(e))) => Err(e),
479            Ok(Err(_)) => Err(AmiError::ChannelError("Responder dropped".to_string())),
480            Err(_) => Err(AmiError::Timeout),
481        }
482    }
483
484    pub async fn disconnect(&self) -> Result<(), AmiError> {
485        let mut inner = self.inner.lock().await;
486        inner.write_tx = None;
487        inner.authenticated = false;
488
489        // Cancel heartbeat and watchdog
490        if let Some(token) = &inner.heartbeat_token {
491            token.cancel();
492            inner.heartbeat_token = None;
493        }
494        if let Some(token) = &inner.watchdog_token {
495            token.cancel();
496            inner.watchdog_token = None;
497        }
498
499        Ok(())
500    }
501
502    pub async fn is_authenticated(&self) -> bool {
503        self.inner.lock().await.authenticated
504    }
505
506    pub async fn all_events_stream(
507        &self,
508    ) -> impl Stream<Item = Result<AmiEvent, BroadcastStreamRecvError>> + Send + Unpin {
509        let inner = self.inner.lock().await;
510        BroadcastStream::new(inner.event_broadcaster.subscribe())
511    }
512
513    /// Start heartbeat with default interval (30 seconds). Kept for backwards compatibility.
514    pub async fn start_heartbeat(&self) -> Result<(), AmiError> {
515        self.start_heartbeat_with_interval(30).await
516    }
517
518    /// Start the heartbeat task with a configurable interval (in seconds).
519    pub async fn start_heartbeat_with_interval(&self, interval_secs: u64) -> Result<(), AmiError> {
520        let mut inner = self.inner.lock().await;
521
522        // Cancel existing heartbeat if any
523        if let Some(token) = &inner.heartbeat_token {
524            token.cancel();
525        }
526
527        let token = CancellationToken::new();
528        inner.heartbeat_token = Some(token.clone());
529
530        let manager = self.clone();
531        tokio::spawn(async move {
532            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
533            loop {
534                tokio::select! {
535                    _ = token.cancelled() => {
536                        break;
537                    }
538                    _ = interval.tick() => {
539                        if manager.is_authenticated().await {
540                            match manager.send_action(AmiAction::Ping { action_id: None }).await {
541                                Ok(_) => {
542                                    log::debug!("Heartbeat ping successful");
543                                }
544                                Err(e) => {
545                                    log::warn!("Heartbeat ping failed: {}", e);
546                                    // Emit connection lost event
547                                    if let Ok(inner) = manager.inner.try_lock() {
548                                        let _ = inner.event_broadcaster.send(AmiEvent::InternalConnectionLost {
549                                            error: format!("Heartbeat failed: {}", e),
550                                        });
551                                    }
552                                    // Disconnect on heartbeat failure
553                                    let _ = manager.disconnect().await;
554                                    break;
555                                }
556                            }
557                        }
558                    }
559                }
560            }
561        });
562
563        Ok(())
564    }
565
566    pub async fn start_watchdog(&self, options: ManagerOptions) -> Result<(), AmiError> {
567        self.start_watchdog_with_interval(options, 1).await
568    }
569
570    pub async fn start_watchdog_with_interval(
571        &self,
572        options: ManagerOptions,
573        interval_secs: u64,
574    ) -> Result<(), AmiError> {
575        let mut inner = self.inner.lock().await;
576
577        // Cancel existing watchdog if any
578        if let Some(token) = &inner.watchdog_token {
579            token.cancel();
580        }
581
582        let token = CancellationToken::new();
583        inner.watchdog_token = Some(token.clone());
584
585        let manager = self.clone();
586        tokio::spawn(async move {
587            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
588            loop {
589                tokio::select! {
590                    _ = token.cancelled() => {
591                        break;
592                    }
593                    _ = interval.tick() => {
594                        if !manager.is_authenticated().await {
595                            log::debug!("Watchdog attempting reconnection...");
596                            let mut mgr = manager.clone();
597                            match mgr.connect_and_login(options.clone()).await {
598                                Ok(_) => {
599                                    log::info!("Watchdog reconnection successful");
600                                }
601                                Err(e) => {
602                                    log::debug!("Watchdog reconnection failed: {}", e);
603                                }
604                            }
605                        }
606                    }
607                }
608            }
609        });
610
611        Ok(())
612    }
613}
614
615fn spawn_writer_task(mut writer: OwnedWriteHalf, mut write_rx: mpsc::Receiver<String>) {
616    tokio::spawn(async move {
617        while let Some(action_str) = write_rx.recv().await {
618            if writer.write_all(action_str.as_bytes()).await.is_err() {
619                break;
620            }
621        }
622    });
623}
624
625fn spawn_reader_task(
626    reader: OwnedReadHalf,
627    dispatch_tx: mpsc::Sender<String>,
628    event_broadcaster: broadcast::Sender<AmiEvent>,
629) {
630    tokio::spawn(async move {
631        let mut buf_reader = BufReader::new(reader);
632        loop {
633            let mut message_block = String::new();
634            loop {
635                let mut line = String::new();
636                match buf_reader.read_line(&mut line).await {
637                    Ok(0) | Err(_) => {
638                        // Connection lost - emit synthetic event
639                        let _ = event_broadcaster.send(AmiEvent::InternalConnectionLost {
640                            error: "Connection lost during read".to_string(),
641                        });
642                        return;
643                    }
644                    Ok(_) => {
645                        let is_end = line == "\r\n";
646                        message_block.push_str(&line);
647                        if is_end {
648                            break;
649                        }
650                    }
651                }
652            }
653
654            if !message_block.trim().is_empty() && dispatch_tx.send(message_block).await.is_err() {
655                let _ = event_broadcaster.send(AmiEvent::InternalConnectionLost {
656                    error: "Dispatcher channel closed".to_string(),
657                });
658                break;
659            }
660        }
661    });
662}
663
664fn spawn_dispatcher_task(
665    inner_arc: Arc<Mutex<InnerManager>>,
666    mut dispatch_rx: mpsc::Receiver<String>,
667) {
668    tokio::spawn(async move {
669        while let Some(raw_message) = dispatch_rx.recv().await {
670            if let Ok(parsed_messages) = parse_ami_protocol_message(&raw_message) {
671                for value_msg in parsed_messages {
672                    let mut inner = inner_arc.lock().await;
673                    if value_msg.get("Response").is_some() {
674                        if let Ok(resp) = serde_json::from_value::<AmiResponse>(value_msg) {
675                            if let Some(action_id) = &resp.action_id {
676                                if let Some(responder) = inner.pending_responses.remove(action_id) {
677                                    let _ = responder.send(Ok(resp));
678                                }
679                            }
680                        }
681                    } else if value_msg.get("Event").is_some() {
682                        if let Ok(event) = serde_json::from_value::<AmiEvent>(value_msg.clone()) {
683                            let _ = inner.event_broadcaster.send(event);
684                        }
685                    }
686                }
687            }
688        }
689    });
690}
691
692fn parse_ami_protocol_message(raw_data: &str) -> Result<Vec<serde_json::Value>, AmiError> {
693    let mut messages = Vec::new();
694    for block in raw_data.trim().split("\r\n\r\n") {
695        if block.is_empty() {
696            continue;
697        }
698        let mut map = serde_json::Map::new();
699        for line in block.lines() {
700            if let Some((key, value)) = line.split_once(": ") {
701                map.insert(
702                    key.trim().to_string(),
703                    serde_json::Value::String(value.trim().to_string()),
704                );
705            }
706        }
707        if !map.is_empty() {
708            messages.push(serde_json::Value::Object(map));
709        }
710    }
711    Ok(messages)
712}
713
714fn serialize_ami_action(action: &AmiAction) -> Result<String, AmiError> {
715    let mut s = String::new();
716    match action {
717        AmiAction::Login {
718            username,
719            secret,
720            events,
721            action_id,
722        } => {
723            s.push_str("Action: Login\r\n");
724            s.push_str(&format!("Username: {}\r\n", username));
725            s.push_str(&format!("Secret: {}\r\n", secret));
726            if let Some(ev) = events {
727                s.push_str(&format!("Events: {}\r\n", ev));
728            }
729            if let Some(id) = action_id {
730                s.push_str(&format!("ActionID: {}\r\n", id));
731            }
732        }
733        AmiAction::Logoff { action_id } => {
734            s.push_str("Action: Logoff\r\n");
735            if let Some(id) = action_id {
736                s.push_str(&format!("ActionID: {}\r\n", id));
737            }
738        }
739        AmiAction::Ping { action_id } => {
740            s.push_str("Action: Ping\r\n");
741            if let Some(id) = action_id {
742                s.push_str(&format!("ActionID: {}\r\n", id));
743            }
744        }
745        AmiAction::Command { command, action_id } => {
746            s.push_str("Action: Command\r\n");
747            s.push_str(&format!("Command: {}\r\n", command));
748            if let Some(id) = action_id {
749                s.push_str(&format!("ActionID: {}\r\n", id));
750            }
751        }
752        AmiAction::Custom {
753            action: action_name,
754            params,
755            action_id,
756        } => {
757            s.push_str(&format!("Action: {}\r\n", action_name));
758            for (k, v) in params {
759                s.push_str(&format!("{}: {}\r\n", k, v));
760            }
761            if let Some(id) = action_id {
762                s.push_str(&format!("ActionID: {}\r\n", id));
763            }
764        }
765    }
766    s.push_str("\r\n");
767    Ok(s)
768}
769
770fn get_or_set_action_id(action: &mut AmiAction) -> String {
771    match action {
772        AmiAction::Login { action_id, .. }
773        | AmiAction::Logoff { action_id }
774        | AmiAction::Ping { action_id }
775        | AmiAction::Command { action_id, .. }
776        | AmiAction::Custom { action_id, .. } => {
777            if let Some(id) = action_id {
778                id.clone()
779            } else {
780                let new_id = Uuid::new_v4().to_string();
781                *action_id = Some(new_id.clone());
782                new_id
783            }
784        }
785    }
786}
787
788#[cfg(test)]
789mod tests {
790    use super::*;
791    use tokio_stream::StreamExt;
792
793    #[test]
794    fn test_serialize_login_action() {
795        let action = AmiAction::Login {
796            username: "user".to_string(),
797            secret: "pass".to_string(),
798            events: Some("on".to_string()),
799            action_id: Some("abc123".to_string()),
800        };
801        let s = serialize_ami_action(&action).unwrap();
802        assert!(s.contains("Action: Login"));
803        assert!(s.contains("Username: user"));
804        assert!(s.contains("Secret: pass"));
805        assert!(s.contains("Events: on"));
806        assert!(s.contains("ActionID: abc123"));
807        assert!(s.ends_with("\r\n\r\n"));
808    }
809
810    #[test]
811    fn test_serialize_command_action() {
812        let action = AmiAction::Command {
813            command: "sip show peers".to_string(),
814            action_id: None,
815        };
816        let s = serialize_ami_action(&action).unwrap();
817        assert!(s.contains("Action: Command"));
818        assert!(s.contains("Command: sip show peers"));
819    }
820
821    #[test]
822    fn test_parse_ami_protocol_message() {
823        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
824        let parsed = parse_ami_protocol_message(raw).unwrap();
825        assert_eq!(parsed.len(), 1);
826        let obj = &parsed[0];
827        assert_eq!(obj["Response"], "Success");
828        assert_eq!(obj["ActionID"], "123");
829        assert_eq!(obj["Message"], "Authentication accepted");
830    }
831
832    #[test]
833    fn test_deserialize_ami_response() {
834        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
835        let parsed = parse_ami_protocol_message(raw).unwrap();
836        let resp: AmiResponse = serde_json::from_value(parsed[0].clone()).unwrap();
837        assert_eq!(resp.response, "Success");
838        assert_eq!(resp.action_id.as_deref(), Some("123"));
839        assert_eq!(resp.message.as_deref(), Some("Authentication accepted"));
840    }
841
842    #[test]
843    fn test_deserialize_newchannel_event() {
844        let raw = "Event: Newchannel\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nChannelState: 4\r\nChannelStateDesc: Ring\r\nCallerIDNum: 100\r\nCallerIDName: Alice\r\n\r\n";
845        let parsed = parse_ami_protocol_message(raw).unwrap();
846        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
847        match event {
848            AmiEvent::Newchannel(data) => {
849                assert_eq!(data.channel, "SIP/100-00000001");
850                assert_eq!(data.uniqueid, "1234");
851                assert_eq!(data.channel_state.as_deref(), Some("4"));
852                assert_eq!(data.channel_state_desc.as_deref(), Some("Ring"));
853                assert_eq!(data.caller_id_num.as_deref(), Some("100"));
854                assert_eq!(data.caller_id_name.as_deref(), Some("Alice"));
855            }
856            _ => panic!("Expected AmiEvent::Newchannel"),
857        }
858    }
859
860    #[test]
861    fn test_deserialize_hangup_event() {
862        let raw = "Event: Hangup\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nCause: 16\r\nCause-txt: Normal Clearing\r\n\r\n";
863        let parsed = parse_ami_protocol_message(raw).unwrap();
864        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
865        match event {
866            AmiEvent::Hangup(data) => {
867                assert_eq!(data.channel, "SIP/100-00000001");
868                assert_eq!(data.uniqueid, "1234");
869                assert_eq!(data.cause.as_deref(), Some("16"));
870                assert_eq!(data.cause_txt.as_deref(), Some("Normal Clearing"));
871            }
872            _ => panic!("Expected AmiEvent::Hangup"),
873        }
874    }
875
876    #[test]
877    fn test_deserialize_peerstatus_event() {
878        let raw = "Event: PeerStatus\r\nPeer: SIP/100\r\nPeerStatus: Registered\r\n\r\n";
879        let parsed = parse_ami_protocol_message(raw).unwrap();
880        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
881        match event {
882            AmiEvent::PeerStatus(data) => {
883                assert_eq!(data.peer, "SIP/100");
884                assert_eq!(data.peer_status, "Registered");
885            }
886            _ => panic!("Expected AmiEvent::PeerStatus"),
887        }
888    }
889
890    #[test]
891    fn test_deserialize_unknown_event() {
892        let raw = "Event: FooBar\r\nSomeField: Value\r\n\r\n";
893        let parsed = parse_ami_protocol_message(raw).unwrap();
894        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
895        match event {
896            AmiEvent::UnknownEvent { event_type, fields } => {
897                assert_eq!(event_type, "FooBar");
898                assert_eq!(fields.get("SomeField").map(|s| s.as_str()), Some("Value"));
899            }
900            _ => panic!("Expected AmiEvent::UnknownEvent"),
901        }
902    }
903
904    #[tokio::test]
905    async fn test_manager_options_clone() {
906        let opts = ManagerOptions {
907            port: 5038,
908            host: "localhost".to_string(),
909            username: "admin".to_string(),
910            password: "pwd".to_string(),
911            events: true,
912        };
913        let opts2 = opts.clone();
914        assert_eq!(opts.port, opts2.port);
915        assert_eq!(opts.host, opts2.host);
916        assert_eq!(opts.username, opts2.username);
917        assert_eq!(opts.password, opts2.password);
918        assert_eq!(opts.events, opts2.events);
919    }
920
921    #[tokio::test]
922    async fn test_manager_new_and_auth_flag() {
923        // A criação de `opts` não é mais necessária para este teste.
924        let manager = Manager::new(); // Manager::new() agora não tem argumentos.
925        assert!(!manager.is_authenticated().await);
926    }
927
928    #[tokio::test]
929    async fn test_event_internal_connection_lost() {
930        // 1. Cria um manager vazio, como no teste anterior.
931        let manager = Manager::new();
932
933        // 2. Get the event stream BEFORE sending the event.
934        let mut stream = manager.all_events_stream().await;
935
936        // 3. Send the event internally to simulate a disconnection.
937        //    This part works again because of `pub(crate)`.
938        {
939            let inner = manager.inner.lock().await;
940            let _ = inner
941                .event_broadcaster
942                .send(AmiEvent::InternalConnectionLost {
943                    error: "simulated".to_string(),
944                });
945        }
946
947        // 4. Verifica se o evento foi recebido corretamente pelo stream.
948        let ev = stream.next().await.unwrap().unwrap();
949        match ev {
950            AmiEvent::InternalConnectionLost { error } => {
951                assert_eq!(error, "simulated");
952            }
953            _ => panic!("Expected InternalConnectionLost"),
954        }
955    }
956
957    #[tokio::test]
958    async fn test_manager_options_default() {
959        let opts = ManagerOptions {
960            port: 5038,
961            host: "localhost".to_string(),
962            username: "admin".to_string(),
963            password: "pwd".to_string(),
964            events: true,
965        };
966        assert_eq!(opts.events, true);
967    }
968
969    #[tokio::test]
970    async fn test_manager_new_with_buffer() {
971        let manager = Manager::new_with_buffer(512);
972        assert!(!manager.is_authenticated().await);
973
974        // Test that the buffer size is respected by checking we can create the stream
975        let _stream = manager.all_events_stream().await;
976    }
977
978    #[tokio::test]
979    async fn test_heartbeat_and_watchdog_tokens() {
980        let manager = Manager::new();
981
982        // Initially no tokens should be set
983        {
984            let inner = manager.inner.lock().await;
985            assert!(inner.heartbeat_token.is_none());
986            assert!(inner.watchdog_token.is_none());
987        }
988
989        // Create dummy options for testing
990        let opts = ManagerOptions {
991            port: 5038,
992            host: "127.0.0.1".to_string(),
993            username: "test".to_string(),
994            password: "test".to_string(),
995            events: true,
996        };
997
998        // Start heartbeat should set token (even though connection will fail)
999        let _ = manager.start_heartbeat().await;
1000        {
1001            let inner = manager.inner.lock().await;
1002            assert!(inner.heartbeat_token.is_some());
1003        }
1004
1005        // Start watchdog should set token
1006        let _ = manager.start_watchdog(opts).await;
1007        {
1008            let inner = manager.inner.lock().await;
1009            assert!(inner.watchdog_token.is_some());
1010        }
1011
1012        // Disconnect should clear both tokens
1013        let _ = manager.disconnect().await;
1014        {
1015            let inner = manager.inner.lock().await;
1016            assert!(inner.heartbeat_token.is_none());
1017            assert!(inner.watchdog_token.is_none());
1018        }
1019    }
1020
1021    #[tokio::test]
1022    async fn test_connection_lost_event_emission() {
1023        // Test that synthetic connection lost events are properly emitted
1024        let manager = Manager::new();
1025        let mut stream = manager.all_events_stream().await;
1026
1027        // Manually emit a connection lost event
1028        {
1029            let inner = manager.inner.lock().await;
1030            let _ = inner
1031                .event_broadcaster
1032                .send(AmiEvent::InternalConnectionLost {
1033                    error: "test connection lost".to_string(),
1034                });
1035        }
1036
1037        // Verify the event is received
1038        let event = stream.next().await.unwrap().unwrap();
1039        match event {
1040            AmiEvent::InternalConnectionLost { error } => {
1041                assert_eq!(error, "test connection lost");
1042            }
1043            _ => panic!("Expected InternalConnectionLost event"),
1044        }
1045    }
1046
1047    #[tokio::test]
1048    async fn test_heartbeat_interval_respected() {
1049        // Use tokio time control to test heartbeat scheduling
1050        tokio::time::pause();
1051
1052        let manager = Manager::new();
1053
1054        // Start heartbeat with a short interval
1055        let _ = manager.start_heartbeat_with_interval(2).await;
1056
1057        // Advance time less than interval: no ticks yet
1058        tokio::time::advance(Duration::from_secs(1)).await;
1059        {
1060            let inner = manager.inner.lock().await;
1061            // Token should be set
1062            assert!(inner.heartbeat_token.is_some());
1063        }
1064
1065        // Advance time to trigger at least one tick
1066        tokio::time::advance(Duration::from_secs(2)).await;
1067
1068        // There's no direct public hook for verifying pings were sent without mocking
1069        // but we can assert that the heartbeat task remains active and didn't panic.
1070        // Ensure token still exists
1071        {
1072            let inner = manager.inner.lock().await;
1073            assert!(inner.heartbeat_token.is_some());
1074        }
1075
1076        // Clean up
1077        let _ = manager.disconnect().await;
1078    }
1079
1080    #[tokio::test]
1081    async fn test_watchdog_interval_configuration() {
1082        // Test that watchdog can be started with different intervals
1083        let manager = Manager::new();
1084
1085        let opts = ManagerOptions {
1086            port: 5038,
1087            host: "127.0.0.1".to_string(),
1088            username: "test".to_string(),
1089            password: "test".to_string(),
1090            events: true,
1091        };
1092
1093        // Test default interval (backward compatibility)
1094        let _ = manager.start_watchdog(opts.clone()).await;
1095        {
1096            let inner = manager.inner.lock().await;
1097            assert!(inner.watchdog_token.is_some());
1098        }
1099
1100        // Test custom interval
1101        let _ = manager.start_watchdog_with_interval(opts.clone(), 5).await;
1102        {
1103            let inner = manager.inner.lock().await;
1104            assert!(inner.watchdog_token.is_some());
1105        }
1106
1107        // Clean up
1108        let _ = manager.disconnect().await;
1109    }
1110}