1use serde::de::Deserializer;
56use serde::{Deserialize, Serialize};
57use std::collections::HashMap;
58use std::sync::Arc;
59use thiserror::Error;
60use tokio::io::{AsyncReadExt, AsyncWriteExt};
61use tokio::net::TcpStream;
62use tokio::sync::{broadcast, oneshot, Mutex};
63use tokio::time::{timeout, Duration};
64use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
65use tokio_stream::wrappers::BroadcastStream;
66use tokio_stream::Stream;
67use uuid::Uuid;
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct AmiResponse {
71 #[serde(rename = "Response")]
72 pub response: String,
73 #[serde(rename = "ActionID")]
74 pub action_id: Option<String>,
75 #[serde(rename = "Message")]
76 pub message: Option<String>,
77 #[serde(flatten)]
78 pub fields: HashMap<String, String>,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(tag = "Action", rename_all = "PascalCase")]
83pub enum AmiAction {
84 Login {
85 username: String,
86 secret: String,
87 #[serde(rename = "Events")]
88 events: Option<String>,
89 #[serde(rename = "ActionID")]
90 action_id: Option<String>,
91 },
92 Logoff {
93 #[serde(rename = "ActionID")]
94 action_id: Option<String>,
95 },
96 Ping {
97 #[serde(rename = "ActionID")]
98 action_id: Option<String>,
99 },
100 Command {
101 command: String,
102 #[serde(rename = "ActionID")]
103 action_id: Option<String>,
104 },
105 Custom {
106 action: String,
107 #[serde(flatten)]
108 params: HashMap<String, String>,
109 #[serde(rename = "ActionID")]
110 action_id: Option<String>,
111 },
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct NewchannelEventData {
116 #[serde(rename = "Channel")]
117 pub channel: String,
118 #[serde(rename = "Uniqueid")]
119 pub uniqueid: String,
120 #[serde(rename = "ChannelState")]
121 pub channel_state: Option<String>,
122 #[serde(rename = "ChannelStateDesc")]
123 pub channel_state_desc: Option<String>,
124 #[serde(rename = "CallerIDNum")]
125 pub caller_id_num: Option<String>,
126 #[serde(rename = "CallerIDName")]
127 pub caller_id_name: Option<String>,
128 #[serde(flatten)]
129 pub other: HashMap<String, String>,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct HangupEventData {
134 #[serde(rename = "Channel")]
135 pub channel: String,
136 #[serde(rename = "Uniqueid")]
137 pub uniqueid: String,
138 #[serde(rename = "Cause")]
139 pub cause: Option<String>,
140 #[serde(rename = "Cause-txt")]
141 pub cause_txt: Option<String>,
142 #[serde(flatten)]
143 pub other: HashMap<String, String>,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct PeerStatusEventData {
148 #[serde(rename = "Peer")]
149 pub peer: String,
150 #[serde(rename = "PeerStatus")]
151 pub peer_status: String,
152 #[serde(flatten)]
153 pub other: HashMap<String, String>,
154}
155
156#[derive(Debug, Clone, Serialize)]
157pub enum AmiEvent {
158 Newchannel(NewchannelEventData),
159 Hangup(HangupEventData),
160 PeerStatus(PeerStatusEventData),
161 UnknownEvent {
162 event_type: String,
163 fields: HashMap<String, String>,
164 },
165 InternalConnectionLost {
166 error: String,
167 },
168}
169
170impl<'de> Deserialize<'de> for AmiEvent {
171 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
172 where
173 D: Deserializer<'de>,
174 {
175 let value = serde_json::Value::deserialize(deserializer)?;
176 let map_obj = value
177 .as_object()
178 .ok_or_else(|| serde::de::Error::custom("AmiEvent: Expected a JSON object/map"))?;
179
180 if let Some(event_type_val) = map_obj.get("Event") {
181 let event_type_str = event_type_val.as_str().ok_or_else(|| {
182 serde::de::Error::custom("AmiEvent: 'Event' field is not a string")
183 })?;
184
185 match event_type_str {
186 "Newchannel" => Ok(AmiEvent::Newchannel(
187 NewchannelEventData::deserialize(value.clone())
188 .map_err(serde::de::Error::custom)?,
189 )),
190 "Hangup" => Ok(AmiEvent::Hangup(
191 HangupEventData::deserialize(value.clone())
192 .map_err(serde::de::Error::custom)?,
193 )),
194 "PeerStatus" => Ok(AmiEvent::PeerStatus(
195 PeerStatusEventData::deserialize(value.clone())
196 .map_err(serde::de::Error::custom)?,
197 )),
198 _ => {
199 let fields: HashMap<String, String> = map_obj
200 .iter()
201 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
202 .collect();
203 Ok(AmiEvent::UnknownEvent {
204 event_type: event_type_str.to_string(),
205 fields,
206 })
207 }
208 }
209 } else {
210 let fields: HashMap<String, String> = map_obj
211 .iter()
212 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
213 .collect();
214 Ok(AmiEvent::UnknownEvent {
215 event_type: "UnknownOrMalformed".to_string(),
216 fields,
217 })
218 }
219 }
220}
221
222#[derive(Debug, Error)]
223pub enum AmiError {
224 #[error("IO error: {0}")]
225 Io(#[from] std::io::Error),
226 #[error("Parse error: {0}")]
227 ParseError(String),
228 #[error("Serialize error: {0}")]
229 SerializeError(String),
230 #[error("Authentication failed: {0}")]
231 AuthenticationFailed(String),
232 #[error("Action failed: {response:?}")]
233 ActionFailed { response: AmiResponse },
234 #[error("Connection closed")]
235 ConnectionClosed,
236 #[error("Operation timed out")]
237 Timeout,
238 #[error("Login required")]
239 LoginRequired,
240 #[error("Internal channel error: {0}")]
241 ChannelError(String),
242 #[error("Event stream lagged: {0}")]
243 EventStreamLagged(#[from] tokio::sync::broadcast::error::RecvError),
244 #[error("Not connected to AMI server")]
245 NotConnected,
246 #[error("Other error: {0}")]
247 Other(String),
248}
249
250#[derive(Serialize, Deserialize, Debug, Clone)]
251pub struct ManagerOptions {
252 pub port: u16,
253 pub host: String,
254 pub username: String,
255 pub password: String,
256 pub events: bool,
257}
258
259struct InnerManager {
260 options: ManagerOptions,
261 connection: Option<TcpStream>,
262 authenticated: bool,
263 event_broadcaster: broadcast::Sender<AmiEvent>,
264 pending_responses: HashMap<String, oneshot::Sender<Result<AmiResponse, AmiError>>>,
265}
266
267#[derive(Clone)]
268pub struct Manager {
269 inner: Arc<Mutex<InnerManager>>,
270}
271
272impl Manager {
273 pub fn new(options: ManagerOptions) -> Self {
274 let (event_tx, _) = broadcast::channel(1024);
275 Self {
276 inner: Arc::new(Mutex::new(InnerManager {
277 options,
278 connection: None,
279 authenticated: false,
280 event_broadcaster: event_tx,
281 pending_responses: HashMap::new(),
282 })),
283 }
284 }
285
286 pub async fn connect_and_login(&self) -> Result<(), AmiError> {
287 {
288 let mut inner = self.inner.lock().await;
289 inner.connect().await?;
290 inner.authenticate().await?;
291 }
292 let this = self.clone();
293 tokio::spawn(async move {
294 let _ = this.read_loop().await;
295 });
296 Ok(())
297 }
298
299 pub async fn send_action(&self, mut action: AmiAction) -> Result<AmiResponse, AmiError> {
300 let action_id = get_or_set_action_id(&mut action);
301
302 let (tx, rx) = oneshot::channel();
303 {
304 let mut inner = self.inner.lock().await;
305 if !inner.authenticated && !matches!(action, AmiAction::Login { .. }) {
306 return Err(AmiError::LoginRequired);
307 }
308 if inner.connection.is_none() {
309 return Err(AmiError::NotConnected);
310 }
311
312 inner.pending_responses.insert(action_id.clone(), tx);
313 let action_str = serialize_ami_action(&action)?;
314 let conn = inner.connection.as_mut().ok_or(AmiError::NotConnected)?;
315
316 conn.write_all(action_str.as_bytes())
317 .await
318 .map_err(AmiError::Io)?;
319 conn.flush().await.map_err(AmiError::Io)?;
320 }
321 match timeout(Duration::from_secs(10), rx).await {
322 Ok(Ok(Ok(resp))) => {
323 if resp.response.eq_ignore_ascii_case("Error") {
324 Err(AmiError::ActionFailed { response: resp })
325 } else {
326 Ok(resp)
327 }
328 }
329 Ok(Ok(Err(e))) => Err(e),
330 Ok(Err(_)) => Err(AmiError::ChannelError("Responder dropped".to_string())),
331 Err(_) => Err(AmiError::Timeout),
332 }
333 }
334
335 async fn read_loop(&self) -> Result<(), AmiError> {
336 loop {
337 let processing_result: Result<(), AmiError> = async {
338 loop {
339 let raw_data: String;
340 {
341 let mut inner = self.inner.lock().await;
342 raw_data = inner.read_ami_message_raw().await?;
343 }
344 let parsed_messages = parse_ami_protocol_message(&raw_data)?;
345 {
346 let mut inner = self.inner.lock().await;
347 for value_msg in parsed_messages {
348 if value_msg.get("Event").is_some() {
349 match serde_json::from_value::<AmiEvent>(value_msg.clone())
350 .map_err(|e| AmiError::ParseError(format!("AmiEvent: {}", e)))
351 {
352 Ok(event) => {
353 let _ = inner.event_broadcaster.send(event);
354 }
355 Err(_) => {
356 let mut fallback = HashMap::new();
357 if let Some(obj) = value_msg.as_object() {
358 for (k, v) in obj {
359 if let Some(s) = v.as_str() {
360 fallback.insert(k.clone(), s.to_string());
361 }
362 }
363 }
364 let _ =
365 inner.event_broadcaster.send(AmiEvent::UnknownEvent {
366 event_type: "ParseError".to_string(),
367 fields: fallback,
368 });
369 }
370 }
371 } else if value_msg.get("Response").is_some() {
372 match serde_json::from_value::<AmiResponse>(value_msg.clone())
373 .map_err(|e| {
374 AmiError::ParseError(format!("AmiResponse: {}", e))
375 }) {
376 Ok(resp) => {
377 if let Some(action_id) = &resp.action_id {
378 if let Some(responder) =
379 inner.pending_responses.remove(action_id)
380 {
381 let _ = responder.send(Ok(resp));
382 }
383 }
384 }
385 Err(parse_err) => {
386 if let Some(action_id) =
387 value_msg.get("ActionID").and_then(|v| v.as_str())
388 {
389 if let Some(responder) =
390 inner.pending_responses.remove(action_id)
391 {
392 let _ = responder.send(Err(parse_err));
393 }
394 }
395 }
396 }
397 }
398 }
399 }
400 }
401 }
402 .await;
403
404 match processing_result {
405 Ok(()) => return Ok(()),
406 Err(err) => {
407 {
408 let mut inner = self.inner.lock().await;
409 inner.authenticated = false;
410 inner.connection = None;
411 for (_, responder) in inner.pending_responses.drain() {
412 let _ = responder.send(Err(AmiError::ConnectionClosed));
413 }
414 let _ = inner
415 .event_broadcaster
416 .send(AmiEvent::InternalConnectionLost {
417 error: format!("{}", err),
418 });
419 }
420 return Err(err);
421 }
422 }
423 }
424 }
425
426 pub async fn disconnect(&self) -> Result<(), AmiError> {
427 let mut inner = self.inner.lock().await;
428 if let Some(mut connection) = inner.connection.take() {
429 let logoff_action = AmiAction::Logoff {
430 action_id: Some("rust-ami-logoff".to_string()),
431 };
432 let action_str = serialize_ami_action(&logoff_action)?;
433 let _ = connection.write_all(action_str.as_bytes()).await;
434 let _ = connection.shutdown().await;
435 }
436 inner.authenticated = false;
437 Ok(())
438 }
439
440 pub async fn is_authenticated(&self) -> bool {
441 let inner = self.inner.lock().await;
442 inner.authenticated
443 }
444
445 pub async fn all_events_stream(
446 &self,
447 ) -> impl Stream<Item = Result<AmiEvent, BroadcastStreamRecvError>> + Send + Unpin {
448 let inner = self.inner.lock().await;
449 BroadcastStream::new(inner.event_broadcaster.subscribe())
450 }
451}
452
453impl InnerManager {
454 async fn connect(&mut self) -> Result<(), AmiError> {
455 let stream = timeout(
456 Duration::from_secs(10),
457 TcpStream::connect((self.options.host.as_str(), self.options.port)),
458 )
459 .await
460 .map_err(|_| AmiError::Timeout)?
461 .map_err(AmiError::Io)?;
462 self.connection = Some(stream);
463 let mut temp_buf = [0; 1024];
464 if let Some(conn) = self.connection.as_mut() {
465 let _ = conn.read(&mut temp_buf).await;
466 }
467 Ok(())
468 }
469
470 async fn authenticate(&mut self) -> Result<(), AmiError> {
471 let login_action = AmiAction::Login {
472 username: self.options.username.clone(),
473 secret: self.options.password.clone(),
474 events: Some(if self.options.events { "on" } else { "off" }.to_string()),
475 action_id: Some("rust-ami-login".to_string()),
476 };
477 let action_str = serialize_ami_action(&login_action)?;
478 let conn = self.connection.as_mut().ok_or(AmiError::NotConnected)?;
479 conn.write_all(action_str.as_bytes())
480 .await
481 .map_err(AmiError::Io)?;
482 let response_data = self.read_ami_message_raw().await?;
483 let parsed = parse_ami_protocol_message(&response_data)?;
484 for value_msg in parsed {
485 if let Ok(resp) = serde_json::from_value::<AmiResponse>(value_msg) {
486 if resp.response.eq_ignore_ascii_case("Success") {
487 self.authenticated = true;
488 return Ok(());
489 } else if resp.response.eq_ignore_ascii_case("Error") {
490 return Err(AmiError::AuthenticationFailed(
491 resp.message.unwrap_or_default(),
492 ));
493 }
494 }
495 }
496 Err(AmiError::AuthenticationFailed(
497 "No valid success response received for login".to_string(),
498 ))
499 }
500
501 async fn read_ami_message_raw(&mut self) -> Result<String, AmiError> {
502 let mut buffer = vec![0; 8192];
503 let mut complete_data = String::new();
504
505 let (_local_addr_str, _peer_addr_str) = {
506 let conn_ref = self.connection.as_ref().ok_or(AmiError::NotConnected)?;
507 let local_addr = conn_ref.local_addr().map_err(AmiError::Io)?;
508 let peer_addr = conn_ref.peer_addr().map_err(AmiError::Io)?;
509 (local_addr.to_string(), peer_addr.to_string())
510 };
511
512 let connection = self.connection.as_mut().ok_or(AmiError::NotConnected)?;
513 loop {
514 let n = connection
515 .read(&mut buffer)
516 .await
517 .map_err(|e| AmiError::Io(e))?;
518 if n == 0 {
519 return Err(AmiError::ConnectionClosed);
520 }
521 let data_chunk_str = String::from_utf8_lossy(&buffer[..n]);
522 complete_data.push_str(&data_chunk_str);
523 if complete_data.ends_with("\r\n\r\n") {
524 break;
525 }
526 }
527 Ok(complete_data)
528 }
529}
530
531fn parse_ami_protocol_message(raw_data: &str) -> Result<Vec<serde_json::Value>, AmiError> {
532 let mut messages = Vec::new();
533 for block in raw_data.trim().split("\r\n\r\n") {
534 if block.is_empty() {
535 continue;
536 }
537 let mut map = serde_json::Map::new();
538 for line in block.lines() {
539 if let Some((key, value)) = line.split_once(": ") {
540 map.insert(
541 key.trim().to_string(),
542 serde_json::Value::String(value.trim().to_string()),
543 );
544 }
545 }
546 if !map.is_empty() {
547 messages.push(serde_json::Value::Object(map));
548 }
549 }
550 Ok(messages)
551}
552
553fn serialize_ami_action(action: &AmiAction) -> Result<String, AmiError> {
554 let mut s = String::new();
555 match action {
556 AmiAction::Login {
557 username,
558 secret,
559 events,
560 action_id,
561 } => {
562 s.push_str("Action: Login\r\n");
563 s.push_str(&format!("Username: {}\r\n", username));
564 s.push_str(&format!("Secret: {}\r\n", secret));
565 if let Some(ev) = events {
566 s.push_str(&format!("Events: {}\r\n", ev));
567 }
568 if let Some(id) = action_id {
569 s.push_str(&format!("ActionID: {}\r\n", id));
570 }
571 }
572 AmiAction::Logoff { action_id } => {
573 s.push_str("Action: Logoff\r\n");
574 if let Some(id) = action_id {
575 s.push_str(&format!("ActionID: {}\r\n", id));
576 }
577 }
578 AmiAction::Ping { action_id } => {
579 s.push_str("Action: Ping\r\n");
580 if let Some(id) = action_id {
581 s.push_str(&format!("ActionID: {}\r\n", id));
582 }
583 }
584 AmiAction::Command { command, action_id } => {
585 s.push_str("Action: Command\r\n");
586 s.push_str(&format!("Command: {}\r\n", command));
587 if let Some(id) = action_id {
588 s.push_str(&format!("ActionID: {}\r\n", id));
589 }
590 }
591 AmiAction::Custom {
592 action: action_name,
593 params,
594 action_id,
595 } => {
596 s.push_str(&format!("Action: {}\r\n", action_name));
597 for (k, v) in params {
598 s.push_str(&format!("{}: {}\r\n", k, v));
599 }
600 if let Some(id) = action_id {
601 s.push_str(&format!("ActionID: {}\r\n", id));
602 }
603 }
604 }
605 s.push_str("\r\n");
606 Ok(s)
607}
608
609fn get_or_set_action_id(action: &mut AmiAction) -> String {
610 match action {
611 AmiAction::Login { action_id, .. }
612 | AmiAction::Logoff { action_id }
613 | AmiAction::Ping { action_id }
614 | AmiAction::Command { action_id, .. }
615 | AmiAction::Custom { action_id, .. } => {
616 if let Some(id) = action_id {
617 id.clone()
618 } else {
619 let new_id = Uuid::new_v4().to_string();
620 *action_id = Some(new_id.clone());
621 new_id
622 }
623 }
624 }
625}
626
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// Options pointing at a local server that is never contacted.
    fn sample_options(events: bool) -> ManagerOptions {
        ManagerOptions {
            port: 5038,
            host: "localhost".to_string(),
            username: "admin".to_string(),
            password: "pwd".to_string(),
            events,
        }
    }

    /// Parses `raw` and decodes its first message as an event.
    fn first_event(raw: &str) -> AmiEvent {
        let parsed = parse_ami_protocol_message(raw).unwrap();
        serde_json::from_value(parsed[0].clone()).unwrap()
    }

    #[test]
    fn test_serialize_login_action() {
        let action = AmiAction::Login {
            username: "user".to_string(),
            secret: "pass".to_string(),
            events: Some("on".to_string()),
            action_id: Some("abc123".to_string()),
        };
        let s = serialize_ami_action(&action).unwrap();
        assert!(s.contains("Action: Login"));
        assert!(s.contains("Username: user"));
        assert!(s.contains("Secret: pass"));
        assert!(s.contains("Events: on"));
        assert!(s.contains("ActionID: abc123"));
        assert!(s.ends_with("\r\n\r\n"));
    }

    #[test]
    fn test_serialize_command_action() {
        let action = AmiAction::Command {
            command: "sip show peers".to_string(),
            action_id: None,
        };
        let s = serialize_ami_action(&action).unwrap();
        assert!(s.contains("Action: Command"));
        assert!(s.contains("Command: sip show peers"));
    }

    #[test]
    fn test_parse_ami_protocol_message() {
        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
        let parsed = parse_ami_protocol_message(raw).unwrap();
        assert_eq!(parsed.len(), 1);
        let obj = &parsed[0];
        assert_eq!(obj["Response"], "Success");
        assert_eq!(obj["ActionID"], "123");
        assert_eq!(obj["Message"], "Authentication accepted");
    }

    #[test]
    fn test_deserialize_ami_response() {
        let raw = "Response: Success\r\nActionID: 123\r\nMessage: Authentication accepted\r\n\r\n";
        let parsed = parse_ami_protocol_message(raw).unwrap();
        let resp: AmiResponse = serde_json::from_value(parsed[0].clone()).unwrap();
        assert_eq!(resp.response, "Success");
        assert_eq!(resp.action_id.as_deref(), Some("123"));
        assert_eq!(resp.message.as_deref(), Some("Authentication accepted"));
    }

    #[test]
    fn test_deserialize_newchannel_event() {
        let raw = "Event: Newchannel\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nChannelState: 4\r\nChannelStateDesc: Ring\r\nCallerIDNum: 100\r\nCallerIDName: Alice\r\n\r\n";
        match first_event(raw) {
            AmiEvent::Newchannel(data) => {
                assert_eq!(data.channel, "SIP/100-00000001");
                assert_eq!(data.uniqueid, "1234");
                assert_eq!(data.channel_state.as_deref(), Some("4"));
                assert_eq!(data.channel_state_desc.as_deref(), Some("Ring"));
                assert_eq!(data.caller_id_num.as_deref(), Some("100"));
                assert_eq!(data.caller_id_name.as_deref(), Some("Alice"));
            }
            _ => panic!("Expected AmiEvent::Newchannel"),
        }
    }

    #[test]
    fn test_deserialize_hangup_event() {
        let raw = "Event: Hangup\r\nChannel: SIP/100-00000001\r\nUniqueid: 1234\r\nCause: 16\r\nCause-txt: Normal Clearing\r\n\r\n";
        match first_event(raw) {
            AmiEvent::Hangup(data) => {
                assert_eq!(data.channel, "SIP/100-00000001");
                assert_eq!(data.uniqueid, "1234");
                assert_eq!(data.cause.as_deref(), Some("16"));
                assert_eq!(data.cause_txt.as_deref(), Some("Normal Clearing"));
            }
            _ => panic!("Expected AmiEvent::Hangup"),
        }
    }

    #[test]
    fn test_deserialize_peerstatus_event() {
        let raw = "Event: PeerStatus\r\nPeer: SIP/100\r\nPeerStatus: Registered\r\n\r\n";
        match first_event(raw) {
            AmiEvent::PeerStatus(data) => {
                assert_eq!(data.peer, "SIP/100");
                assert_eq!(data.peer_status, "Registered");
            }
            _ => panic!("Expected AmiEvent::PeerStatus"),
        }
    }

    #[test]
    fn test_deserialize_unknown_event() {
        let raw = "Event: FooBar\r\nSomeField: Value\r\n\r\n";
        match first_event(raw) {
            AmiEvent::UnknownEvent { event_type, fields } => {
                assert_eq!(event_type, "FooBar");
                assert_eq!(fields.get("SomeField").map(|s| s.as_str()), Some("Value"));
            }
            _ => panic!("Expected AmiEvent::UnknownEvent"),
        }
    }

    // Nothing is awaited here, so no async runtime is needed.
    #[test]
    fn test_manager_options_clone() {
        let opts = sample_options(true);
        let opts2 = opts.clone();
        assert_eq!(opts.port, opts2.port);
        assert_eq!(opts.host, opts2.host);
        assert_eq!(opts.username, opts2.username);
        assert_eq!(opts.password, opts2.password);
        assert_eq!(opts.events, opts2.events);
    }

    #[tokio::test]
    async fn test_manager_new_and_auth_flag() {
        let manager = Manager::new(sample_options(false));
        assert!(!manager.is_authenticated().await);
    }

    #[tokio::test]
    async fn test_event_internal_connection_lost() {
        let manager = Manager::new(sample_options(true));
        let mut stream = manager.all_events_stream().await;
        {
            let inner = manager.inner.lock().await;
            let _ = inner
                .event_broadcaster
                .send(AmiEvent::InternalConnectionLost {
                    error: "simulated".to_string(),
                });
        }
        let ev = stream.next().await.unwrap().unwrap();
        match ev {
            AmiEvent::InternalConnectionLost { error } => {
                assert_eq!(error, "simulated");
            }
            _ => panic!("Expected InternalConnectionLost"),
        }
    }

    // Nothing is awaited here, so no async runtime is needed.
    #[test]
    fn test_manager_options_default() {
        let opts = sample_options(true);
        assert!(opts.events);
    }
}