asterisk_manager/
lib.rs

1//! # Asterisk Manager Library
2//!
3//! A modern, strongly-typed, stream-based library for integration with the Asterisk Manager Interface (AMI).
4//!
5//! - **Typed AMI messages**: Actions, Events, and Responses as Rust enums/structs.
6//! - **Stream-based API**: Consume events via `tokio_stream`.
7//! - **Asynchronous operations**: Fully based on Tokio.
8//! - **Resilient connections**: Optional resilient module with heartbeat and automatic reconnection.
9//!
10//! ## Usage Example
11//!
12//! ```rust,no_run
13//! use asterisk_manager::{Manager, ManagerOptions, AmiAction};
14//! use tokio_stream::StreamExt;
15//!
16//! #[tokio::main]
17//! async fn main() {
18//!     let options = ManagerOptions {
19//!         port: 5038,
20//!         host: "127.0.0.1".to_string(),
21//!         username: "admin".to_string(),
22//!         password: "password".to_string(),
23//!         events: true,
24//!     };
25//!     let mut manager = Manager::new();
26//!     manager.connect_and_login(options).await.unwrap();
27//!
28//!     let mut events = manager.all_events_stream().await;
29//!     tokio::spawn(async move {
30//!         while let Some(Ok(ev)) = events.next().await {
31//!             println!("Event: {:?}", ev);
32//!         }
33//!     });
34//!
35//!     let resp = manager.send_action(AmiAction::Ping { action_id: None }).await.unwrap();
36//!     println!("Ping response: {:?}", resp);
37//!     manager.disconnect().await.unwrap();
38//! }
39//! ```
40//!
41//! ## Resilient Connections
42//!
43//! For production applications that need automatic reconnection and heartbeat monitoring,
44//! use the `resilient` module:
45//!
46//! ```rust,no_run
47//! use asterisk_manager::resilient::{ResilientOptions, connect_resilient};
48//! use asterisk_manager::ManagerOptions;
49//!
50//! #[tokio::main]
51//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
52//!     let options = ResilientOptions {
53//!         manager_options: ManagerOptions {
54//!             port: 5038,
55//!             host: "127.0.0.1".to_string(),
56//!             username: "admin".to_string(),
57//!             password: "password".to_string(),
58//!             events: true,
59//!         },
60//!         buffer_size: 2048,
61//!         enable_heartbeat: true,
62//!         enable_watchdog: true,
63//!         heartbeat_interval: 30,
64//!         watchdog_interval: 1,
65//!         max_retries: 3,
66//!         metrics: None,
67//!         cumulative_attempts_counter: None,
68//!     };
69//!     
70//!     let manager = connect_resilient(options).await?;
71//!     // Manager now has heartbeat and automatic reconnection enabled
72//!     Ok(())
73//! }
74//! ```
75//!
76//! ## Features
77//!
78//! - Login/logout, sending actions, and receiving AMI events.
79//! - Support for common events (`Newchannel`, `Hangup`, `PeerStatus`) and fallback for unknown events.
80//! - Detailed error handling via the `AmiError` enum.
81//! - Configurable buffer sizes for high-throughput applications.
82//! - Heartbeat monitoring with configurable interval and automatic disconnection on failure.
83//! - Watchdog for automatic reconnection with configurable check interval when not authenticated.
84//! - Infinite event streams that handle lag and reconnection automatically.
85//!
86//! ## Requirements
87//!
88//! - Rust 1.70+
89//! - Tokio (asynchronous runtime)
90//!
91//! ## License
92//!
93//! MIT
94
95use serde::de::Deserializer;
96use serde::{Deserialize, Serialize};
97use serde_json::Value;
98use std::collections::HashMap;
99use std::sync::Arc;
100use thiserror::Error;
101use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
102use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
103use tokio::net::TcpStream;
104use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
105use tokio::time::{timeout, Duration};
106use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
107use tokio_stream::wrappers::BroadcastStream;
108use tokio_stream::Stream;
109use tokio_util::sync::CancellationToken;
110#[cfg(feature = "docs")]
111use utoipa::ToSchema;
112use uuid::Uuid;
113
114pub mod resilient;
115
116#[cfg_attr(feature = "docs", derive(ToSchema))]
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct AmiResponse {
119    #[serde(rename = "Response")]
120    pub response: String,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    #[serde(rename = "ActionID")]
123    pub action_id: Option<String>,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    #[serde(rename = "Message")]
126    pub message: Option<String>,
127    #[serde(flatten)]
128    #[cfg_attr(feature = "docs", schema(additional_properties = true))]
129    pub fields: HashMap<String, Value>,
130}
131
132#[cfg_attr(feature = "docs", derive(ToSchema))]
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(tag = "Action", rename_all = "PascalCase")]
135pub enum AmiAction {
136    Login {
137        username: String,
138        secret: String,
139        #[serde(rename = "Events")]
140        events: Option<String>,
141        #[serde(rename = "ActionID")]
142        action_id: Option<String>,
143    },
144    Logoff {
145        #[serde(rename = "ActionID")]
146        action_id: Option<String>,
147    },
148    Ping {
149        #[serde(rename = "ActionID")]
150        action_id: Option<String>,
151    },
152    Command {
153        command: String,
154        #[serde(rename = "ActionID")]
155        action_id: Option<String>,
156    },
157    Custom {
158        action: String,
159        #[serde(flatten)]
160        params: HashMap<String, String>,
161        #[serde(rename = "ActionID")]
162        action_id: Option<String>,
163    },
164}
165
166#[cfg_attr(feature = "docs", derive(ToSchema))]
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct NewchannelEventData {
169    #[serde(rename = "Channel")]
170    pub channel: String,
171    #[serde(rename = "Uniqueid")]
172    pub uniqueid: String,
173    #[serde(rename = "ChannelState")]
174    pub channel_state: Option<String>,
175    #[serde(rename = "ChannelStateDesc")]
176    pub channel_state_desc: Option<String>,
177    #[serde(rename = "CallerIDNum")]
178    pub caller_id_num: Option<String>,
179    #[serde(rename = "CallerIDName")]
180    pub caller_id_name: Option<String>,
181    #[serde(flatten)]
182    pub other: HashMap<String, String>,
183}
184
185#[cfg_attr(feature = "docs", derive(ToSchema))]
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct HangupEventData {
188    #[serde(rename = "Channel")]
189    pub channel: String,
190    #[serde(rename = "Uniqueid")]
191    pub uniqueid: String,
192    #[serde(rename = "Cause")]
193    pub cause: Option<String>,
194    #[serde(rename = "Cause-txt")]
195    pub cause_txt: Option<String>,
196    #[serde(flatten)]
197    pub other: HashMap<String, String>,
198}
199
200#[cfg_attr(feature = "docs", derive(ToSchema))]
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct PeerStatusEventData {
203    #[serde(rename = "Peer")]
204    pub peer: String,
205    #[serde(rename = "PeerStatus")]
206    pub peer_status: String,
207    #[serde(flatten)]
208    pub other: HashMap<String, String>,
209}
210
211#[cfg_attr(feature = "docs", derive(ToSchema))]
212#[derive(Debug, Clone, Serialize)]
213#[serde(untagged)]
214pub enum AmiEvent {
215    Newchannel(NewchannelEventData),
216    Hangup(HangupEventData),
217    PeerStatus(PeerStatusEventData),
218    UnknownEvent {
219        #[serde(rename = "Event")]
220        event_type: String,
221        #[serde(flatten)]
222        fields: HashMap<String, String>,
223    },
224    InternalConnectionLost {
225        error: String,
226    },
227}
228
229impl<'de> Deserialize<'de> for AmiEvent {
230    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
231    where
232        D: Deserializer<'de>,
233    {
234        let value = serde_json::Value::deserialize(deserializer)?;
235        let map_obj = value
236            .as_object()
237            .ok_or_else(|| serde::de::Error::custom("AmiEvent: Expected a JSON object/map"))?;
238
239        if let Some(event_type_val) = map_obj.get("Event") {
240            let event_type_str = event_type_val.as_str().ok_or_else(|| {
241                serde::de::Error::custom("AmiEvent: 'Event' field is not a string")
242            })?;
243
244            match event_type_str {
245                "Newchannel" => Ok(AmiEvent::Newchannel(
246                    NewchannelEventData::deserialize(value.clone())
247                        .map_err(serde::de::Error::custom)?,
248                )),
249                "Hangup" => Ok(AmiEvent::Hangup(
250                    HangupEventData::deserialize(value.clone())
251                        .map_err(serde::de::Error::custom)?,
252                )),
253                "PeerStatus" => Ok(AmiEvent::PeerStatus(
254                    PeerStatusEventData::deserialize(value.clone())
255                        .map_err(serde::de::Error::custom)?,
256                )),
257                _ => {
258                    let fields: HashMap<String, String> = map_obj
259                        .iter()
260                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
261                        .collect();
262                    Ok(AmiEvent::UnknownEvent {
263                        event_type: event_type_str.to_string(),
264                        fields,
265                    })
266                }
267            }
268        } else {
269            let fields: HashMap<String, String> = map_obj
270                .iter()
271                .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
272                .collect();
273            Ok(AmiEvent::UnknownEvent {
274                event_type: "UnknownOrMalformed".to_string(),
275                fields,
276            })
277        }
278    }
279}
280
281#[derive(Debug, Error)]
282pub enum AmiError {
283    #[error("IO error: {0}")]
284    Io(#[from] std::io::Error),
285    #[error("Parse error: {0}")]
286    ParseError(String),
287    #[error("Serialize error: {0}")]
288    SerializeError(String),
289    #[error("JSON error: {0}")]
290    SerdeJson(#[from] serde_json::Error),
291    #[error("Authentication failed: {0}")]
292    AuthenticationFailed(String),
293    #[error("Action failed: {response:?}")]
294    ActionFailed { response: AmiResponse },
295    #[error("Connection closed")]
296    ConnectionClosed,
297    #[error("Operation timed out")]
298    Timeout,
299    #[error("Login required")]
300    LoginRequired,
301    #[error("Internal channel error: {0}")]
302    ChannelError(String),
303    #[error("Event stream lagged: {0}")]
304    EventStreamLagged(#[from] tokio::sync::broadcast::error::RecvError),
305    #[error("Not connected to AMI server")]
306    NotConnected,
307    #[error("Other error: {0}")]
308    Other(String),
309}
310
311#[derive(Serialize, Deserialize, Debug, Clone)]
312pub struct ManagerOptions {
313    pub port: u16,
314    pub host: String,
315    pub username: String,
316    pub password: String,
317    pub events: bool,
318}
319
320struct InnerManager {
321    authenticated: bool,
322    /// Channel for sending raw AMI messages
323    write_tx: Option<mpsc::Sender<String>>,
324    /// Channel for receiving raw AMI messages
325    event_broadcaster: broadcast::Sender<AmiEvent>,
326    /// Responders mapped for each action ID
327    pending_responses: HashMap<String, oneshot::Sender<Result<AmiResponse, AmiError>>>,
328    /// Heartbeat cancellation token
329    heartbeat_token: Option<CancellationToken>,
330    /// Watchdog cancellation token
331    watchdog_token: Option<CancellationToken>,
332    /// Unique identifier for this manager instance (for logging)
333    instance_id: String,
334}
335
336#[derive(Clone)]
337pub struct Manager {
338    pub(crate) inner: Arc<Mutex<InnerManager>>,
339}
340
341impl Default for Manager {
342    fn default() -> Self {
343        Self::new()
344    }
345}
346
347impl Manager {
348    pub fn new() -> Self {
349        Self::new_with_buffer(1024)
350    }
351
352    pub fn new_with_buffer(buffer_size: usize) -> Self {
353        let (event_tx, _) = broadcast::channel(buffer_size);
354        let instance_id = Uuid::new_v4().to_string()[..8].to_string();
355        log::debug!("Creating new Manager instance [{instance_id}]");
356        let inner = InnerManager {
357            authenticated: false,
358            write_tx: None,
359            event_broadcaster: event_tx,
360            pending_responses: HashMap::new(),
361            heartbeat_token: None,
362            watchdog_token: None,
363            instance_id,
364        };
365        Self {
366            inner: Arc::new(Mutex::new(inner)),
367        }
368    }
369
370    pub async fn connect_and_login(&mut self, options: ManagerOptions) -> Result<(), AmiError> {
371        let stream = timeout(
372            Duration::from_secs(10),
373            TcpStream::connect((options.host.as_str(), options.port)),
374        )
375        .await
376        .map_err(|_| AmiError::Timeout)?
377        .map_err(AmiError::Io)?;
378
379        let (reader, writer) = stream.into_split();
380
381        let (write_tx, write_rx) = mpsc::channel::<String>(100);
382        let (dispatch_tx, dispatch_rx) = mpsc::channel::<String>(1024);
383
384        let event_broadcaster = {
385            let inner = self.inner.lock().await;
386            inner.event_broadcaster.clone()
387        };
388
389        spawn_writer_task(writer, write_rx);
390        spawn_reader_task(reader, dispatch_tx, event_broadcaster);
391        spawn_dispatcher_task(self.inner.clone(), dispatch_rx);
392
393        self.inner.lock().await.write_tx = Some(write_tx);
394
395        let login_action = AmiAction::Login {
396            username: options.username.clone(),
397            secret: options.password.clone(),
398            events: Some("on".to_string()),
399            action_id: Some("rust-ami-login".to_string()),
400        };
401
402        match self.send_action(login_action).await {
403            Ok(resp) if resp.response.eq_ignore_ascii_case("Success") => {
404                self.inner.lock().await.authenticated = true;
405                Ok(())
406            }
407            Ok(resp) => Err(AmiError::AuthenticationFailed(
408                resp.message.unwrap_or_default(),
409            )),
410            Err(e) => Err(e),
411        }
412    }
413
414    pub async fn send_action(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
415        let action_id = get_or_set_action_id(&mut action);
416
417        let mut stream = self.all_events_stream().await;
418
419        let initial_response = self.send_initial_request(action.clone()).await?;
420
421        if initial_response
422            .fields
423            .get("EventList")
424            .and_then(|v| v.as_str())
425            == Some("start")
426        {
427            let mut collected_events = Vec::new();
428
429            let collection_result = tokio::time::timeout(Duration::from_secs(10), async {
430                use tokio_stream::StreamExt;
431                while let Some(Ok(event)) = stream.next().await {
432                    if let AmiEvent::UnknownEvent { event_type, fields } = &event {
433                        if fields.get("ActionID").map(|id| id.as_str()) == Some(&action_id) {
434                            if event_type.ends_with("Complete") {
435                                break;
436                            }
437                            collected_events.push(event.clone());
438                        }
439                    }
440                }
441            })
442            .await;
443
444            if collection_result.is_err() {
445                return Err(AmiError::Timeout);
446            }
447
448            let mut final_fields = initial_response.fields;
449            final_fields.insert(
450                "CollectedEvents".to_string(),
451                serde_json::to_value(&collected_events)?,
452            );
453
454            Ok(AmiResponse {
455                response: initial_response.response,
456                action_id: initial_response.action_id,
457                message: Some("Successfully collected events.".to_string()),
458                fields: final_fields,
459            })
460        } else {
461            Ok(initial_response)
462        }
463    }
464
465    async fn send_initial_request(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
466        let action_id = get_or_set_action_id(&mut action);
467        let (tx, rx) = oneshot::channel();
468        let action_str = serialize_ami_action(&action)?;
469
470        {
471            let mut inner = self.inner.lock().await;
472            if inner.write_tx.is_none() {
473                return Err(AmiError::NotConnected);
474            }
475            inner.pending_responses.insert(action_id.clone(), tx);
476            let writer = inner.write_tx.as_ref().unwrap();
477            if writer.send(action_str).await.is_err() {
478                inner.pending_responses.remove(&action_id);
479                return Err(AmiError::ConnectionClosed);
480            }
481        }
482
483        match timeout(Duration::from_secs(10), rx).await {
484            Ok(Ok(Ok(resp))) => Ok(resp),
485            Ok(Ok(Err(e))) => Err(e),
486            Ok(Err(_)) => Err(AmiError::ChannelError("Responder dropped".to_string())),
487            Err(_) => Err(AmiError::Timeout),
488        }
489    }
490
491    pub async fn disconnect(&self) -> Result<(), AmiError> {
492        let mut inner = self.inner.lock().await;
493        inner.write_tx = None;
494        inner.authenticated = false;
495
496        // Cancel heartbeat and watchdog
497        if let Some(token) = &inner.heartbeat_token {
498            token.cancel();
499            inner.heartbeat_token = None;
500        }
501        if let Some(token) = &inner.watchdog_token {
502            token.cancel();
503            inner.watchdog_token = None;
504        }
505
506        Ok(())
507    }
508
509    pub async fn is_authenticated(&self) -> bool {
510        self.inner.lock().await.authenticated
511    }
512
513    pub async fn all_events_stream(
514        &self,
515    ) -> impl Stream<Item = Result<AmiEvent, BroadcastStreamRecvError>> + Send + Unpin {
516        let inner = self.inner.lock().await;
517        BroadcastStream::new(inner.event_broadcaster.subscribe())
518    }
519
520    /// Start heartbeat with default interval (30 seconds). Kept for backwards compatibility.
521    pub async fn start_heartbeat(&self) -> Result<(), AmiError> {
522        self.start_heartbeat_with_interval(30).await
523    }
524
525    /// Start the heartbeat task with a configurable interval (in seconds).
526    pub async fn start_heartbeat_with_interval(&self, interval_secs: u64) -> Result<(), AmiError> {
527        let mut inner = self.inner.lock().await;
528        let instance_id = inner.instance_id.clone();
529
530        // Cancel existing heartbeat if any
531        if let Some(token) = &inner.heartbeat_token {
532            log::debug!("[{instance_id}] Cancelling existing heartbeat task");
533            token.cancel();
534        }
535
536        let token = CancellationToken::new();
537        inner.heartbeat_token = Some(token.clone());
538
539        log::debug!("[{instance_id}] Starting heartbeat task (interval={interval_secs}s)");
540
541        let manager = self.clone();
542        tokio::spawn(async move {
543            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
544            log::debug!("[{instance_id}] Heartbeat task started");
545            loop {
546                tokio::select! {
547                    _ = token.cancelled() => {
548                        log::debug!("[{instance_id}] Heartbeat task cancelled");
549                        break;
550                    }
551                    _ = interval.tick() => {
552                        if manager.is_authenticated().await {
553                            match manager.send_action(AmiAction::Ping { action_id: None }).await {
554                                Ok(_) => {
555                                    log::debug!("[{instance_id}] Heartbeat ping successful");
556                                }
557                                Err(e) => {
558                                    log::warn!("[{instance_id}] Heartbeat ping failed: {e}");
559                                    // Emit connection lost event
560                                    if let Ok(inner) = manager.inner.try_lock() {
561                                        let _ = inner.event_broadcaster.send(AmiEvent::InternalConnectionLost {
562                                            error: format!("Heartbeat failed: {e}"),
563                                        });
564                                    }
565                                    // Disconnect on heartbeat failure
566                                    let _ = manager.disconnect().await;
567                                    break;
568                                }
569                            }
570                        } else {
571                            log::trace!("[{instance_id}] Heartbeat tick: not authenticated, skipping ping");
572                        }
573                    }
574                }
575            }
576        });
577
578        Ok(())
579    }
580
581    pub async fn start_watchdog(&self, options: ManagerOptions) -> Result<(), AmiError> {
582        let instance_id = self.inner.lock().await.instance_id.clone();
583        log::debug!(
584            "[{}] Starting watchdog (default interval=1s) for user '{}' at {}:{}",
585            instance_id,
586            options.username,
587            options.host,
588            options.port
589        );
590        self.start_watchdog_with_interval(options, 1).await
591    }
592
593    pub async fn start_watchdog_with_interval(
594        &self,
595        options: ManagerOptions,
596        interval_secs: u64,
597    ) -> Result<(), AmiError> {
598        let mut inner = self.inner.lock().await;
599        let instance_id = inner.instance_id.clone();
600
601        // Cancel existing watchdog if any
602        if let Some(token) = &inner.watchdog_token {
603            log::debug!(
604                "[{instance_id}] Cancelling existing watchdog task before starting a new one"
605            );
606            token.cancel();
607        }
608
609        let token = CancellationToken::new();
610        inner.watchdog_token = Some(token.clone());
611
612        log::debug!(
613            "[{}] Spawning watchdog task (interval={}s) for user '{}' at {}:{}",
614            instance_id,
615            interval_secs,
616            options.username,
617            options.host,
618            options.port
619        );
620
621        let manager = self.clone();
622        tokio::spawn(async move {
623            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
624            log::debug!(
625                "[{}] Watchdog task started (interval={}s) for '{}'@{}:{}",
626                instance_id,
627                interval_secs,
628                options.username,
629                options.host,
630                options.port
631            );
632            loop {
633                tokio::select! {
634                    _ = token.cancelled() => {
635                        log::debug!("[{instance_id}] Watchdog task cancelled by token");
636                        break;
637                    }
638                    _ = interval.tick() => {
639                        if !manager.is_authenticated().await {
640                            log::debug!(
641                                "[{}] Watchdog attempting reconnection to '{}'@{}:{}...",
642                                instance_id,
643                                options.username, options.host, options.port
644                            );
645                            let mut mgr = manager.clone();
646                            match mgr.connect_and_login(options.clone()).await {
647                                Ok(_) => {
648                                    log::info!(
649                                        "[{}] Watchdog reconnection successful to '{}'@{}:{}",
650                                        instance_id,
651                                        options.username, options.host, options.port
652                                    );
653                                }
654                                Err(e) => {
655                                    log::debug!(
656                                        "[{}] Watchdog reconnection to '{}'@{}:{} failed: {}",
657                                        instance_id,
658                                        options.username, options.host, options.port, e
659                                    );
660                                }
661                            }
662                        } else {
663                            log::trace!("[{instance_id}] Watchdog tick: already authenticated; no action taken");
664                        }
665                    }
666                }
667            }
668        });
669
670        Ok(())
671    }
672}
673
674fn spawn_writer_task(mut writer: OwnedWriteHalf, mut write_rx: mpsc::Receiver<String>) {
675    tokio::spawn(async move {
676        while let Some(action_str) = write_rx.recv().await {
677            if writer.write_all(action_str.as_bytes()).await.is_err() {
678                break;
679            }
680        }
681    });
682}
683
684fn spawn_reader_task(
685    reader: OwnedReadHalf,
686    dispatch_tx: mpsc::Sender<String>,
687    event_broadcaster: broadcast::Sender<AmiEvent>,
688) {
689    tokio::spawn(async move {
690        let mut buf_reader = BufReader::new(reader);
691        loop {
692            let mut message_block = String::new();
693            loop {
694                let mut line = String::new();
695                match buf_reader.read_line(&mut line).await {
696                    Ok(0) | Err(_) => {
697                        // Connection lost - emit synthetic event
698                        let _ = event_broadcaster.send(AmiEvent::InternalConnectionLost {
699                            error: "Connection lost during read".to_string(),
700                        });
701                        return;
702                    }
703                    Ok(_) => {
704                        let is_end = line == "\r\n";
705                        message_block.push_str(&line);
706                        if is_end {
707                            break;
708                        }
709                    }
710                }
711            }
712
713            if !message_block.trim().is_empty() && dispatch_tx.send(message_block).await.is_err() {
714                let _ = event_broadcaster.send(AmiEvent::InternalConnectionLost {
715                    error: "Dispatcher channel closed".to_string(),
716                });
717                break;
718            }
719        }
720    });
721}
722
723fn spawn_dispatcher_task(
724    inner_arc: Arc<Mutex<InnerManager>>,
725    mut dispatch_rx: mpsc::Receiver<String>,
726) {
727    tokio::spawn(async move {
728        while let Some(raw_message) = dispatch_rx.recv().await {
729            if let Ok(parsed_messages) = parse_ami_protocol_message(&raw_message) {
730                for value_msg in parsed_messages {
731                    let mut inner = inner_arc.lock().await;
732                    if value_msg.get("Response").is_some() {
733                        if let Ok(resp) = serde_json::from_value::<AmiResponse>(value_msg) {
734                            if let Some(action_id) = &resp.action_id {
735                                if let Some(responder) = inner.pending_responses.remove(action_id) {
736                                    let _ = responder.send(Ok(resp));
737                                }
738                            }
739                        }
740                    } else if value_msg.get("Event").is_some() {
741                        if let Ok(event) = serde_json::from_value::<AmiEvent>(value_msg.clone()) {
742                            let _ = inner.event_broadcaster.send(event);
743                        }
744                    }
745                }
746            }
747        }
748    });
749}
750
751fn parse_ami_protocol_message(raw_data: &str) -> Result<Vec<serde_json::Value>, AmiError> {
752    let mut messages = Vec::new();
753    for block in raw_data.trim().split("\r\n\r\n") {
754        if block.is_empty() {
755            continue;
756        }
757        let mut map = serde_json::Map::new();
758        for line in block.lines() {
759            if let Some((key, value)) = line.split_once(": ") {
760                map.insert(
761                    key.trim().to_string(),
762                    serde_json::Value::String(value.trim().to_string()),
763                );
764            }
765        }
766        if !map.is_empty() {
767            messages.push(serde_json::Value::Object(map));
768        }
769    }
770    Ok(messages)
771}
772
773fn serialize_ami_action(action: &AmiAction) -> Result<String, AmiError> {
774    let mut s = String::new();
775    match action {
776        AmiAction::Login {
777            username,
778            secret,
779            events,
780            action_id,
781        } => {
782            s.push_str("Action: Login\r\n");
783            s.push_str(&format!("Username: {username}\r\n"));
784            s.push_str(&format!("Secret: {secret}\r\n"));
785            if let Some(ev) = events {
786                s.push_str(&format!("Events: {ev}\r\n"));
787            }
788            if let Some(id) = action_id {
789                s.push_str(&format!("ActionID: {id}\r\n"));
790            }
791        }
792        AmiAction::Logoff { action_id } => {
793            s.push_str("Action: Logoff\r\n");
794            if let Some(id) = action_id {
795                s.push_str(&format!("ActionID: {id}\r\n"));
796            }
797        }
798        AmiAction::Ping { action_id } => {
799            s.push_str("Action: Ping\r\n");
800            if let Some(id) = action_id {
801                s.push_str(&format!("ActionID: {id}\r\n"));
802            }
803        }
804        AmiAction::Command { command, action_id } => {
805            s.push_str("Action: Command\r\n");
806            s.push_str(&format!("Command: {command}\r\n"));
807            if let Some(id) = action_id {
808                s.push_str(&format!("ActionID: {id}\r\n"));
809            }
810        }
811        AmiAction::Custom {
812            action: action_name,
813            params,
814            action_id,
815        } => {
816            s.push_str(&format!("Action: {action_name}\r\n"));
817            for (k, v) in params {
818                s.push_str(&format!("{k}: {v}\r\n"));
819            }
820            if let Some(id) = action_id {
821                s.push_str(&format!("ActionID: {id}\r\n"));
822            }
823        }
824    }
825    s.push_str("\r\n");
826    Ok(s)
827}
828
829fn get_or_set_action_id(action: &mut AmiAction) -> String {
830    match action {
831        AmiAction::Login { action_id, .. }
832        | AmiAction::Logoff { action_id }
833        | AmiAction::Ping { action_id }
834        | AmiAction::Command { action_id, .. }
835        | AmiAction::Custom { action_id, .. } => {
836            if let Some(id) = action_id {
837                id.clone()
838            } else {
839                let new_id = Uuid::new_v4().to_string();
840                *action_id = Some(new_id.clone());
841                new_id
842            }
843        }
844    }
845}
846
847#[cfg(test)]
848mod tests {
849    use super::*;
850    use tokio_stream::StreamExt;
851
852    #[test]
853    fn test_serialize_login_action() {
854        let action = AmiAction::Login {
855            username: "user".to_string(),
856            secret: "pass".to_string(),
857            events: Some("on".to_string()),
858            action_id: Some("abc123".to_string()),
859        };
860        let s = serialize_ami_action(&action).unwrap();
861        assert!(s.contains("Action: Login"));
862        assert!(s.contains("Username: user"));
863        assert!(s.contains("Secret: pass"));
864        assert!(s.contains("Events: on"));
865        assert!(s.contains("ActionID: abc123"));
866        assert!(s.ends_with("\r\n\r\n"));
867    }
868
869    #[test]
870    fn test_serialize_command_action() {
871        let action = AmiAction::Command {
872            command: "sip show peers".to_string(),
873            action_id: None,
874        };
875        let s = serialize_ami_action(&action).unwrap();
876        assert!(s.contains("Action: Command"));
877        assert!(s.contains("Command: sip show peers"));
878    }
879
880    #[test]
881    fn test_parse_ami_protocol_message() {
882        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
883        let parsed = parse_ami_protocol_message(raw).unwrap();
884        assert_eq!(parsed.len(), 1);
885        let obj = &parsed[0];
886        assert_eq!(obj["Response"], "Success");
887        assert_eq!(obj["ActionID"], "123");
888        assert_eq!(obj["Message"], "Authentication accepted");
889    }
890
891    #[test]
892    fn test_deserialize_ami_response() {
893        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
894        let parsed = parse_ami_protocol_message(raw).unwrap();
895        let resp: AmiResponse = serde_json::from_value(parsed[0].clone()).unwrap();
896        assert_eq!(resp.response, "Success");
897        assert_eq!(resp.action_id.as_deref(), Some("123"));
898        assert_eq!(resp.message.as_deref(), Some("Authentication accepted"));
899    }
900
901    #[test]
902    fn test_deserialize_newchannel_event() {
903        let raw = "Event: Newchannel\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nChannelState: 4\r\nChannelStateDesc: Ring\r\nCallerIDNum: 100\r\nCallerIDName: Alice\r\n\r\n";
904        let parsed = parse_ami_protocol_message(raw).unwrap();
905        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
906        match event {
907            AmiEvent::Newchannel(data) => {
908                assert_eq!(data.channel, "SIP/100-00000001");
909                assert_eq!(data.uniqueid, "1234");
910                assert_eq!(data.channel_state.as_deref(), Some("4"));
911                assert_eq!(data.channel_state_desc.as_deref(), Some("Ring"));
912                assert_eq!(data.caller_id_num.as_deref(), Some("100"));
913                assert_eq!(data.caller_id_name.as_deref(), Some("Alice"));
914            }
915            _ => panic!("Expected AmiEvent::Newchannel"),
916        }
917    }
918
919    #[test]
920    fn test_deserialize_hangup_event() {
921        let raw = "Event: Hangup\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nCause: 16\r\nCause-txt: Normal Clearing\r\n\r\n";
922        let parsed = parse_ami_protocol_message(raw).unwrap();
923        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
924        match event {
925            AmiEvent::Hangup(data) => {
926                assert_eq!(data.channel, "SIP/100-00000001");
927                assert_eq!(data.uniqueid, "1234");
928                assert_eq!(data.cause.as_deref(), Some("16"));
929                assert_eq!(data.cause_txt.as_deref(), Some("Normal Clearing"));
930            }
931            _ => panic!("Expected AmiEvent::Hangup"),
932        }
933    }
934
935    #[test]
936    fn test_deserialize_peerstatus_event() {
937        let raw = "Event: PeerStatus\r\nPeer: SIP/100\r\nPeerStatus: Registered\r\n\r\n";
938        let parsed = parse_ami_protocol_message(raw).unwrap();
939        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
940        match event {
941            AmiEvent::PeerStatus(data) => {
942                assert_eq!(data.peer, "SIP/100");
943                assert_eq!(data.peer_status, "Registered");
944            }
945            _ => panic!("Expected AmiEvent::PeerStatus"),
946        }
947    }
948
949    #[test]
950    fn test_deserialize_unknown_event() {
951        let raw = "Event: FooBar\r\nSomeField: Value\r\n\r\n";
952        let parsed = parse_ami_protocol_message(raw).unwrap();
953        let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
954        match event {
955            AmiEvent::UnknownEvent { event_type, fields } => {
956                assert_eq!(event_type, "FooBar");
957                assert_eq!(fields.get("SomeField").map(|s| s.as_str()), Some("Value"));
958            }
959            _ => panic!("Expected AmiEvent::UnknownEvent"),
960        }
961    }
962
963    #[tokio::test]
964    async fn test_manager_options_clone() {
965        let opts = ManagerOptions {
966            port: 5038,
967            host: "localhost".to_string(),
968            username: "admin".to_string(),
969            password: "pwd".to_string(),
970            events: true,
971        };
972        let opts2 = opts.clone();
973        assert_eq!(opts.port, opts2.port);
974        assert_eq!(opts.host, opts2.host);
975        assert_eq!(opts.username, opts2.username);
976        assert_eq!(opts.password, opts2.password);
977        assert_eq!(opts.events, opts2.events);
978    }
979
980    #[tokio::test]
981    async fn test_manager_new_and_auth_flag() {
982        // A criação de `opts` não é mais necessária para este teste.
983        let manager = Manager::new(); // Manager::new() agora não tem argumentos.
984        assert!(!manager.is_authenticated().await);
985    }
986
987    #[tokio::test]
988    async fn test_event_internal_connection_lost() {
989        // 1. Cria um manager vazio, como no teste anterior.
990        let manager = Manager::new();
991
992        // 2. Get the event stream BEFORE sending the event.
993        let mut stream = manager.all_events_stream().await;
994
995        // 3. Send the event internally to simulate a disconnection.
996        //    This part works again because of `pub(crate)`.
997        {
998            let inner = manager.inner.lock().await;
999            let _ = inner
1000                .event_broadcaster
1001                .send(AmiEvent::InternalConnectionLost {
1002                    error: "simulated".to_string(),
1003                });
1004        }
1005
1006        // 4. Verifica se o evento foi recebido corretamente pelo stream.
1007        let ev = stream.next().await.unwrap().unwrap();
1008        match ev {
1009            AmiEvent::InternalConnectionLost { error } => {
1010                assert_eq!(error, "simulated");
1011            }
1012            _ => panic!("Expected InternalConnectionLost"),
1013        }
1014    }
1015
1016    #[tokio::test]
1017    async fn test_manager_options_default() {
1018        let opts = ManagerOptions {
1019            port: 5038,
1020            host: "localhost".to_string(),
1021            username: "admin".to_string(),
1022            password: "pwd".to_string(),
1023            events: true,
1024        };
1025        assert!(opts.events);
1026    }
1027
1028    #[tokio::test]
1029    async fn test_manager_new_with_buffer() {
1030        let manager = Manager::new_with_buffer(512);
1031        assert!(!manager.is_authenticated().await);
1032
1033        // Test that the buffer size is respected by checking we can create the stream
1034        let _stream = manager.all_events_stream().await;
1035    }
1036
1037    #[tokio::test]
1038    async fn test_heartbeat_and_watchdog_tokens() {
1039        let manager = Manager::new();
1040
1041        // Initially no tokens should be set
1042        {
1043            let inner = manager.inner.lock().await;
1044            assert!(inner.heartbeat_token.is_none());
1045            assert!(inner.watchdog_token.is_none());
1046        }
1047
1048        // Create dummy options for testing
1049        let opts = ManagerOptions {
1050            port: 5038,
1051            host: "127.0.0.1".to_string(),
1052            username: "test".to_string(),
1053            password: "test".to_string(),
1054            events: true,
1055        };
1056
1057        // Start heartbeat should set token (even though connection will fail)
1058        let _ = manager.start_heartbeat().await;
1059        {
1060            let inner = manager.inner.lock().await;
1061            assert!(inner.heartbeat_token.is_some());
1062        }
1063
1064        // Start watchdog should set token
1065        let _ = manager.start_watchdog(opts).await;
1066        {
1067            let inner = manager.inner.lock().await;
1068            assert!(inner.watchdog_token.is_some());
1069        }
1070
1071        // Disconnect should clear both tokens
1072        let _ = manager.disconnect().await;
1073        {
1074            let inner = manager.inner.lock().await;
1075            assert!(inner.heartbeat_token.is_none());
1076            assert!(inner.watchdog_token.is_none());
1077        }
1078    }
1079
1080    #[tokio::test]
1081    async fn test_connection_lost_event_emission() {
1082        // Test that synthetic connection lost events are properly emitted
1083        let manager = Manager::new();
1084        let mut stream = manager.all_events_stream().await;
1085
1086        // Manually emit a connection lost event
1087        {
1088            let inner = manager.inner.lock().await;
1089            let _ = inner
1090                .event_broadcaster
1091                .send(AmiEvent::InternalConnectionLost {
1092                    error: "test connection lost".to_string(),
1093                });
1094        }
1095
1096        // Verify the event is received
1097        let event = stream.next().await.unwrap().unwrap();
1098        match event {
1099            AmiEvent::InternalConnectionLost { error } => {
1100                assert_eq!(error, "test connection lost");
1101            }
1102            _ => panic!("Expected InternalConnectionLost event"),
1103        }
1104    }
1105
1106    #[tokio::test]
1107    async fn test_heartbeat_interval_respected() {
1108        // Use tokio time control to test heartbeat scheduling
1109        tokio::time::pause();
1110
1111        let manager = Manager::new();
1112
1113        // Start heartbeat with a short interval
1114        let _ = manager.start_heartbeat_with_interval(2).await;
1115
1116        // Advance time less than interval: no ticks yet
1117        tokio::time::advance(Duration::from_secs(1)).await;
1118        {
1119            let inner = manager.inner.lock().await;
1120            // Token should be set
1121            assert!(inner.heartbeat_token.is_some());
1122        }
1123
1124        // Advance time to trigger at least one tick
1125        tokio::time::advance(Duration::from_secs(2)).await;
1126
1127        // There's no direct public hook for verifying pings were sent without mocking
1128        // but we can assert that the heartbeat task remains active and didn't panic.
1129        // Ensure token still exists
1130        {
1131            let inner = manager.inner.lock().await;
1132            assert!(inner.heartbeat_token.is_some());
1133        }
1134
1135        // Clean up
1136        let _ = manager.disconnect().await;
1137    }
1138
1139    #[tokio::test]
1140    async fn test_watchdog_interval_configuration() {
1141        // Test that watchdog can be started with different intervals
1142        let manager = Manager::new();
1143
1144        let opts = ManagerOptions {
1145            port: 5038,
1146            host: "127.0.0.1".to_string(),
1147            username: "test".to_string(),
1148            password: "test".to_string(),
1149            events: true,
1150        };
1151
1152        // Test default interval (backward compatibility)
1153        let _ = manager.start_watchdog(opts.clone()).await;
1154        {
1155            let inner = manager.inner.lock().await;
1156            assert!(inner.watchdog_token.is_some());
1157        }
1158
1159        // Test custom interval
1160        let _ = manager.start_watchdog_with_interval(opts.clone(), 5).await;
1161        {
1162            let inner = manager.inner.lock().await;
1163            assert!(inner.watchdog_token.is_some());
1164        }
1165
1166        // Clean up
1167        let _ = manager.disconnect().await;
1168    }
1169
1170    #[test]
1171    fn test_unknown_event_serialization_roundtrip() {
1172        // Test that UnknownEvent can be serialized and deserialized without data loss
1173        let mut fields = HashMap::new();
1174        fields.insert("Event".to_string(), "ContactStatus".to_string());
1175        fields.insert("AOR".to_string(), "1000021005".to_string());
1176        fields.insert("ContactStatus".to_string(), "Removed".to_string());
1177
1178        let original = AmiEvent::UnknownEvent {
1179            event_type: "ContactStatus".to_string(),
1180            fields: fields.clone(),
1181        };
1182
1183        // Serialize to JSON
1184        let json = serde_json::to_string(&original).unwrap();
1185        
1186        // Deserialize back
1187        let deserialized: AmiEvent = serde_json::from_str(&json).unwrap();
1188        
1189        // Verify it's still UnknownEvent with correct event_type
1190        match deserialized {
1191            AmiEvent::UnknownEvent { event_type, fields: deserialized_fields } => {
1192                assert_eq!(event_type, "ContactStatus", "Event type should be preserved");
1193                assert_eq!(
1194                    deserialized_fields.get("AOR").map(|s| s.as_str()),
1195                    Some("1000021005"),
1196                    "Fields should be preserved"
1197                );
1198                assert_eq!(
1199                    deserialized_fields.get("ContactStatus").map(|s| s.as_str()),
1200                    Some("Removed"),
1201                    "Fields should be preserved"
1202                );
1203            }
1204            _ => panic!("Expected AmiEvent::UnknownEvent after deserialization, got {:?}", deserialized),
1205        }
1206    }
1207
1208    #[test]
1209    fn test_unknown_event_kafka_scenario() {
1210        // Simulate the exact scenario from the bug report:
1211        // Library creates UnknownEvent -> Serialize to Kafka -> Deserialize from Kafka
1212        
1213        // 1. Library creates UnknownEvent when receiving an event from Asterisk
1214        let mut fields = HashMap::new();
1215        fields.insert("Event".to_string(), "ContactStatus".to_string());
1216        fields.insert("AOR".to_string(), "1000021005".to_string());
1217        fields.insert("ContactStatus".to_string(), "Removed".to_string());
1218        fields.insert("URI".to_string(), "sip:1000021005@10.0.0.1:5060".to_string());
1219        
1220        let original = AmiEvent::UnknownEvent {
1221            event_type: "ContactStatus".to_string(),
1222            fields: fields.clone(),
1223        };
1224
1225        // 2. Serialize (e.g., to send via Kafka)
1226        let json = serde_json::to_string(&original).unwrap();
1227
1228        // 3. Deserialize (e.g., consumer receives from Kafka)
1229        let deserialized: AmiEvent = serde_json::from_str(&json).unwrap();
1230
1231        // 4. Verify all data is preserved
1232        match deserialized {
1233            AmiEvent::UnknownEvent { event_type, fields: deserialized_fields } => {
1234                assert_eq!(event_type, "ContactStatus");
1235                assert_eq!(deserialized_fields.get("AOR"), Some(&"1000021005".to_string()));
1236                assert_eq!(deserialized_fields.get("ContactStatus"), Some(&"Removed".to_string()));
1237                assert_eq!(deserialized_fields.get("URI"), Some(&"sip:1000021005@10.0.0.1:5060".to_string()));
1238                // The Event field should also be preserved in fields
1239                assert_eq!(deserialized_fields.get("Event"), Some(&"ContactStatus".to_string()));
1240            }
1241            _ => panic!("Expected UnknownEvent with ContactStatus, got {:?}", deserialized),
1242        }
1243    }
1244}