Skip to main content

asterisk_rs_ami/
response.rs

1//! AMI response types and ActionID correlation
2
3use crate::codec::RawAmiMessage;
4use std::collections::HashMap;
5
/// A decoded reply to an AMI action.
///
/// Instances are normally produced by [`AmiResponse::from_raw`]; every field
/// is public so callers can inspect the reply directly.
#[derive(Debug, Clone, PartialEq)]
pub struct AmiResponse {
    /// value of the `ActionID` header used to correlate the reply with its
    /// action; `from_raw` stores an empty string when the header is missing
    pub action_id: String,
    /// `true` when the action is considered successful
    /// (`from_raw` sets this for `Success` and `Follows`, ignoring ASCII case)
    pub success: bool,
    /// raw value of the `Response` header ("Success", "Error", "Follows")
    pub response_type: String,
    /// value of the `Message` header, when the reply carried one
    pub message: Option<String>,
    /// every header of the reply, keyed by header name
    pub headers: HashMap<String, String>,
    /// command output lines (populated for `Response: Follows`)
    pub output: Vec<String>,
    /// channel variables taken from `ChanVariable(name)` headers
    pub channel_variables: HashMap<String, String>,
}
24
25impl AmiResponse {
26    /// parse a response from a raw AMI message
27    ///
28    /// returns `None` for non-response messages (e.g., events)
29    pub fn from_raw(raw: &RawAmiMessage) -> Option<Self> {
30        // messages with both Event: and Response: headers are events
31        // (e.g. OriginateResponse carries Response: Success/Failure
32        // but is an event, not an action response)
33        if raw.get("Event").is_some() {
34            return None;
35        }
36        let response_type = raw.get("Response")?.to_string();
37        // action ID may be absent for unsolicited responses
38        let action_id = raw.get("ActionID").unwrap_or("").to_string();
39        let success = response_type.eq_ignore_ascii_case("success")
40            || response_type.eq_ignore_ascii_case("follows");
41        let message = raw.get("Message").map(String::from);
42        let headers = raw.to_map();
43
44        Some(Self {
45            action_id,
46            success,
47            response_type,
48            message,
49            headers,
50            output: raw.output.clone(),
51            channel_variables: raw.channel_variables.clone(),
52        })
53    }
54
55    /// get a header value from the response
56    pub fn get(&self, key: &str) -> Option<&str> {
57        self.headers
58            .iter()
59            .find(|(k, _)| k.eq_ignore_ascii_case(key))
60            .map(|(_, v)| v.as_str())
61    }
62
63    /// get a channel variable by name
64    pub fn get_variable(&self, name: &str) -> Option<&str> {
65        self.channel_variables.get(name).map(|s| s.as_str())
66    }
67}
68
69/// response from an event-generating action (e.g., Status, QueueStatus)
70///
71/// contains the initial response plus all events received until the
72/// completion marker event
73#[derive(Debug, Clone)]
74pub struct EventListResponse {
75    /// the initial response to the action
76    pub response: AmiResponse,
77    /// events received as part of this action's result
78    pub events: Vec<crate::event::AmiEvent>,
79}
80
81/// tracks a pending event-generating action
82struct PendingEventList {
83    response: Option<AmiResponse>,
84    events: Vec<crate::event::AmiEvent>,
85    tx: tokio::sync::oneshot::Sender<EventListResponse>,
86}
87
88/// pending action tracker — correlates ActionIDs with response channels
89pub struct PendingActions {
90    pending: HashMap<String, tokio::sync::oneshot::Sender<AmiResponse>>,
91    pending_event_lists: HashMap<String, PendingEventList>,
92}
93
94impl PendingActions {
95    pub fn new() -> Self {
96        Self {
97            pending: HashMap::new(),
98            pending_event_lists: HashMap::new(),
99        }
100    }
101
102    /// register a pending action, returns a receiver for the response
103    pub fn register(&mut self, action_id: String) -> tokio::sync::oneshot::Receiver<AmiResponse> {
104        let (tx, rx) = tokio::sync::oneshot::channel();
105        self.pending.insert(action_id, tx);
106        rx
107    }
108
109    /// deliver a response to the waiting caller
110    ///
111    /// returns true if the response was delivered, false if no one was waiting
112    pub fn deliver(&mut self, response: AmiResponse) -> bool {
113        if let Some(tx) = self.pending.remove(&response.action_id) {
114            // send can fail if the receiver was dropped, which is fine
115            tx.send(response).is_ok()
116        } else {
117            false
118        }
119    }
120
121    /// number of actions waiting for responses
122    pub fn pending_count(&self) -> usize {
123        self.pending.len() + self.pending_event_lists.len()
124    }
125
126    /// cancel all pending actions (e.g., on disconnect)
127    ///
128    /// drops all senders, causing receivers to get `RecvError::Closed`
129    pub fn cancel_all(&mut self) {
130        self.pending.clear();
131        self.pending_event_lists.clear();
132    }
133
134    /// register with a pre-existing sender (used by connection manager)
135    pub fn register_with_sender(
136        &mut self,
137        action_id: String,
138        tx: tokio::sync::oneshot::Sender<AmiResponse>,
139    ) {
140        self.pending.insert(action_id, tx);
141    }
142
143    /// register a pending event-generating action
144    pub fn register_event_list(
145        &mut self,
146        action_id: String,
147        tx: tokio::sync::oneshot::Sender<EventListResponse>,
148    ) {
149        self.pending_event_lists.insert(
150            action_id,
151            PendingEventList {
152                response: None,
153                events: Vec::new(),
154                tx,
155            },
156        );
157    }
158
159    /// deliver the initial response for an event-generating action
160    ///
161    /// returns true if this action_id has a pending event list
162    pub fn deliver_event_list_response(&mut self, response: AmiResponse) -> bool {
163        if let Some(pending) = self.pending_event_lists.get_mut(&response.action_id) {
164            pending.response = Some(response);
165            true
166        } else {
167            false
168        }
169    }
170
171    /// deliver an event for an event-generating action
172    ///
173    /// returns true if this event was consumed by a pending event list
174    pub fn deliver_event_list_event(
175        &mut self,
176        action_id: &str,
177        event: crate::event::AmiEvent,
178    ) -> bool {
179        let is_complete = event.event_name().ends_with("Complete");
180
181        if let Some(mut pending) = if is_complete {
182            self.pending_event_lists.remove(action_id)
183        } else {
184            None
185        } {
186            pending.events.push(event);
187            let response = pending.response.unwrap_or_else(|| AmiResponse {
188                action_id: action_id.to_string(),
189                success: true,
190                response_type: String::new(),
191                message: None,
192                headers: HashMap::new(),
193                output: vec![],
194                channel_variables: HashMap::new(),
195            });
196            let _ = pending.tx.send(EventListResponse {
197                response,
198                events: pending.events,
199            });
200            return true;
201        }
202
203        if let Some(pending) = self.pending_event_lists.get_mut(action_id) {
204            pending.events.push(event);
205            return true;
206        }
207
208        false
209    }
210}
211
212impl Default for PendingActions {
213    fn default() -> Self {
214        Self::new()
215    }
216}