1use crate::{apis, ws};
2use futures_util::StreamExt;
3use std::collections::HashMap;
4use std::future::Future;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::sync::{Arc, RwLock};
8use tracing::debug;
9
10type Handler = Arc<
11 dyn Fn(Arc<apis::client::Client>, ws::models::Event) -> Pin<Box<dyn Future<Output = ()> + Send>>
12 + Send
13 + Sync,
14>;
15
/// Handle that bundles the API client, the WebSocket client and the registry
/// of event handlers.
///
/// Every field sits behind an `Arc`, so the derived `Clone` produces a handle
/// that shares the same API client, WebSocket client and handler registry as
/// the value it was cloned from.
#[derive(Clone)]
pub struct AriClient {
    /// API client; handed to every handler and exposed through `client()` and
    /// the `Deref` implementation.
    client: Arc<apis::client::Client>,
    /// WebSocket client behind an async mutex; locked by `start` and `stop`.
    ws: Arc<tokio::sync::Mutex<ws::client::Client>>,
    /// Registered handlers, keyed by the string the dispatch loop obtains from
    /// `event.to_string()`.
    event_handlers: Arc<RwLock<HashMap<String, Handler>>>,
}
24
25impl AriClient {
26 pub fn with_config(config: crate::config::Config) -> Self {
28 AriClient {
29 client: Arc::new(apis::client::Client::with_config(config.clone())),
30 ws: Arc::new(tokio::sync::Mutex::new(ws::client::Client::with_config(
31 config,
32 ))),
33 event_handlers: Arc::new(RwLock::new(HashMap::new())),
34 }
35 }
36
37 pub fn on_unknown_event<F, Fut>(&mut self, handler: F) -> &mut Self
39 where
40 F: Fn(Arc<apis::client::Client>, ws::models::Event) -> Fut + Send + Sync + 'static,
41 Fut: Future<Output = ()> + Send + 'static,
42 {
43 self.event_handlers.write().unwrap().insert(
44 "Unknown".to_string(),
45 Arc::new(move |client, event| Box::pin(handler(client, event))),
46 );
47 self
48 }
49
50 pub fn on_event<F, Fut>(&mut self, key: impl Into<String>, handler: F) -> &mut Self
52 where
53 F: Fn(Arc<apis::client::Client>, ws::models::Event) -> Fut + Send + Sync + 'static,
54 Fut: Future<Output = ()> + Send + 'static,
55 {
56 self.event_handlers.write().unwrap().insert(
57 key.into(),
58 Arc::new(move |client, event| Box::pin(handler(client, event))),
59 );
60 self
61 }
62
63 pub async fn start(
65 &mut self,
66 application_name: impl Into<String>,
67 ) -> crate::errors::Result<()> {
68 let mut stream = self
69 .ws
70 .lock()
71 .await
72 .connect(ws::params::ListenRequest::new(application_name))
73 .await?;
74
75 let event_handlers = self.event_handlers.clone();
76 let client = self.client.clone();
77 tokio::task::spawn(async move {
78 while let Some(event) = stream.next().await {
79 let maybe_handler = {
81 let handlers = event_handlers.read().unwrap();
82 handlers.get(&event.to_string()).cloned()
83 };
84
85 if let Some(handler) = maybe_handler {
86 handler(client.clone(), event).await;
87 } else {
88 debug!(
89 "No handler registered for event type: {}",
90 event.to_string()
91 );
92 }
93 }
94 });
95
96 Ok(())
97 }
98
99 pub async fn stop(&mut self) -> Result<(), crate::errors::AriError> {
101 self.ws.lock().await.disconnect().await
102 }
103
104 pub fn client(&self) -> &apis::client::Client {
106 &self.client
107 }
108}
109
110impl Deref for AriClient {
111 type Target = apis::client::Client;
112 fn deref(&self) -> &Self::Target {
113 &self.client
114 }
115}
116
/// Generates typed registration methods on `AriClient`.
///
/// Each `method_name => Variant` pair expands to a builder-style method that
/// registers a handler under the key `stringify!(Variant)` and passes it the
/// payload carried by `ws::models::Event::Variant`.
macro_rules! create_event_handler {
    ($($event_name:ident => $event_variant:ident),*) => {
        impl AriClient {
            $(
                /// Registers a typed handler for this event variant,
                /// replacing any handler already stored under the variant's
                /// name. Returns `self` so registrations can be chained.
                pub fn $event_name<F, Fut>(&mut self, handler: F) -> &mut Self
                where
                    F: Fn(Arc<apis::client::Client>, ws::models::BaseEvent<ws::models::$event_variant>) -> Fut
                        + Send
                        + Sync
                        + 'static,
                    Fut: Future<Output = ()> + Send + 'static,
                {
                    // Shared so every dispatched event can move its own
                    // handle to the callback into the future it returns.
                    let shared = Arc::new(handler);
                    self.on_event(stringify!($event_variant), move |client, event| {
                        let callback = Arc::clone(&shared);
                        async move {
                            match event {
                                ws::models::Event::$event_variant(payload) => {
                                    callback(client, payload).await;
                                }
                                // The handler is keyed by this variant's name,
                                // so no other variant is expected here.
                                _ => unreachable!(),
                            }
                        }
                    })
                }
            )*
        }
    };
}
148
// Typed registration methods, one per event variant. The left-hand side is the
// name of the generated `AriClient` method; the right-hand side is the
// `ws::models::Event` variant, whose name is also used as the registry key.
// The list must not end with a trailing comma: the macro pattern only accepts
// commas between entries.
create_event_handler!(
    on_application_move_failed => ApplicationMoveFailed,
    on_application_replaced => ApplicationReplaced,
    on_bridge_attended_transfer => BridgeAttendedTransfer,
    on_bridge_blind_transfer => BridgeBlindTransfer,
    on_bridge_created => BridgeCreated,
    on_bridge_destroyed => BridgeDestroyed,
    on_bridge_merged => BridgeMerged,
    on_bridge_video_source_changed => BridgeVideoSourceChanged,
    on_channel_caller_id => ChannelCallerId,
    on_channel_connected_line => ChannelConnectedLine,
    on_channel_created => ChannelCreated,
    on_channel_destroyed => ChannelDestroyed,
    on_channel_dialplan => ChannelDialplan,
    on_channel_dtmf_received => ChannelDtmfReceived,
    on_channel_entered_bridge => ChannelEnteredBridge,
    on_channel_hangup_request => ChannelHangupRequest,
    on_channel_hold => ChannelHold,
    on_channel_left_bridge => ChannelLeftBridge,
    on_channel_state_change => ChannelStateChange,
    on_channel_talking_finished => ChannelTalkingFinished,
    on_channel_talking_started => ChannelTalkingStarted,
    on_channel_tone_detected => ChannelToneDetected,
    on_channel_unhold => ChannelUnhold,
    on_channel_user_event => ChannelUserEvent,
    on_channel_var_set => ChannelVarSet,
    on_contact_info => ContactInfo,
    on_contact_status_change => ContactStatusChange,
    on_device_state_changed => DeviceStateChanged,
    on_dial => Dial,
    on_endpoint_state_change => EndpointStateChange,
    on_missing_params => MissingParams,
    on_peer => Peer,
    on_peer_status_change => PeerStatusChange,
    on_playback_continuing => PlaybackContinuing,
    on_playback_finished => PlaybackFinished,
    on_playback_started => PlaybackStarted,
    on_recording_failed => RecordingFailed,
    on_recording_finished => RecordingFinished,
    on_recording_started => RecordingStarted,
    on_stasis_end => StasisEnd,
    on_stasis_start => StasisStart,
    on_text_message_received => TextMessageReceived
);