1use crate::{apis, ws};
2use futures_util::StreamExt;
3use std::collections::HashMap;
4use std::future::Future;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::sync::{Arc, RwLock};
8use tracing::{debug, error};
9
10type Handler = Arc<
11 dyn Fn(
12 Arc<apis::client::Client>,
13 ws::models::Event,
14 ) -> Pin<Box<dyn Future<Output = crate::errors::Result<()>> + Send>>
15 + Send
16 + Sync,
17>;
18
19#[derive(Clone)]
22pub struct AriClient {
23 client: Arc<apis::client::Client>,
24 ws: Arc<tokio::sync::Mutex<ws::client::Client>>,
25 event_handlers: Arc<RwLock<HashMap<String, Handler>>>,
26}
27
28impl AriClient {
29 pub fn with_config(config: crate::config::Config) -> Self {
31 AriClient {
32 client: Arc::new(apis::client::Client::with_config(config.clone())),
33 ws: Arc::new(tokio::sync::Mutex::new(ws::client::Client::with_config(
34 config,
35 ))),
36 event_handlers: Arc::new(RwLock::new(HashMap::new())),
37 }
38 }
39
40 pub fn on_unknown_event<F, Fut>(&mut self, handler: F) -> &mut Self
42 where
43 F: Fn(Arc<apis::client::Client>, ws::models::Event) -> Fut + Send + Sync + 'static,
44 Fut: Future<Output = crate::errors::Result<()>> + Send + 'static,
45 {
46 self.event_handlers.write().unwrap().insert(
47 "Unknown".to_string(),
48 Arc::new(move |client, event| Box::pin(handler(client, event))),
49 );
50 self
51 }
52
53 pub fn on_event<F, Fut>(&mut self, key: impl Into<String>, handler: F) -> &mut Self
55 where
56 F: Fn(Arc<apis::client::Client>, ws::models::Event) -> Fut + Send + Sync + 'static,
57 Fut: Future<Output = crate::errors::Result<()>> + Send + 'static,
58 {
59 self.event_handlers.write().unwrap().insert(
60 key.into(),
61 Arc::new(move |client, event| Box::pin(handler(client, event))),
62 );
63 self
64 }
65
66 pub async fn start(
68 &mut self,
69 application_name: impl Into<String>,
70 ) -> crate::errors::Result<()> {
71 let mut stream = self
72 .ws
73 .lock()
74 .await
75 .connect(ws::params::ListenRequest::new(application_name))
76 .await?;
77
78 let event_handlers = self.event_handlers.clone();
79 let client = self.client.clone();
80 tokio::task::spawn(async move {
81 while let Some(event) = stream.next().await {
82 let maybe_handler = {
84 let handlers = event_handlers.read().unwrap();
85 handlers.get(&event.to_string()).cloned()
86 };
87
88 if let Some(handler) = maybe_handler {
89 match handler(client.clone(), event).await {
90 Ok(_) => {}
91 Err(e) => {
92 error!("Error handling event: {:?}", e);
93 }
94 }
95 } else {
96 debug!(
97 "No handler registered for event type: {}",
98 event.to_string()
99 );
100 }
101 }
102 });
103
104 Ok(())
105 }
106
107 pub async fn stop(&mut self) -> Result<(), crate::errors::AriError> {
109 self.ws.lock().await.disconnect().await
110 }
111
112 pub fn client(&self) -> &apis::client::Client {
114 &self.client
115 }
116}
117
118impl Deref for AriClient {
119 type Target = apis::client::Client;
120 fn deref(&self) -> &Self::Target {
121 &self.client
122 }
123}
124
// Generates one typed registration method on `AriClient` per
// `method_name => EventVariant` pair.
//
// Each generated method registers the handler through `AriClient::on_event`
// under the variant's name (via `stringify!`), and unwraps the matching
// `ws::models::Event` variant so the user callback receives the typed
// `BaseEvent` payload instead of the whole enum.
macro_rules! create_event_handler {
    ($($event_name:ident => $event_variant:ident),*) => {
        impl AriClient {
            $(
                /// Registers a typed handler for this event variant and
                /// returns `self` so registrations can be chained.
                pub fn $event_name<F, Fut>(&mut self, handler: F) -> &mut Self
                where
                    F: Fn(
                            Arc<apis::client::Client>,
                            ws::models::BaseEvent<ws::models::$event_variant>,
                        ) -> Fut
                        + Send
                        + Sync
                        + 'static,
                    Fut: Future<Output = crate::errors::Result<()>> + Send + 'static,
                {
                    // Shared so every invocation of the registered closure can
                    // hand an owned handle to the `async move` block it returns.
                    let shared = Arc::new(handler);
                    self.on_event(stringify!($event_variant), move |client, event| {
                        let callback = Arc::clone(&shared);
                        async move {
                            match event {
                                ws::models::Event::$event_variant(payload) => {
                                    callback(client, payload).await
                                }
                                // The handler is registered under this
                                // variant's own name, so no other variant is
                                // expected to be dispatched to it.
                                _ => unreachable!(),
                            }
                        }
                    })
                }
            )*
        }
    };
}
156
157create_event_handler!(
158 on_application_move_failed => ApplicationMoveFailed,
159 on_application_replaced => ApplicationReplaced,
160 on_bridge_attended_transfer => BridgeAttendedTransfer,
161 on_bridge_blind_transfer => BridgeBlindTransfer,
162 on_bridge_created => BridgeCreated,
163 on_bridge_destroyed => BridgeDestroyed,
164 on_bridge_merged => BridgeMerged,
165 on_bridge_video_source_changed => BridgeVideoSourceChanged,
166 on_channel_caller_id => ChannelCallerId,
167 on_channel_connected_line => ChannelConnectedLine,
168 on_channel_created => ChannelCreated,
169 on_channel_destroyed => ChannelDestroyed,
170 on_channel_dialplan => ChannelDialplan,
171 on_channel_dtmf_received => ChannelDtmfReceived,
172 on_channel_entered_bridge => ChannelEnteredBridge,
173 on_channel_hangup_request => ChannelHangupRequest,
174 on_channel_hold => ChannelHold,
175 on_channel_left_bridge => ChannelLeftBridge,
176 on_channel_state_change => ChannelStateChange,
177 on_channel_talking_finished => ChannelTalkingFinished,
178 on_channel_talking_started => ChannelTalkingStarted,
179 on_channel_tone_detected => ChannelToneDetected,
180 on_channel_unhold => ChannelUnhold,
181 on_channel_user_event => ChannelUserEvent,
182 on_channel_var_set => ChannelVarSet,
183 on_contact_info => ContactInfo,
184 on_contact_status_change => ContactStatusChange,
185 on_device_state_changed => DeviceStateChanged,
186 on_dial => Dial,
187 on_endpoint_state_change => EndpointStateChange,
188 on_missing_params => MissingParams,
189 on_peer => Peer,
190 on_peer_status_change => PeerStatusChange,
191 on_playback_continuing => PlaybackContinuing,
192 on_playback_finished => PlaybackFinished,
193 on_playback_started => PlaybackStarted,
194 on_recording_failed => RecordingFailed,
195 on_recording_finished => RecordingFinished,
196 on_recording_started => RecordingStarted,
197 on_stasis_end => StasisEnd,
198 on_stasis_start => StasisStart,
199 on_text_message_received => TextMessageReceived
200);