1use serde::de::Deserializer;
96use serde::{Deserialize, Serialize};
97use serde_json::Value;
98use std::collections::HashMap;
99use std::sync::Arc;
100use thiserror::Error;
101use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
102use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
103use tokio::net::TcpStream;
104use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
105use tokio::time::{timeout, Duration};
106use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
107use tokio_stream::wrappers::BroadcastStream;
108use tokio_stream::Stream;
109use tokio_util::sync::CancellationToken;
110#[cfg(feature = "docs")]
111use utoipa::ToSchema;
112use uuid::Uuid;
113
114pub mod resilient;
115
116#[cfg_attr(feature = "docs", derive(ToSchema))]
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct AmiResponse {
119 #[serde(rename = "Response")]
120 pub response: String,
121 #[serde(skip_serializing_if = "Option::is_none")]
122 #[serde(rename = "ActionID")]
123 pub action_id: Option<String>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 #[serde(rename = "Message")]
126 pub message: Option<String>,
127 #[serde(flatten)]
128 #[cfg_attr(feature = "docs", schema(additional_properties = true))]
129 pub fields: HashMap<String, Value>,
130}
131
132#[cfg_attr(feature = "docs", derive(ToSchema))]
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(tag = "Action", rename_all = "PascalCase")]
135pub enum AmiAction {
136 Login {
137 username: String,
138 secret: String,
139 #[serde(rename = "Events")]
140 events: Option<String>,
141 #[serde(rename = "ActionID")]
142 action_id: Option<String>,
143 },
144 Logoff {
145 #[serde(rename = "ActionID")]
146 action_id: Option<String>,
147 },
148 Ping {
149 #[serde(rename = "ActionID")]
150 action_id: Option<String>,
151 },
152 Command {
153 command: String,
154 #[serde(rename = "ActionID")]
155 action_id: Option<String>,
156 },
157 Custom {
158 action: String,
159 #[serde(flatten)]
160 params: HashMap<String, String>,
161 #[serde(rename = "ActionID")]
162 action_id: Option<String>,
163 },
164}
165
166#[cfg_attr(feature = "docs", derive(ToSchema))]
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct NewchannelEventData {
169 #[serde(rename = "Channel")]
170 pub channel: String,
171 #[serde(rename = "Uniqueid")]
172 pub uniqueid: String,
173 #[serde(rename = "ChannelState")]
174 pub channel_state: Option<String>,
175 #[serde(rename = "ChannelStateDesc")]
176 pub channel_state_desc: Option<String>,
177 #[serde(rename = "CallerIDNum")]
178 pub caller_id_num: Option<String>,
179 #[serde(rename = "CallerIDName")]
180 pub caller_id_name: Option<String>,
181 #[serde(flatten)]
182 pub other: HashMap<String, String>,
183}
184
185#[cfg_attr(feature = "docs", derive(ToSchema))]
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct HangupEventData {
188 #[serde(rename = "Channel")]
189 pub channel: String,
190 #[serde(rename = "Uniqueid")]
191 pub uniqueid: String,
192 #[serde(rename = "Cause")]
193 pub cause: Option<String>,
194 #[serde(rename = "Cause-txt")]
195 pub cause_txt: Option<String>,
196 #[serde(flatten)]
197 pub other: HashMap<String, String>,
198}
199
200#[cfg_attr(feature = "docs", derive(ToSchema))]
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct PeerStatusEventData {
203 #[serde(rename = "Peer")]
204 pub peer: String,
205 #[serde(rename = "PeerStatus")]
206 pub peer_status: String,
207 #[serde(flatten)]
208 pub other: HashMap<String, String>,
209}
210
211#[cfg_attr(feature = "docs", derive(ToSchema))]
212#[derive(Debug, Clone, Serialize)]
213#[serde(untagged)]
214pub enum AmiEvent {
215 Newchannel(NewchannelEventData),
216 Hangup(HangupEventData),
217 PeerStatus(PeerStatusEventData),
218 UnknownEvent {
219 event_type: String,
220 fields: HashMap<String, String>,
221 },
222 InternalConnectionLost {
223 error: String,
224 },
225}
226
227impl<'de> Deserialize<'de> for AmiEvent {
228 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
229 where
230 D: Deserializer<'de>,
231 {
232 let value = serde_json::Value::deserialize(deserializer)?;
233 let map_obj = value
234 .as_object()
235 .ok_or_else(|| serde::de::Error::custom("AmiEvent: Expected a JSON object/map"))?;
236
237 if let Some(event_type_val) = map_obj.get("Event") {
238 let event_type_str = event_type_val.as_str().ok_or_else(|| {
239 serde::de::Error::custom("AmiEvent: 'Event' field is not a string")
240 })?;
241
242 match event_type_str {
243 "Newchannel" => Ok(AmiEvent::Newchannel(
244 NewchannelEventData::deserialize(value.clone())
245 .map_err(serde::de::Error::custom)?,
246 )),
247 "Hangup" => Ok(AmiEvent::Hangup(
248 HangupEventData::deserialize(value.clone())
249 .map_err(serde::de::Error::custom)?,
250 )),
251 "PeerStatus" => Ok(AmiEvent::PeerStatus(
252 PeerStatusEventData::deserialize(value.clone())
253 .map_err(serde::de::Error::custom)?,
254 )),
255 _ => {
256 let fields: HashMap<String, String> = map_obj
257 .iter()
258 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
259 .collect();
260 Ok(AmiEvent::UnknownEvent {
261 event_type: event_type_str.to_string(),
262 fields,
263 })
264 }
265 }
266 } else {
267 let fields: HashMap<String, String> = map_obj
268 .iter()
269 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
270 .collect();
271 Ok(AmiEvent::UnknownEvent {
272 event_type: "UnknownOrMalformed".to_string(),
273 fields,
274 })
275 }
276 }
277}
278
279#[derive(Debug, Error)]
280pub enum AmiError {
281 #[error("IO error: {0}")]
282 Io(#[from] std::io::Error),
283 #[error("Parse error: {0}")]
284 ParseError(String),
285 #[error("Serialize error: {0}")]
286 SerializeError(String),
287 #[error("JSON error: {0}")]
288 SerdeJson(#[from] serde_json::Error),
289 #[error("Authentication failed: {0}")]
290 AuthenticationFailed(String),
291 #[error("Action failed: {response:?}")]
292 ActionFailed { response: AmiResponse },
293 #[error("Connection closed")]
294 ConnectionClosed,
295 #[error("Operation timed out")]
296 Timeout,
297 #[error("Login required")]
298 LoginRequired,
299 #[error("Internal channel error: {0}")]
300 ChannelError(String),
301 #[error("Event stream lagged: {0}")]
302 EventStreamLagged(#[from] tokio::sync::broadcast::error::RecvError),
303 #[error("Not connected to AMI server")]
304 NotConnected,
305 #[error("Other error: {0}")]
306 Other(String),
307}
308
309#[derive(Serialize, Deserialize, Debug, Clone)]
310pub struct ManagerOptions {
311 pub port: u16,
312 pub host: String,
313 pub username: String,
314 pub password: String,
315 pub events: bool,
316}
317
318struct InnerManager {
319 authenticated: bool,
320 write_tx: Option<mpsc::Sender<String>>,
322 event_broadcaster: broadcast::Sender<AmiEvent>,
324 pending_responses: HashMap<String, oneshot::Sender<Result<AmiResponse, AmiError>>>,
326 heartbeat_token: Option<CancellationToken>,
328 watchdog_token: Option<CancellationToken>,
330}
331
332#[derive(Clone)]
333pub struct Manager {
334 pub(crate) inner: Arc<Mutex<InnerManager>>,
335}
336
337impl Default for Manager {
338 fn default() -> Self {
339 Self::new()
340 }
341}
342
343impl Manager {
344 pub fn new() -> Self {
345 Self::new_with_buffer(1024)
346 }
347
348 pub fn new_with_buffer(buffer_size: usize) -> Self {
349 let (event_tx, _) = broadcast::channel(buffer_size);
350 let inner = InnerManager {
351 authenticated: false,
352 write_tx: None,
353 event_broadcaster: event_tx,
354 pending_responses: HashMap::new(),
355 heartbeat_token: None,
356 watchdog_token: None,
357 };
358 Self {
359 inner: Arc::new(Mutex::new(inner)),
360 }
361 }
362
363 pub async fn connect_and_login(&mut self, options: ManagerOptions) -> Result<(), AmiError> {
364 let stream = timeout(
365 Duration::from_secs(10),
366 TcpStream::connect((options.host.as_str(), options.port)),
367 )
368 .await
369 .map_err(|_| AmiError::Timeout)?
370 .map_err(AmiError::Io)?;
371
372 let (reader, writer) = stream.into_split();
373
374 let (write_tx, write_rx) = mpsc::channel::<String>(100);
375 let (dispatch_tx, dispatch_rx) = mpsc::channel::<String>(1024);
376
377 let event_broadcaster = {
378 let inner = self.inner.lock().await;
379 inner.event_broadcaster.clone()
380 };
381
382 spawn_writer_task(writer, write_rx);
383 spawn_reader_task(reader, dispatch_tx, event_broadcaster);
384 spawn_dispatcher_task(self.inner.clone(), dispatch_rx);
385
386 self.inner.lock().await.write_tx = Some(write_tx);
387
388 let login_action = AmiAction::Login {
389 username: options.username.clone(),
390 secret: options.password.clone(),
391 events: Some("on".to_string()),
392 action_id: Some("rust-ami-login".to_string()),
393 };
394
395 match self.send_action(login_action).await {
396 Ok(resp) if resp.response.eq_ignore_ascii_case("Success") => {
397 self.inner.lock().await.authenticated = true;
398 Ok(())
399 }
400 Ok(resp) => Err(AmiError::AuthenticationFailed(
401 resp.message.unwrap_or_default(),
402 )),
403 Err(e) => Err(e),
404 }
405 }
406
407 pub async fn send_action(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
408 let action_id = get_or_set_action_id(&mut action);
409
410 let mut stream = self.all_events_stream().await;
411
412 let initial_response = self.send_initial_request(action.clone()).await?;
413
414 if initial_response
415 .fields
416 .get("EventList")
417 .and_then(|v| v.as_str())
418 == Some("start")
419 {
420 let mut collected_events = Vec::new();
421
422 let collection_result = tokio::time::timeout(Duration::from_secs(10), async {
423 use tokio_stream::StreamExt;
424 while let Some(Ok(event)) = stream.next().await {
425 if let AmiEvent::UnknownEvent { event_type, fields } = &event {
426 if fields.get("ActionID").map(|id| id.as_str()) == Some(&action_id) {
427 if event_type.ends_with("Complete") {
428 break;
429 }
430 collected_events.push(event.clone());
431 }
432 }
433 }
434 })
435 .await;
436
437 if collection_result.is_err() {
438 return Err(AmiError::Timeout);
439 }
440
441 let mut final_fields = initial_response.fields;
442 final_fields.insert(
443 "CollectedEvents".to_string(),
444 serde_json::to_value(&collected_events)?,
445 );
446
447 Ok(AmiResponse {
448 response: initial_response.response,
449 action_id: initial_response.action_id,
450 message: Some("Successfully collected events.".to_string()),
451 fields: final_fields,
452 })
453 } else {
454 Ok(initial_response)
455 }
456 }
457
458 async fn send_initial_request(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
459 let action_id = get_or_set_action_id(&mut action);
460 let (tx, rx) = oneshot::channel();
461 let action_str = serialize_ami_action(&action)?;
462
463 {
464 let mut inner = self.inner.lock().await;
465 if inner.write_tx.is_none() {
466 return Err(AmiError::NotConnected);
467 }
468 inner.pending_responses.insert(action_id.clone(), tx);
469 let writer = inner.write_tx.as_ref().unwrap();
470 if writer.send(action_str).await.is_err() {
471 inner.pending_responses.remove(&action_id);
472 return Err(AmiError::ConnectionClosed);
473 }
474 }
475
476 match timeout(Duration::from_secs(10), rx).await {
477 Ok(Ok(Ok(resp))) => Ok(resp),
478 Ok(Ok(Err(e))) => Err(e),
479 Ok(Err(_)) => Err(AmiError::ChannelError("Responder dropped".to_string())),
480 Err(_) => Err(AmiError::Timeout),
481 }
482 }
483
484 pub async fn disconnect(&self) -> Result<(), AmiError> {
485 let mut inner = self.inner.lock().await;
486 inner.write_tx = None;
487 inner.authenticated = false;
488
489 if let Some(token) = &inner.heartbeat_token {
491 token.cancel();
492 inner.heartbeat_token = None;
493 }
494 if let Some(token) = &inner.watchdog_token {
495 token.cancel();
496 inner.watchdog_token = None;
497 }
498
499 Ok(())
500 }
501
502 pub async fn is_authenticated(&self) -> bool {
503 self.inner.lock().await.authenticated
504 }
505
506 pub async fn all_events_stream(
507 &self,
508 ) -> impl Stream<Item = Result<AmiEvent, BroadcastStreamRecvError>> + Send + Unpin {
509 let inner = self.inner.lock().await;
510 BroadcastStream::new(inner.event_broadcaster.subscribe())
511 }
512
513 pub async fn start_heartbeat(&self) -> Result<(), AmiError> {
515 self.start_heartbeat_with_interval(30).await
516 }
517
518 pub async fn start_heartbeat_with_interval(&self, interval_secs: u64) -> Result<(), AmiError> {
520 let mut inner = self.inner.lock().await;
521
522 if let Some(token) = &inner.heartbeat_token {
524 token.cancel();
525 }
526
527 let token = CancellationToken::new();
528 inner.heartbeat_token = Some(token.clone());
529
530 let manager = self.clone();
531 tokio::spawn(async move {
532 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
533 loop {
534 tokio::select! {
535 _ = token.cancelled() => {
536 break;
537 }
538 _ = interval.tick() => {
539 if manager.is_authenticated().await {
540 match manager.send_action(AmiAction::Ping { action_id: None }).await {
541 Ok(_) => {
542 log::debug!("Heartbeat ping successful");
543 }
544 Err(e) => {
545 log::warn!("Heartbeat ping failed: {}", e);
546 if let Ok(inner) = manager.inner.try_lock() {
548 let _ = inner.event_broadcaster.send(AmiEvent::InternalConnectionLost {
549 error: format!("Heartbeat failed: {}", e),
550 });
551 }
552 let _ = manager.disconnect().await;
554 break;
555 }
556 }
557 }
558 }
559 }
560 }
561 });
562
563 Ok(())
564 }
565
566 pub async fn start_watchdog(&self, options: ManagerOptions) -> Result<(), AmiError> {
567 self.start_watchdog_with_interval(options, 1).await
568 }
569
570 pub async fn start_watchdog_with_interval(
571 &self,
572 options: ManagerOptions,
573 interval_secs: u64,
574 ) -> Result<(), AmiError> {
575 let mut inner = self.inner.lock().await;
576
577 if let Some(token) = &inner.watchdog_token {
579 token.cancel();
580 }
581
582 let token = CancellationToken::new();
583 inner.watchdog_token = Some(token.clone());
584
585 let manager = self.clone();
586 tokio::spawn(async move {
587 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
588 loop {
589 tokio::select! {
590 _ = token.cancelled() => {
591 break;
592 }
593 _ = interval.tick() => {
594 if !manager.is_authenticated().await {
595 log::debug!("Watchdog attempting reconnection...");
596 let mut mgr = manager.clone();
597 match mgr.connect_and_login(options.clone()).await {
598 Ok(_) => {
599 log::info!("Watchdog reconnection successful");
600 }
601 Err(e) => {
602 log::debug!("Watchdog reconnection failed: {}", e);
603 }
604 }
605 }
606 }
607 }
608 }
609 });
610
611 Ok(())
612 }
613}
614
615fn spawn_writer_task(mut writer: OwnedWriteHalf, mut write_rx: mpsc::Receiver<String>) {
616 tokio::spawn(async move {
617 while let Some(action_str) = write_rx.recv().await {
618 if writer.write_all(action_str.as_bytes()).await.is_err() {
619 break;
620 }
621 }
622 });
623}
624
625fn spawn_reader_task(
626 reader: OwnedReadHalf,
627 dispatch_tx: mpsc::Sender<String>,
628 event_broadcaster: broadcast::Sender<AmiEvent>,
629) {
630 tokio::spawn(async move {
631 let mut buf_reader = BufReader::new(reader);
632 loop {
633 let mut message_block = String::new();
634 loop {
635 let mut line = String::new();
636 match buf_reader.read_line(&mut line).await {
637 Ok(0) | Err(_) => {
638 let _ = event_broadcaster.send(AmiEvent::InternalConnectionLost {
640 error: "Connection lost during read".to_string(),
641 });
642 return;
643 }
644 Ok(_) => {
645 let is_end = line == "\r\n";
646 message_block.push_str(&line);
647 if is_end {
648 break;
649 }
650 }
651 }
652 }
653
654 if !message_block.trim().is_empty() && dispatch_tx.send(message_block).await.is_err() {
655 let _ = event_broadcaster.send(AmiEvent::InternalConnectionLost {
656 error: "Dispatcher channel closed".to_string(),
657 });
658 break;
659 }
660 }
661 });
662}
663
664fn spawn_dispatcher_task(
665 inner_arc: Arc<Mutex<InnerManager>>,
666 mut dispatch_rx: mpsc::Receiver<String>,
667) {
668 tokio::spawn(async move {
669 while let Some(raw_message) = dispatch_rx.recv().await {
670 if let Ok(parsed_messages) = parse_ami_protocol_message(&raw_message) {
671 for value_msg in parsed_messages {
672 let mut inner = inner_arc.lock().await;
673 if value_msg.get("Response").is_some() {
674 if let Ok(resp) = serde_json::from_value::<AmiResponse>(value_msg) {
675 if let Some(action_id) = &resp.action_id {
676 if let Some(responder) = inner.pending_responses.remove(action_id) {
677 let _ = responder.send(Ok(resp));
678 }
679 }
680 }
681 } else if value_msg.get("Event").is_some() {
682 if let Ok(event) = serde_json::from_value::<AmiEvent>(value_msg.clone()) {
683 let _ = inner.event_broadcaster.send(event);
684 }
685 }
686 }
687 }
688 }
689 });
690}
691
692fn parse_ami_protocol_message(raw_data: &str) -> Result<Vec<serde_json::Value>, AmiError> {
693 let mut messages = Vec::new();
694 for block in raw_data.trim().split("\r\n\r\n") {
695 if block.is_empty() {
696 continue;
697 }
698 let mut map = serde_json::Map::new();
699 for line in block.lines() {
700 if let Some((key, value)) = line.split_once(": ") {
701 map.insert(
702 key.trim().to_string(),
703 serde_json::Value::String(value.trim().to_string()),
704 );
705 }
706 }
707 if !map.is_empty() {
708 messages.push(serde_json::Value::Object(map));
709 }
710 }
711 Ok(messages)
712}
713
714fn serialize_ami_action(action: &AmiAction) -> Result<String, AmiError> {
715 let mut s = String::new();
716 match action {
717 AmiAction::Login {
718 username,
719 secret,
720 events,
721 action_id,
722 } => {
723 s.push_str("Action: Login\r\n");
724 s.push_str(&format!("Username: {}\r\n", username));
725 s.push_str(&format!("Secret: {}\r\n", secret));
726 if let Some(ev) = events {
727 s.push_str(&format!("Events: {}\r\n", ev));
728 }
729 if let Some(id) = action_id {
730 s.push_str(&format!("ActionID: {}\r\n", id));
731 }
732 }
733 AmiAction::Logoff { action_id } => {
734 s.push_str("Action: Logoff\r\n");
735 if let Some(id) = action_id {
736 s.push_str(&format!("ActionID: {}\r\n", id));
737 }
738 }
739 AmiAction::Ping { action_id } => {
740 s.push_str("Action: Ping\r\n");
741 if let Some(id) = action_id {
742 s.push_str(&format!("ActionID: {}\r\n", id));
743 }
744 }
745 AmiAction::Command { command, action_id } => {
746 s.push_str("Action: Command\r\n");
747 s.push_str(&format!("Command: {}\r\n", command));
748 if let Some(id) = action_id {
749 s.push_str(&format!("ActionID: {}\r\n", id));
750 }
751 }
752 AmiAction::Custom {
753 action: action_name,
754 params,
755 action_id,
756 } => {
757 s.push_str(&format!("Action: {}\r\n", action_name));
758 for (k, v) in params {
759 s.push_str(&format!("{}: {}\r\n", k, v));
760 }
761 if let Some(id) = action_id {
762 s.push_str(&format!("ActionID: {}\r\n", id));
763 }
764 }
765 }
766 s.push_str("\r\n");
767 Ok(s)
768}
769
770fn get_or_set_action_id(action: &mut AmiAction) -> String {
771 match action {
772 AmiAction::Login { action_id, .. }
773 | AmiAction::Logoff { action_id }
774 | AmiAction::Ping { action_id }
775 | AmiAction::Command { action_id, .. }
776 | AmiAction::Custom { action_id, .. } => {
777 if let Some(id) = action_id {
778 id.clone()
779 } else {
780 let new_id = Uuid::new_v4().to_string();
781 *action_id = Some(new_id.clone());
782 new_id
783 }
784 }
785 }
786}
787
788#[cfg(test)]
789mod tests {
790 use super::*;
791 use tokio_stream::StreamExt;
792
793 #[test]
794 fn test_serialize_login_action() {
795 let action = AmiAction::Login {
796 username: "user".to_string(),
797 secret: "pass".to_string(),
798 events: Some("on".to_string()),
799 action_id: Some("abc123".to_string()),
800 };
801 let s = serialize_ami_action(&action).unwrap();
802 assert!(s.contains("Action: Login"));
803 assert!(s.contains("Username: user"));
804 assert!(s.contains("Secret: pass"));
805 assert!(s.contains("Events: on"));
806 assert!(s.contains("ActionID: abc123"));
807 assert!(s.ends_with("\r\n\r\n"));
808 }
809
810 #[test]
811 fn test_serialize_command_action() {
812 let action = AmiAction::Command {
813 command: "sip show peers".to_string(),
814 action_id: None,
815 };
816 let s = serialize_ami_action(&action).unwrap();
817 assert!(s.contains("Action: Command"));
818 assert!(s.contains("Command: sip show peers"));
819 }
820
821 #[test]
822 fn test_parse_ami_protocol_message() {
823 let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
824 let parsed = parse_ami_protocol_message(raw).unwrap();
825 assert_eq!(parsed.len(), 1);
826 let obj = &parsed[0];
827 assert_eq!(obj["Response"], "Success");
828 assert_eq!(obj["ActionID"], "123");
829 assert_eq!(obj["Message"], "Authentication accepted");
830 }
831
832 #[test]
833 fn test_deserialize_ami_response() {
834 let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
835 let parsed = parse_ami_protocol_message(raw).unwrap();
836 let resp: AmiResponse = serde_json::from_value(parsed[0].clone()).unwrap();
837 assert_eq!(resp.response, "Success");
838 assert_eq!(resp.action_id.as_deref(), Some("123"));
839 assert_eq!(resp.message.as_deref(), Some("Authentication accepted"));
840 }
841
842 #[test]
843 fn test_deserialize_newchannel_event() {
844 let raw = "Event: Newchannel\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nChannelState: 4\r\nChannelStateDesc: Ring\r\nCallerIDNum: 100\r\nCallerIDName: Alice\r\n\r\n";
845 let parsed = parse_ami_protocol_message(raw).unwrap();
846 let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
847 match event {
848 AmiEvent::Newchannel(data) => {
849 assert_eq!(data.channel, "SIP/100-00000001");
850 assert_eq!(data.uniqueid, "1234");
851 assert_eq!(data.channel_state.as_deref(), Some("4"));
852 assert_eq!(data.channel_state_desc.as_deref(), Some("Ring"));
853 assert_eq!(data.caller_id_num.as_deref(), Some("100"));
854 assert_eq!(data.caller_id_name.as_deref(), Some("Alice"));
855 }
856 _ => panic!("Expected AmiEvent::Newchannel"),
857 }
858 }
859
860 #[test]
861 fn test_deserialize_hangup_event() {
862 let raw = "Event: Hangup\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nCause: 16\r\nCause-txt: Normal Clearing\r\n\r\n";
863 let parsed = parse_ami_protocol_message(raw).unwrap();
864 let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
865 match event {
866 AmiEvent::Hangup(data) => {
867 assert_eq!(data.channel, "SIP/100-00000001");
868 assert_eq!(data.uniqueid, "1234");
869 assert_eq!(data.cause.as_deref(), Some("16"));
870 assert_eq!(data.cause_txt.as_deref(), Some("Normal Clearing"));
871 }
872 _ => panic!("Expected AmiEvent::Hangup"),
873 }
874 }
875
876 #[test]
877 fn test_deserialize_peerstatus_event() {
878 let raw = "Event: PeerStatus\r\nPeer: SIP/100\r\nPeerStatus: Registered\r\n\r\n";
879 let parsed = parse_ami_protocol_message(raw).unwrap();
880 let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
881 match event {
882 AmiEvent::PeerStatus(data) => {
883 assert_eq!(data.peer, "SIP/100");
884 assert_eq!(data.peer_status, "Registered");
885 }
886 _ => panic!("Expected AmiEvent::PeerStatus"),
887 }
888 }
889
890 #[test]
891 fn test_deserialize_unknown_event() {
892 let raw = "Event: FooBar\r\nSomeField: Value\r\n\r\n";
893 let parsed = parse_ami_protocol_message(raw).unwrap();
894 let event: AmiEvent = serde_json::from_value(parsed[0].clone()).unwrap();
895 match event {
896 AmiEvent::UnknownEvent { event_type, fields } => {
897 assert_eq!(event_type, "FooBar");
898 assert_eq!(fields.get("SomeField").map(|s| s.as_str()), Some("Value"));
899 }
900 _ => panic!("Expected AmiEvent::UnknownEvent"),
901 }
902 }
903
904 #[tokio::test]
905 async fn test_manager_options_clone() {
906 let opts = ManagerOptions {
907 port: 5038,
908 host: "localhost".to_string(),
909 username: "admin".to_string(),
910 password: "pwd".to_string(),
911 events: true,
912 };
913 let opts2 = opts.clone();
914 assert_eq!(opts.port, opts2.port);
915 assert_eq!(opts.host, opts2.host);
916 assert_eq!(opts.username, opts2.username);
917 assert_eq!(opts.password, opts2.password);
918 assert_eq!(opts.events, opts2.events);
919 }
920
921 #[tokio::test]
922 async fn test_manager_new_and_auth_flag() {
923 let manager = Manager::new(); assert!(!manager.is_authenticated().await);
926 }
927
928 #[tokio::test]
929 async fn test_event_internal_connection_lost() {
930 let manager = Manager::new();
932
933 let mut stream = manager.all_events_stream().await;
935
936 {
939 let inner = manager.inner.lock().await;
940 let _ = inner
941 .event_broadcaster
942 .send(AmiEvent::InternalConnectionLost {
943 error: "simulated".to_string(),
944 });
945 }
946
947 let ev = stream.next().await.unwrap().unwrap();
949 match ev {
950 AmiEvent::InternalConnectionLost { error } => {
951 assert_eq!(error, "simulated");
952 }
953 _ => panic!("Expected InternalConnectionLost"),
954 }
955 }
956
957 #[tokio::test]
958 async fn test_manager_options_default() {
959 let opts = ManagerOptions {
960 port: 5038,
961 host: "localhost".to_string(),
962 username: "admin".to_string(),
963 password: "pwd".to_string(),
964 events: true,
965 };
966 assert_eq!(opts.events, true);
967 }
968
969 #[tokio::test]
970 async fn test_manager_new_with_buffer() {
971 let manager = Manager::new_with_buffer(512);
972 assert!(!manager.is_authenticated().await);
973
974 let _stream = manager.all_events_stream().await;
976 }
977
978 #[tokio::test]
979 async fn test_heartbeat_and_watchdog_tokens() {
980 let manager = Manager::new();
981
982 {
984 let inner = manager.inner.lock().await;
985 assert!(inner.heartbeat_token.is_none());
986 assert!(inner.watchdog_token.is_none());
987 }
988
989 let opts = ManagerOptions {
991 port: 5038,
992 host: "127.0.0.1".to_string(),
993 username: "test".to_string(),
994 password: "test".to_string(),
995 events: true,
996 };
997
998 let _ = manager.start_heartbeat().await;
1000 {
1001 let inner = manager.inner.lock().await;
1002 assert!(inner.heartbeat_token.is_some());
1003 }
1004
1005 let _ = manager.start_watchdog(opts).await;
1007 {
1008 let inner = manager.inner.lock().await;
1009 assert!(inner.watchdog_token.is_some());
1010 }
1011
1012 let _ = manager.disconnect().await;
1014 {
1015 let inner = manager.inner.lock().await;
1016 assert!(inner.heartbeat_token.is_none());
1017 assert!(inner.watchdog_token.is_none());
1018 }
1019 }
1020
1021 #[tokio::test]
1022 async fn test_connection_lost_event_emission() {
1023 let manager = Manager::new();
1025 let mut stream = manager.all_events_stream().await;
1026
1027 {
1029 let inner = manager.inner.lock().await;
1030 let _ = inner
1031 .event_broadcaster
1032 .send(AmiEvent::InternalConnectionLost {
1033 error: "test connection lost".to_string(),
1034 });
1035 }
1036
1037 let event = stream.next().await.unwrap().unwrap();
1039 match event {
1040 AmiEvent::InternalConnectionLost { error } => {
1041 assert_eq!(error, "test connection lost");
1042 }
1043 _ => panic!("Expected InternalConnectionLost event"),
1044 }
1045 }
1046
1047 #[tokio::test]
1048 async fn test_heartbeat_interval_respected() {
1049 tokio::time::pause();
1051
1052 let manager = Manager::new();
1053
1054 let _ = manager.start_heartbeat_with_interval(2).await;
1056
1057 tokio::time::advance(Duration::from_secs(1)).await;
1059 {
1060 let inner = manager.inner.lock().await;
1061 assert!(inner.heartbeat_token.is_some());
1063 }
1064
1065 tokio::time::advance(Duration::from_secs(2)).await;
1067
1068 {
1072 let inner = manager.inner.lock().await;
1073 assert!(inner.heartbeat_token.is_some());
1074 }
1075
1076 let _ = manager.disconnect().await;
1078 }
1079
1080 #[tokio::test]
1081 async fn test_watchdog_interval_configuration() {
1082 let manager = Manager::new();
1084
1085 let opts = ManagerOptions {
1086 port: 5038,
1087 host: "127.0.0.1".to_string(),
1088 username: "test".to_string(),
1089 password: "test".to_string(),
1090 events: true,
1091 };
1092
1093 let _ = manager.start_watchdog(opts.clone()).await;
1095 {
1096 let inner = manager.inner.lock().await;
1097 assert!(inner.watchdog_token.is_some());
1098 }
1099
1100 let _ = manager.start_watchdog_with_interval(opts.clone(), 5).await;
1102 {
1103 let inner = manager.inner.lock().await;
1104 assert!(inner.watchdog_token.is_some());
1105 }
1106
1107 let _ = manager.disconnect().await;
1109 }
1110}