1use serde::de::Deserializer;
96use serde::{Deserialize, Serialize};
97use serde_json::Value;
98use std::collections::HashMap;
99use std::sync::Arc;
100use thiserror::Error;
101use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
102use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
103use tokio::net::TcpStream;
104use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
105use tokio::time::{timeout, Duration};
106use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
107use tokio_stream::wrappers::BroadcastStream;
108use tokio_stream::Stream;
109use tokio_util::sync::CancellationToken;
110#[cfg(feature = "docs")]
111use utoipa::ToSchema;
112use uuid::Uuid;
113
114pub mod resilient;
115
116#[cfg_attr(feature = "docs", derive(ToSchema))]
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct AmiResponse {
119 #[serde(rename = "Response")]
120 pub response: String,
121 #[serde(skip_serializing_if = "Option::is_none")]
122 #[serde(rename = "ActionID")]
123 pub action_id: Option<String>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 #[serde(rename = "Message")]
126 pub message: Option<String>,
127 #[serde(flatten)]
128 #[cfg_attr(feature = "docs", schema(additional_properties = true))]
129 pub fields: HashMap<String, Value>,
130}
131
132#[cfg_attr(feature = "docs", derive(ToSchema))]
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(tag = "Action", rename_all = "PascalCase")]
135pub enum AmiAction {
136 Login {
137 username: String,
138 secret: String,
139 #[serde(rename = "Events")]
140 events: Option<String>,
141 #[serde(rename = "ActionID")]
142 action_id: Option<String>,
143 },
144 Logoff {
145 #[serde(rename = "ActionID")]
146 action_id: Option<String>,
147 },
148 Ping {
149 #[serde(rename = "ActionID")]
150 action_id: Option<String>,
151 },
152 Command {
153 command: String,
154 #[serde(rename = "ActionID")]
155 action_id: Option<String>,
156 },
157 Custom {
158 action: String,
159 #[serde(flatten)]
160 params: HashMap<String, String>,
161 #[serde(rename = "ActionID")]
162 action_id: Option<String>,
163 },
164}
165
166#[cfg_attr(feature = "docs", derive(ToSchema))]
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct NewchannelEventData {
169 #[serde(rename = "Channel")]
170 pub channel: String,
171 #[serde(rename = "Uniqueid")]
172 pub uniqueid: String,
173 #[serde(rename = "ChannelState")]
174 pub channel_state: Option<String>,
175 #[serde(rename = "ChannelStateDesc")]
176 pub channel_state_desc: Option<String>,
177 #[serde(rename = "CallerIDNum")]
178 pub caller_id_num: Option<String>,
179 #[serde(rename = "CallerIDName")]
180 pub caller_id_name: Option<String>,
181 #[serde(flatten)]
182 pub other: HashMap<String, String>,
183}
184
185#[cfg_attr(feature = "docs", derive(ToSchema))]
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct HangupEventData {
188 #[serde(rename = "Channel")]
189 pub channel: String,
190 #[serde(rename = "Uniqueid")]
191 pub uniqueid: String,
192 #[serde(rename = "Cause")]
193 pub cause: Option<String>,
194 #[serde(rename = "Cause-txt")]
195 pub cause_txt: Option<String>,
196 #[serde(flatten)]
197 pub other: HashMap<String, String>,
198}
199
200#[cfg_attr(feature = "docs", derive(ToSchema))]
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct PeerStatusEventData {
203 #[serde(rename = "Peer")]
204 pub peer: String,
205 #[serde(rename = "PeerStatus")]
206 pub peer_status: String,
207 #[serde(flatten)]
208 pub other: HashMap<String, String>,
209}
210
211#[cfg_attr(feature = "docs", derive(ToSchema))]
212#[derive(Debug, Clone, Serialize)]
213#[serde(untagged)]
214pub enum AmiEvent {
215 Newchannel(NewchannelEventData),
216 Hangup(HangupEventData),
217 PeerStatus(PeerStatusEventData),
218 UnknownEvent {
219 #[serde(rename = "Event")]
220 event_type: String,
221 #[serde(flatten)]
222 fields: HashMap<String, String>,
223 },
224 InternalConnectionLost {
225 error: String,
226 },
227}
228
229impl<'de> Deserialize<'de> for AmiEvent {
230 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
231 where
232 D: Deserializer<'de>,
233 {
234 let value = serde_json::Value::deserialize(deserializer)?;
235 let map_obj = value
236 .as_object()
237 .ok_or_else(|| serde::de::Error::custom("AmiEvent: Expected a JSON object/map"))?;
238
239 if let Some(event_type_val) = map_obj.get("Event") {
240 let event_type_str = event_type_val.as_str().ok_or_else(|| {
241 serde::de::Error::custom("AmiEvent: 'Event' field is not a string")
242 })?;
243
244 match event_type_str {
245 "Newchannel" => Ok(AmiEvent::Newchannel(
246 NewchannelEventData::deserialize(value.clone())
247 .map_err(serde::de::Error::custom)?,
248 )),
249 "Hangup" => Ok(AmiEvent::Hangup(
250 HangupEventData::deserialize(value.clone())
251 .map_err(serde::de::Error::custom)?,
252 )),
253 "PeerStatus" => Ok(AmiEvent::PeerStatus(
254 PeerStatusEventData::deserialize(value.clone())
255 .map_err(serde::de::Error::custom)?,
256 )),
257 _ => {
258 let fields: HashMap<String, String> = map_obj
259 .iter()
260 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
261 .collect();
262 Ok(AmiEvent::UnknownEvent {
263 event_type: event_type_str.to_string(),
264 fields,
265 })
266 }
267 }
268 } else {
269 let fields: HashMap<String, String> = map_obj
270 .iter()
271 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
272 .collect();
273 Ok(AmiEvent::UnknownEvent {
274 event_type: "UnknownOrMalformed".to_string(),
275 fields,
276 })
277 }
278 }
279}
280
281#[derive(Debug, Error)]
282pub enum AmiError {
283 #[error("IO error: {0}")]
284 Io(#[from] std::io::Error),
285 #[error("Parse error: {0}")]
286 ParseError(String),
287 #[error("Serialize error: {0}")]
288 SerializeError(String),
289 #[error("JSON error: {0}")]
290 SerdeJson(#[from] serde_json::Error),
291 #[error("Authentication failed: {0}")]
292 AuthenticationFailed(String),
293 #[error("Action failed: {response:?}")]
294 ActionFailed { response: AmiResponse },
295 #[error("Connection closed")]
296 ConnectionClosed,
297 #[error("Operation timed out")]
298 Timeout,
299 #[error("Login required")]
300 LoginRequired,
301 #[error("Internal channel error: {0}")]
302 ChannelError(String),
303 #[error("Event stream lagged: {0}")]
304 EventStreamLagged(#[from] tokio::sync::broadcast::error::RecvError),
305 #[error("Not connected to AMI server")]
306 NotConnected,
307 #[error("Other error: {0}")]
308 Other(String),
309}
310
311#[derive(Serialize, Deserialize, Debug, Clone)]
312pub struct ManagerOptions {
313 pub port: u16,
314 pub host: String,
315 pub username: String,
316 pub password: String,
317 pub events: bool,
318}
319
320struct InnerManager {
321 authenticated: bool,
322 write_tx: Option<mpsc::Sender<String>>,
324 event_broadcaster: broadcast::Sender<AmiEvent>,
326 pending_responses: HashMap<String, oneshot::Sender<Result<AmiResponse, AmiError>>>,
328 heartbeat_token: Option<CancellationToken>,
330 watchdog_token: Option<CancellationToken>,
332 instance_id: String,
334}
335
336#[derive(Clone)]
337pub struct Manager {
338 pub(crate) inner: Arc<Mutex<InnerManager>>,
339}
340
341impl Default for Manager {
342 fn default() -> Self {
343 Self::new()
344 }
345}
346
347impl Manager {
348 pub fn new() -> Self {
349 Self::new_with_buffer(1024)
350 }
351
352 pub fn new_with_buffer(buffer_size: usize) -> Self {
353 let (event_tx, _) = broadcast::channel(buffer_size);
354 let instance_id = Uuid::new_v4().to_string()[..8].to_string();
355 log::debug!("Creating new Manager instance [{instance_id}]");
356 let inner = InnerManager {
357 authenticated: false,
358 write_tx: None,
359 event_broadcaster: event_tx,
360 pending_responses: HashMap::new(),
361 heartbeat_token: None,
362 watchdog_token: None,
363 instance_id,
364 };
365 Self {
366 inner: Arc::new(Mutex::new(inner)),
367 }
368 }
369
370 pub async fn connect_and_login(&mut self, options: ManagerOptions) -> Result<(), AmiError> {
371 let stream = timeout(
372 Duration::from_secs(10),
373 TcpStream::connect((options.host.as_str(), options.port)),
374 )
375 .await
376 .map_err(|_| AmiError::Timeout)?
377 .map_err(AmiError::Io)?;
378
379 let (reader, writer) = stream.into_split();
380
381 let (write_tx, write_rx) = mpsc::channel::<String>(100);
382 let (dispatch_tx, dispatch_rx) = mpsc::channel::<String>(1024);
383
384 let event_broadcaster = {
385 let inner = self.inner.lock().await;
386 inner.event_broadcaster.clone()
387 };
388
389 spawn_writer_task(writer, write_rx);
390 spawn_reader_task(reader, dispatch_tx, event_broadcaster);
391 spawn_dispatcher_task(self.inner.clone(), dispatch_rx);
392
393 self.inner.lock().await.write_tx = Some(write_tx);
394
395 let login_action = AmiAction::Login {
396 username: options.username.clone(),
397 secret: options.password.clone(),
398 events: Some("on".to_string()),
399 action_id: Some("rust-ami-login".to_string()),
400 };
401
402 match self.send_action(login_action).await {
403 Ok(resp) if resp.response.eq_ignore_ascii_case("Success") => {
404 self.inner.lock().await.authenticated = true;
405 Ok(())
406 }
407 Ok(resp) => Err(AmiError::AuthenticationFailed(
408 resp.message.unwrap_or_default(),
409 )),
410 Err(e) => Err(e),
411 }
412 }
413
414 pub async fn send_action(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
415 let action_id = get_or_set_action_id(&mut action);
416
417 let mut stream = self.all_events_stream().await;
418
419 let initial_response = self.send_initial_request(action.clone()).await?;
420
421 if initial_response
422 .fields
423 .get("EventList")
424 .and_then(|v| v.as_str())
425 == Some("start")
426 {
427 let mut collected_events = Vec::new();
428
429 let collection_result = tokio::time::timeout(Duration::from_secs(10), async {
430 use tokio_stream::StreamExt;
431 while let Some(Ok(event)) = stream.next().await {
432 if let AmiEvent::UnknownEvent { event_type, fields } = &event {
433 if fields.get("ActionID").map(|id| id.as_str()) == Some(&action_id) {
434 if event_type.ends_with("Complete") {
435 break;
436 }
437 collected_events.push(event.clone());
438 }
439 }
440 }
441 })
442 .await;
443
444 if collection_result.is_err() {
445 return Err(AmiError::Timeout);
446 }
447
448 let mut final_fields = initial_response.fields;
449 final_fields.insert(
450 "CollectedEvents".to_string(),
451 serde_json::to_value(&collected_events)?,
452 );
453
454 Ok(AmiResponse {
455 response: initial_response.response,
456 action_id: initial_response.action_id,
457 message: Some("Successfully collected events.".to_string()),
458 fields: final_fields,
459 })
460 } else {
461 Ok(initial_response)
462 }
463 }
464
465 async fn send_initial_request(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
466 let action_id = get_or_set_action_id(&mut action);
467 let (tx, rx) = oneshot::channel();
468 let action_str = serialize_ami_action(&action)?;
469
470 {
471 let mut inner = self.inner.lock().await;
472 if inner.write_tx.is_none() {
473 return Err(AmiError::NotConnected);
474 }
475 inner.pending_responses.insert(action_id.clone(), tx);
476 let writer = inner.write_tx.as_ref().unwrap();
477 if writer.send(action_str).await.is_err() {
478 inner.pending_responses.remove(&action_id);
479 return Err(AmiError::ConnectionClosed);
480 }
481 }
482
483 match timeout(Duration::from_secs(10), rx).await {
484 Ok(Ok(Ok(resp))) => Ok(resp),
485 Ok(Ok(Err(e))) => Err(e),
486 Ok(Err(_)) => Err(AmiError::ChannelError("Responder dropped".to_string())),
487 Err(_) => Err(AmiError::Timeout),
488 }
489 }
490
491 pub async fn disconnect(&self) -> Result<(), AmiError> {
492 let mut inner = self.inner.lock().await;
493 inner.write_tx = None;
494 inner.authenticated = false;
495
496 if let Some(token) = &inner.heartbeat_token {
498 token.cancel();
499 inner.heartbeat_token = None;
500 }
501 if let Some(token) = &inner.watchdog_token {
502 token.cancel();
503 inner.watchdog_token = None;
504 }
505
506 Ok(())
507 }
508
509 pub async fn is_authenticated(&self) -> bool {
510 self.inner.lock().await.authenticated
511 }
512
513 pub async fn all_events_stream(
514 &self,
515 ) -> impl Stream<Item = Result<AmiEvent, BroadcastStreamRecvError>> + Send + Unpin {
516 let inner = self.inner.lock().await;
517 BroadcastStream::new(inner.event_broadcaster.subscribe())
518 }
519
520 pub async fn start_heartbeat(&self) -> Result<(), AmiError> {
522 self.start_heartbeat_with_interval(30).await
523 }
524
525 pub async fn start_heartbeat_with_interval(&self, interval_secs: u64) -> Result<(), AmiError> {
527 let mut inner = self.inner.lock().await;
528 let instance_id = inner.instance_id.clone();
529
530 if let Some(token) = &inner.heartbeat_token {
532 log::debug!("[{instance_id}] Cancelling existing heartbeat task");
533 token.cancel();
534 }
535
536 let token = CancellationToken::new();
537 inner.heartbeat_token = Some(token.clone());
538
539 log::debug!("[{instance_id}] Starting heartbeat task (interval={interval_secs}s)");
540
541 let manager = self.clone();
542 tokio::spawn(async move {
543 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
544 log::debug!("[{instance_id}] Heartbeat task started");
545 loop {
546 tokio::select! {
547 _ = token.cancelled() => {
548 log::debug!("[{instance_id}] Heartbeat task cancelled");
549 break;
550 }
551 _ = interval.tick() => {
552 if manager.is_authenticated().await {
553 match manager.send_action(AmiAction::Ping { action_id: None }).await {
554 Ok(_) => {
555 log::debug!("[{instance_id}] Heartbeat ping successful");
556 }
557 Err(e) => {
558 log::warn!("[{instance_id}] Heartbeat ping failed: {e}");
559 if let Ok(inner) = manager.inner.try_lock() {
561 let _ = inner.event_broadcaster.send(AmiEvent::InternalConnectionLost {
562 error: format!("Heartbeat failed: {e}"),
563 });
564 }
565 let _ = manager.disconnect().await;
567 break;
568 }
569 }
570 } else {
571 log::trace!("[{instance_id}] Heartbeat tick: not authenticated, skipping ping");
572 }
573 }
574 }
575 }
576 });
577
578 Ok(())
579 }
580
581 pub async fn start_watchdog(&self, options: ManagerOptions) -> Result<(), AmiError> {
582 let instance_id = self.inner.lock().await.instance_id.clone();
583 log::debug!(
584 "[{}] Starting watchdog (default interval=1s) for user '{}' at {}:{}",
585 instance_id,
586 options.username,
587 options.host,
588 options.port
589 );
590 self.start_watchdog_with_interval(options, 1).await
591 }
592
593 pub async fn start_watchdog_with_interval(
594 &self,
595 options: ManagerOptions,
596 interval_secs: u64,
597 ) -> Result<(), AmiError> {
598 let mut inner = self.inner.lock().await;
599 let instance_id = inner.instance_id.clone();
600
601 if let Some(token) = &inner.watchdog_token {
603 log::debug!(
604 "[{instance_id}] Cancelling existing watchdog task before starting a new one"
605 );
606 token.cancel();
607 }
608
609 let token = CancellationToken::new();
610 inner.watchdog_token = Some(token.clone());
611
612 log::debug!(
613 "[{}] Spawning watchdog task (interval={}s) for user '{}' at {}:{}",
614 instance_id,
615 interval_secs,
616 options.username,
617 options.host,
618 options.port
619 );
620
621 let manager = self.clone();
622 tokio::spawn(async move {
623 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
624 log::debug!(
625 "[{}] Watchdog task started (interval={}s) for '{}'@{}:{}",
626 instance_id,
627 interval_secs,
628 options.username,
629 options.host,
630 options.port
631 );
632 loop {
633 tokio::select! {
634 _ = token.cancelled() => {
635 log::debug!("[{instance_id}] Watchdog task cancelled by token");
636 break;
637 }
638 _ = interval.tick() => {
639 if !manager.is_authenticated().await {
640 log::debug!(
641 "[{}] Watchdog attempting reconnection to '{}'@{}:{}...",
642 instance_id,
643 options.username, options.host, options.port
644 );
645 let mut mgr = manager.clone();
646 match mgr.connect_and_login(options.clone()).await {
647 Ok(_) => {
648 log::info!(
649 "[{}] Watchdog reconnection successful to '{}'@{}:{}",
650 instance_id,
651 options.username, options.host, options.port
652 );
653 }
654 Err(e) => {
655 log::debug!(
656 "[{}] Watchdog reconnection to '{}'@{}:{} failed: {}",
657 instance_id,
658 options.username, options.host, options.port, e
659 );
660 }
661 }
662 } else {
663 log::trace!("[{instance_id}] Watchdog tick: already authenticated; no action taken");
664 }
665 }
666 }
667 }
668 });
669
670 Ok(())
671 }
672}
673
674fn spawn_writer_task(mut writer: OwnedWriteHalf, mut write_rx: mpsc::Receiver<String>) {
675 tokio::spawn(async move {
676 while let Some(action_str) = write_rx.recv().await {
677 if writer.write_all(action_str.as_bytes()).await.is_err() {
678 break;
679 }
680 }
681 });
682}
683
684fn spawn_reader_task(
685 reader: OwnedReadHalf,
686 dispatch_tx: mpsc::Sender<String>,
687 event_broadcaster: broadcast::Sender<AmiEvent>,
688) {
689 tokio::spawn(async move {
690 let mut buf_reader = BufReader::new(reader);
691 loop {
692 let mut message_block = String::new();
693 loop {
694 let mut line = String::new();
695 match buf_reader.read_line(&mut line).await {
696 Ok(0) | Err(_) => {
697 let _ = event_broadcaster.send(AmiEvent::InternalConnectionLost {
699 error: "Connection lost during read".to_string(),
700 });
701 return;
702 }
703 Ok(_) => {
704 let is_end = line == "\r\n";
705 message_block.push_str(&line);
706 if is_end {
707 break;
708 }
709 }
710 }
711 }
712
713 if !message_block.trim().is_empty() && dispatch_tx.send(message_block).await.is_err() {
714 let _ = event_broadcaster.send(AmiEvent::InternalConnectionLost {
715 error: "Dispatcher channel closed".to_string(),
716 });
717 break;
718 }
719 }
720 });
721}
722
723fn spawn_dispatcher_task(
724 inner_arc: Arc<Mutex<InnerManager>>,
725 mut dispatch_rx: mpsc::Receiver<String>,
726) {
727 tokio::spawn(async move {
728 while let Some(raw_message) = dispatch_rx.recv().await {
729 if let Ok(parsed_messages) = parse_ami_protocol_message(&raw_message) {
730 for value_msg in parsed_messages {
731 let mut inner = inner_arc.lock().await;
732 if value_msg.get("Response").is_some() {
733 if let Ok(resp) = serde_json::from_value::<AmiResponse>(value_msg) {
734 if let Some(action_id) = &resp.action_id {
735 if let Some(responder) = inner.pending_responses.remove(action_id) {
736 let _ = responder.send(Ok(resp));
737 }
738 }
739 }
740 } else if value_msg.get("Event").is_some() {
741 if let Ok(event) = serde_json::from_value::<AmiEvent>(value_msg.clone()) {
742 let _ = inner.event_broadcaster.send(event);
743 }
744 }
745 }
746 }
747 }
748 });
749}
750
751fn parse_ami_protocol_message(raw_data: &str) -> Result<Vec<serde_json::Value>, AmiError> {
752 let mut messages = Vec::new();
753 for block in raw_data.trim().split("\r\n\r\n") {
754 if block.is_empty() {
755 continue;
756 }
757 let mut map = serde_json::Map::new();
758 for line in block.lines() {
759 if let Some((key, value)) = line.split_once(": ") {
760 map.insert(
761 key.trim().to_string(),
762 serde_json::Value::String(value.trim().to_string()),
763 );
764 }
765 }
766 if !map.is_empty() {
767 messages.push(serde_json::Value::Object(map));
768 }
769 }
770 Ok(messages)
771}
772
773fn serialize_ami_action(action: &AmiAction) -> Result<String, AmiError> {
774 let mut s = String::new();
775 match action {
776 AmiAction::Login {
777 username,
778 secret,
779 events,
780 action_id,
781 } => {
782 s.push_str("Action: Login\r\n");
783 s.push_str(&format!("Username: {username}\r\n"));
784 s.push_str(&format!("Secret: {secret}\r\n"));
785 if let Some(ev) = events {
786 s.push_str(&format!("Events: {ev}\r\n"));
787 }
788 if let Some(id) = action_id {
789 s.push_str(&format!("ActionID: {id}\r\n"));
790 }
791 }
792 AmiAction::Logoff { action_id } => {
793 s.push_str("Action: Logoff\r\n");
794 if let Some(id) = action_id {
795 s.push_str(&format!("ActionID: {id}\r\n"));
796 }
797 }
798 AmiAction::Ping { action_id } => {
799 s.push_str("Action: Ping\r\n");
800 if let Some(id) = action_id {
801 s.push_str(&format!("ActionID: {id}\r\n"));
802 }
803 }
804 AmiAction::Command { command, action_id } => {
805 s.push_str("Action: Command\r\n");
806 s.push_str(&format!("Command: {command}\r\n"));
807 if let Some(id) = action_id {
808 s.push_str(&format!("ActionID: {id}\r\n"));
809 }
810 }
811 AmiAction::Custom {
812 action: action_name,
813 params,
814 action_id,
815 } => {
816 s.push_str(&format!("Action: {action_name}\r\n"));
817 for (k, v) in params {
818 s.push_str(&format!("{k}: {v}\r\n"));
819 }
820 if let Some(id) = action_id {
821 s.push_str(&format!("ActionID: {id}\r\n"));
822 }
823 }
824 }
825 s.push_str("\r\n");
826 Ok(s)
827}
828
829fn get_or_set_action_id(action: &mut AmiAction) -> String {
830 match action {
831 AmiAction::Login { action_id, .. }
832 | AmiAction::Logoff { action_id }
833 | AmiAction::Ping { action_id }
834 | AmiAction::Command { action_id, .. }
835 | AmiAction::Custom { action_id, .. } => {
836 if let Some(id) = action_id {
837 id.clone()
838 } else {
839 let new_id = Uuid::new_v4().to_string();
840 *action_id = Some(new_id.clone());
841 new_id
842 }
843 }
844 }
845}
846
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// Builds options for port 5038 with events enabled.
    fn options(host: &str, username: &str, password: &str) -> ManagerOptions {
        ManagerOptions {
            port: 5038,
            host: host.to_string(),
            username: username.to_string(),
            password: password.to_string(),
            events: true,
        }
    }

    /// Parses `raw` and returns the first decoded message.
    fn first_message(raw: &str) -> serde_json::Value {
        let parsed = parse_ami_protocol_message(raw).unwrap();
        parsed[0].clone()
    }

    /// Parses `raw` and decodes the first message as an event.
    fn first_event(raw: &str) -> AmiEvent {
        serde_json::from_value(first_message(raw)).unwrap()
    }

    /// Publishes an `InternalConnectionLost` event on the manager's channel.
    async fn broadcast_connection_lost(manager: &Manager, error: &str) {
        let inner = manager.inner.lock().await;
        let _ = inner
            .event_broadcaster
            .send(AmiEvent::InternalConnectionLost {
                error: error.to_string(),
            });
    }

    /// Field map of a `ContactStatus` event as used by the round-trip tests.
    fn contact_status_fields() -> HashMap<String, String> {
        let mut fields = HashMap::new();
        fields.insert("Event".to_string(), "ContactStatus".to_string());
        fields.insert("AOR".to_string(), "1000021005".to_string());
        fields.insert("ContactStatus".to_string(), "Removed".to_string());
        fields
    }

    /// Serializes the event to JSON text and decodes it again.
    fn json_roundtrip(event: &AmiEvent) -> AmiEvent {
        let json = serde_json::to_string(event).unwrap();
        serde_json::from_str(&json).unwrap()
    }

    #[test]
    fn test_serialize_login_action() {
        let login = AmiAction::Login {
            username: "user".to_string(),
            secret: "pass".to_string(),
            events: Some("on".to_string()),
            action_id: Some("abc123".to_string()),
        };
        let wire = serialize_ami_action(&login).unwrap();
        for header in [
            "Action: Login",
            "Username: user",
            "Secret: pass",
            "Events: on",
            "ActionID: abc123",
        ] {
            assert!(wire.contains(header));
        }
        assert!(wire.ends_with("\r\n\r\n"));
    }

    #[test]
    fn test_serialize_command_action() {
        let command = AmiAction::Command {
            command: "sip show peers".to_string(),
            action_id: None,
        };
        let wire = serialize_ami_action(&command).unwrap();
        assert!(wire.contains("Action: Command"));
        assert!(wire.contains("Command: sip show peers"));
    }

    #[test]
    fn test_parse_ami_protocol_message() {
        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
        let parsed = parse_ami_protocol_message(raw).unwrap();
        assert_eq!(parsed.len(), 1);
        let message = &parsed[0];
        assert_eq!(message["Response"], "Success");
        assert_eq!(message["ActionID"], "123");
        assert_eq!(message["Message"], "Authentication accepted");
    }

    #[test]
    fn test_deserialize_ami_response() {
        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
        let resp: AmiResponse = serde_json::from_value(first_message(raw)).unwrap();
        assert_eq!(resp.response, "Success");
        assert_eq!(resp.action_id.as_deref(), Some("123"));
        assert_eq!(resp.message.as_deref(), Some("Authentication accepted"));
    }

    #[test]
    fn test_deserialize_newchannel_event() {
        let raw = "Event: Newchannel\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nChannelState: 4\r\nChannelStateDesc: Ring\r\nCallerIDNum: 100\r\nCallerIDName: Alice\r\n\r\n";
        match first_event(raw) {
            AmiEvent::Newchannel(data) => {
                assert_eq!(data.channel, "SIP/100-00000001");
                assert_eq!(data.uniqueid, "1234");
                assert_eq!(data.channel_state.as_deref(), Some("4"));
                assert_eq!(data.channel_state_desc.as_deref(), Some("Ring"));
                assert_eq!(data.caller_id_num.as_deref(), Some("100"));
                assert_eq!(data.caller_id_name.as_deref(), Some("Alice"));
            }
            _ => panic!("Expected AmiEvent::Newchannel"),
        }
    }

    #[test]
    fn test_deserialize_hangup_event() {
        let raw = "Event: Hangup\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nCause: 16\r\nCause-txt: Normal Clearing\r\n\r\n";
        match first_event(raw) {
            AmiEvent::Hangup(data) => {
                assert_eq!(data.channel, "SIP/100-00000001");
                assert_eq!(data.uniqueid, "1234");
                assert_eq!(data.cause.as_deref(), Some("16"));
                assert_eq!(data.cause_txt.as_deref(), Some("Normal Clearing"));
            }
            _ => panic!("Expected AmiEvent::Hangup"),
        }
    }

    #[test]
    fn test_deserialize_peerstatus_event() {
        let raw = "Event: PeerStatus\r\nPeer: SIP/100\r\nPeerStatus: Registered\r\n\r\n";
        match first_event(raw) {
            AmiEvent::PeerStatus(data) => {
                assert_eq!(data.peer, "SIP/100");
                assert_eq!(data.peer_status, "Registered");
            }
            _ => panic!("Expected AmiEvent::PeerStatus"),
        }
    }

    #[test]
    fn test_deserialize_unknown_event() {
        let raw = "Event: FooBar\r\nSomeField: Value\r\n\r\n";
        match first_event(raw) {
            AmiEvent::UnknownEvent { event_type, fields } => {
                assert_eq!(event_type, "FooBar");
                assert_eq!(fields.get("SomeField").map(|s| s.as_str()), Some("Value"));
            }
            _ => panic!("Expected AmiEvent::UnknownEvent"),
        }
    }

    #[tokio::test]
    async fn test_manager_options_clone() {
        let opts = options("localhost", "admin", "pwd");
        let copy = opts.clone();
        assert_eq!(opts.port, copy.port);
        assert_eq!(opts.host, copy.host);
        assert_eq!(opts.username, copy.username);
        assert_eq!(opts.password, copy.password);
        assert_eq!(opts.events, copy.events);
    }

    #[tokio::test]
    async fn test_manager_new_and_auth_flag() {
        let manager = Manager::new();
        assert!(!manager.is_authenticated().await);
    }

    #[tokio::test]
    async fn test_event_internal_connection_lost() {
        let manager = Manager::new();
        // Subscribe first so the event below is observed.
        let mut stream = manager.all_events_stream().await;

        broadcast_connection_lost(&manager, "simulated").await;

        match stream.next().await.unwrap().unwrap() {
            AmiEvent::InternalConnectionLost { error } => {
                assert_eq!(error, "simulated");
            }
            _ => panic!("Expected InternalConnectionLost"),
        }
    }

    #[tokio::test]
    async fn test_manager_options_default() {
        let opts = options("localhost", "admin", "pwd");
        assert!(opts.events);
    }

    #[tokio::test]
    async fn test_manager_new_with_buffer() {
        let manager = Manager::new_with_buffer(512);
        assert!(!manager.is_authenticated().await);

        let _stream = manager.all_events_stream().await;
    }

    #[tokio::test]
    async fn test_heartbeat_and_watchdog_tokens() {
        let manager = Manager::new();

        {
            let inner = manager.inner.lock().await;
            assert!(inner.heartbeat_token.is_none());
            assert!(inner.watchdog_token.is_none());
        }

        let opts = options("127.0.0.1", "test", "test");

        let _ = manager.start_heartbeat().await;
        assert!(manager.inner.lock().await.heartbeat_token.is_some());

        let _ = manager.start_watchdog(opts).await;
        assert!(manager.inner.lock().await.watchdog_token.is_some());

        // Disconnecting must clear both handles again.
        let _ = manager.disconnect().await;
        {
            let inner = manager.inner.lock().await;
            assert!(inner.heartbeat_token.is_none());
            assert!(inner.watchdog_token.is_none());
        }
    }

    #[tokio::test]
    async fn test_connection_lost_event_emission() {
        let manager = Manager::new();
        let mut stream = manager.all_events_stream().await;

        broadcast_connection_lost(&manager, "test connection lost").await;

        match stream.next().await.unwrap().unwrap() {
            AmiEvent::InternalConnectionLost { error } => {
                assert_eq!(error, "test connection lost");
            }
            _ => panic!("Expected InternalConnectionLost event"),
        }
    }

    #[tokio::test]
    async fn test_heartbeat_interval_respected() {
        tokio::time::pause();

        let manager = Manager::new();

        let _ = manager.start_heartbeat_with_interval(2).await;

        tokio::time::advance(Duration::from_secs(1)).await;
        assert!(manager.inner.lock().await.heartbeat_token.is_some());

        tokio::time::advance(Duration::from_secs(2)).await;
        assert!(manager.inner.lock().await.heartbeat_token.is_some());

        let _ = manager.disconnect().await;
    }

    #[tokio::test]
    async fn test_watchdog_interval_configuration() {
        let manager = Manager::new();
        let opts = options("127.0.0.1", "test", "test");

        let _ = manager.start_watchdog(opts.clone()).await;
        assert!(manager.inner.lock().await.watchdog_token.is_some());

        let _ = manager.start_watchdog_with_interval(opts.clone(), 5).await;
        assert!(manager.inner.lock().await.watchdog_token.is_some());

        let _ = manager.disconnect().await;
    }

    #[test]
    fn test_unknown_event_serialization_roundtrip() {
        let original = AmiEvent::UnknownEvent {
            event_type: "ContactStatus".to_string(),
            fields: contact_status_fields(),
        };

        match json_roundtrip(&original) {
            AmiEvent::UnknownEvent {
                event_type,
                fields: deserialized_fields,
            } => {
                assert_eq!(event_type, "ContactStatus", "Event type should be preserved");
                assert_eq!(
                    deserialized_fields.get("AOR").map(|s| s.as_str()),
                    Some("1000021005"),
                    "Fields should be preserved"
                );
                assert_eq!(
                    deserialized_fields.get("ContactStatus").map(|s| s.as_str()),
                    Some("Removed"),
                    "Fields should be preserved"
                );
            }
            other => panic!(
                "Expected AmiEvent::UnknownEvent after deserialization, got {:?}",
                other
            ),
        }
    }

    #[test]
    fn test_unknown_event_kafka_scenario() {
        let mut fields = contact_status_fields();
        fields.insert("URI".to_string(), "sip:1000021005@10.0.0.1:5060".to_string());

        let original = AmiEvent::UnknownEvent {
            event_type: "ContactStatus".to_string(),
            fields,
        };

        match json_roundtrip(&original) {
            AmiEvent::UnknownEvent {
                event_type,
                fields: deserialized_fields,
            } => {
                assert_eq!(event_type, "ContactStatus");
                assert_eq!(deserialized_fields.get("AOR"), Some(&"1000021005".to_string()));
                assert_eq!(
                    deserialized_fields.get("ContactStatus"),
                    Some(&"Removed".to_string())
                );
                assert_eq!(
                    deserialized_fields.get("URI"),
                    Some(&"sip:1000021005@10.0.0.1:5060".to_string())
                );
                assert_eq!(
                    deserialized_fields.get("Event"),
                    Some(&"ContactStatus".to_string())
                );
            }
            other => panic!("Expected UnknownEvent with ContactStatus, got {:?}", other),
        }
    }
}