ring_client/client/api/location/event.rs
1use crate::helper::url::Url;
2use crate::location::Location;
3use crate::{helper, ApiError};
4use futures_util::stream::SplitStream;
5use futures_util::{stream::SplitSink, SinkExt, StreamExt};
6use serde::{Deserialize, Serialize};
7use std::cmp::PartialEq;
8use std::future::Future;
9use std::sync::Arc;
10use tokio::net::TcpStream;
11use tokio::sync::Mutex;
12use tokio_tungstenite::tungstenite::client::IntoClientRequest;
13use tokio_tungstenite::tungstenite::Utf8Bytes;
14use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};
15
16/// A real-time event which occured in a Location.
17#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
18pub struct Event {
19 /// The content of the event.
20 #[serde(rename = "msg")]
21 pub message: Message,
22}
23
24impl Event {
25 /// Create a new event with the given message.
26 #[must_use]
27 pub const fn new(message: Message) -> Self {
28 Self { message }
29 }
30}
31
32/// A message sent to or from Ring via WebSocket.
33#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
34#[serde(tag = "msg")]
35#[allow(missing_docs)]
36#[non_exhaustive]
37pub enum Message {
38 SubscriptionTopicsInfo(serde_json::Value),
39
40 DeviceInfoSet(serde_json::Value),
41
42 SessionInfo(serde_json::Value),
43
44 DataUpdate(serde_json::Value),
45
46 /// A message which is yet to be mapped by the crate.
47 #[serde(other)]
48 Unknown,
49}
50
51impl TryFrom<Event> for tungstenite::protocol::Message {
52 type Error = serde_json::Error;
53
54 fn try_from(event: Event) -> Result<Self, Self::Error> {
55 Ok(Self::Text(Utf8Bytes::from(&serde_json::to_string(&event)?)))
56 }
57}
58
59/// A live connection for exchanging messages with Ring.
60///
61/// For example, to enable an Alarm system.
62#[derive(Debug)]
63pub struct Connection {
64 /// The read portion of the WebSocket stream.
65 stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
66
67 /// The write portion of the WebSocket stream.
68 sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
69}
70
71impl Connection {
72 #[must_use]
73 pub(crate) fn new(stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
74 let (sink, stream) = stream.split();
75
76 Self { stream, sink }
77 }
78
79 /// Reads the next message from the stream.
80 #[must_use]
81 pub async fn next(&mut self) -> Option<Result<Event, ApiError>> {
82 while let Some(message) = self.stream.next().await {
83 match message {
84 Ok(message) => {
85 if let tungstenite::protocol::Message::Ping(_) = message {
86 // We can safetly ignore ping messages as Tungstenite will
87 // handle the Pong response for us.
88 //
89 // https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocket.html#method.write
90 log::debug!("Recieved ping message from Ring");
91
92 continue;
93 }
94
95 let event = serde_json::from_str::<Event>(&message.to_string())
96 .map_err(ApiError::InvalidResponse);
97
98 if let Err(error) = event {
99 log::error!("Error deserializing message: {:?}", error);
100
101 return Some(Err(error));
102 }
103
104 log::debug!("Received event: {:?}", event);
105
106 return Some(event);
107 }
108 Err(error) => {
109 log::error!("Error receiving message: {:?}", error);
110
111 return Some(Err(ApiError::WebsocketError(error)));
112 }
113 }
114 }
115
116 None
117 }
118
119 /// Sends a message to Ring immediately (no buffering).
120 ///
121 /// # Errors
122 ///
123 /// Returns an error if the sink has already been closed.
124 pub async fn send(&mut self, event: Event) -> Result<(), ApiError> {
125 self.sink
126 .send(event.try_into()?)
127 .await
128 .map_err(ApiError::WebsocketError)
129 }
130
131 /// Closes the connection to Ring gracefully.
132 pub async fn close(self) {
133 let stream = self.stream.reunite(self.sink);
134
135 match stream {
136 Ok(mut stream) => {
137 let closed = stream.close(None).await;
138
139 if let Err(error) = closed {
140 log::error!("Error closing stream: {:?}", error);
141 return;
142 }
143
144 log::info!("Shut down Websocket connection gracefully");
145 }
146 Err(_) => {
147 log::info!("Unable to reunite write and read pair into stream");
148 }
149 }
150 }
151}
152
153/// An event listener for a Location.
154#[derive(Debug)]
155pub struct Listener<'a> {
156 location: &'a Location<'a>,
157 connection: Connection,
158}
159
160impl<'a> Listener<'a> {
161 /// Create a brand new event listener for a location.
162 ///
163 /// This generally accepts a callback defined by the caller, which is triggered whenever an
164 /// event is triggered by Ring.
165 #[must_use]
166 pub fn new<'b>(
167 location: &'b Location<'_>,
168 stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
169 ) -> Listener<'b> {
170 Listener {
171 location,
172 connection: Connection::new(stream),
173 }
174 }
175
176 /// Listen for events in a particular location.
177 ///
178 /// # Example
179 ///
180 /// ```no_run
181 /// use ring_client::Client;
182 ///
183 /// use ring_client::authentication::Credentials;
184 /// use ring_client::OperatingSystem;
185 ///
186 /// # tokio_test::block_on(async {
187 /// let client = Client::new("Home Automation", "mock-system-id", OperatingSystem::Ios);
188 ///
189 /// // For brevity, a Refresh Token is being used here. However, the client can also
190 /// // be authenticated using a username and password.
191 /// //
192 /// // See `Client::login` for more information.
193 /// let refresh_token = Credentials::RefreshToken("".to_string());
194 ///
195 /// client.login(refresh_token)
196 /// .await
197 /// .expect("Logging in with a valid refresh token should not fail");
198 ///
199 /// let locations = client.get_locations()
200 /// .await
201 /// .expect("Getting locations should not fail");
202 ///
203 /// let location = locations
204 /// .first()
205 /// .expect("There should be at least one location");
206 ///
207 /// let mut listener = location.get_listener()
208 /// .await
209 /// .expect("Creating a listener should not fail");
210 ///
211 /// // Listen for events in the location and react to them using the provided closure.
212 /// listener.listen::<_, _, ()>(|event, location, mut connection| async move {
213 /// // Connection can be used to send commands to the Ring API.
214 /// println!("New event: {:#?}", event);
215 ///
216 /// // The connection argument can be used to send events back to Ring in
217 /// // response to the event.
218 ///
219 /// // Return true or false to indicate whether the listener should continue listening
220 /// Ok(true)
221 /// })
222 /// .await;
223 ///
224 /// # });
225 ///```
226 ///
227 /// # Errors
228 ///
229 /// Returns the error from the event handler if it returns an error when called.
230 pub async fn listen<EventHandler, EventHandlerFut, E>(
231 &'a mut self,
232 on_event: EventHandler,
233 ) -> Result<(), E>
234 where
235 EventHandler:
236 Fn(Event, &'a Location<'a>, Arc<Mutex<&'a mut Connection>>) -> EventHandlerFut,
237 EventHandlerFut: Future<Output = Result<bool, E>>,
238 {
239 let connection = Arc::new(Mutex::new(&mut self.connection));
240
241 loop {
242 // Wait for the next event from the connection and then drop the lock
243 // to allow any on_event calls to use the connection without blocking
244 let event = { connection.lock().await.next().await };
245
246 match event {
247 Some(Ok(event)) => {
248 if event.message == Message::Unknown {
249 log::warn!("Unknown message received: {:?}", event.message);
250 continue;
251 }
252
253 log::debug!("Received event: {:?}", event);
254
255 let outcome = on_event(event, self.location, Arc::clone(&connection)).await?;
256
257 if !outcome {
258 log::debug!("Event handler returned false, stopping listener");
259 break;
260 }
261 }
262 Some(Err(error)) => {
263 log::error!("Error receiving event: {:?}", error);
264 }
265 None => {
266 log::info!("Websocket stream closed, stopping listener");
267 break;
268 }
269 }
270 }
271
272 Ok(())
273 }
274
275 /// Send an event to Ring.
276 ///
277 /// # Errors
278 ///
279 /// Returns an error if the connection is closed.
280 ///
281 /// # Example
282 ///
283 /// ```no_run
284 /// use serde_json::json;
285 /// use ring_client::Client;
286 ///
287 /// use ring_client::authentication::Credentials;
288 /// use ring_client::location::{Event, Message};
289 /// use ring_client::OperatingSystem;
290 ///
291 /// # tokio_test::block_on(async {
292 /// let client = Client::new("Home Automation", "mock-system-id", OperatingSystem::Ios);
293 ///
294 /// // For brevity, a Refresh Token is being used here. However, the client can also
295 /// // be authenticated using a username and password.
296 /// //
297 /// // See `Client::login` for more information.
298 /// let refresh_token = Credentials::RefreshToken("".to_string());
299 ///
300 /// client.login(refresh_token)
301 /// .await
302 /// .expect("Logging in with a valid refresh token should not fail");
303 ///
304 /// let locations = client.get_locations()
305 /// .await
306 /// .expect("Getting locations should not fail");
307 ///
308 /// let location = locations
309 /// .first()
310 /// .expect("There should be at least one location");
311 ///
312 /// let listener = location.get_listener().await;
313 ///
314 /// location.get_listener()
315 /// .await
316 /// .expect("Creating a listener should not fail")
317 /// .send(
318 /// Event::new(
319 /// Message::DataUpdate(json!({}))
320 /// )
321 /// )
322 /// .await
323 /// .expect("Sending an event should not fail");
324 /// # });
325 ///```
326 pub async fn send(&mut self, event: Event) -> Result<(), ApiError> {
327 self.connection.send(event).await
328 }
329
330 /// Close the underlying connection to Ring.
331 pub async fn close(self) {
332 self.connection.close().await;
333 }
334}
335
336impl<'a> Location<'a> {
337 /// Get a listener for events in a location.
338 ///
339 /// # Errors
340 ///
341 /// Will return an error if a connection cannot be established with Ring.
342 pub async fn get_listener(&'a self) -> Result<Listener<'a>, ApiError> {
343 let (stream, _) = self.connect().await?;
344
345 Ok(Listener::new(self, stream))
346 }
347
348 /// Generate a ticket (credentials and URI for a Ring Websocket server) and connect to it.
349 async fn connect(
350 &self,
351 ) -> Result<
352 (
353 WebSocketStream<MaybeTlsStream<TcpStream>>,
354 tungstenite::handshake::client::Response,
355 ),
356 ApiError,
357 > {
358 let ticket = self.session.get_ticket(&self.data.id).await?;
359
360 let request = helper::url::get_base_url(&Url::Websocket {
361 host: &ticket.host,
362 auth_code: &ticket.id,
363 })
364 .into_client_request()?;
365
366 Ok(connect_async(request).await?)
367 }
368}