ring_client/client/api/location/
event.rs

1use crate::helper::url::Url;
2use crate::location::Location;
3use crate::{helper, ApiError};
4use futures_util::stream::SplitStream;
5use futures_util::{stream::SplitSink, SinkExt, StreamExt};
6use serde::{Deserialize, Serialize};
7use std::cmp::PartialEq;
8use std::future::Future;
9use std::sync::Arc;
10use tokio::net::TcpStream;
11use tokio::sync::Mutex;
12use tokio_tungstenite::tungstenite::client::IntoClientRequest;
13use tokio_tungstenite::tungstenite::Utf8Bytes;
14use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};
15
16/// A real-time event which occured in a Location.
17#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
18pub struct Event {
19    /// The content of the event.
20    #[serde(rename = "msg")]
21    pub message: Message,
22}
23
24impl Event {
25    /// Create a new event with the given message.
26    #[must_use]
27    pub const fn new(message: Message) -> Self {
28        Self { message }
29    }
30}
31
32/// A message sent to or from Ring via WebSocket.
33#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
34#[serde(tag = "msg")]
35#[allow(missing_docs)]
36#[non_exhaustive]
37pub enum Message {
38    SubscriptionTopicsInfo(serde_json::Value),
39
40    DeviceInfoSet(serde_json::Value),
41
42    SessionInfo(serde_json::Value),
43
44    DataUpdate(serde_json::Value),
45
46    /// A message which is yet to be mapped by the crate.
47    #[serde(other)]
48    Unknown,
49}
50
51impl TryFrom<Event> for tungstenite::protocol::Message {
52    type Error = serde_json::Error;
53
54    fn try_from(event: Event) -> Result<Self, Self::Error> {
55        Ok(Self::Text(Utf8Bytes::from(&serde_json::to_string(&event)?)))
56    }
57}
58
59/// A live connection for exchanging messages with Ring.
60///
61/// For example, to enable an Alarm system.
62#[derive(Debug)]
63pub struct Connection {
64    /// The read portion of the WebSocket stream.
65    stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
66
67    /// The write portion of the WebSocket stream.
68    sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
69}
70
71impl Connection {
72    #[must_use]
73    pub(crate) fn new(stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
74        let (sink, stream) = stream.split();
75
76        Self { stream, sink }
77    }
78
79    /// Reads the next message from the stream.
80    #[must_use]
81    pub async fn next(&mut self) -> Option<Result<Event, ApiError>> {
82        while let Some(message) = self.stream.next().await {
83            match message {
84                Ok(message) => {
85                    if let tungstenite::protocol::Message::Ping(_) = message {
86                        // We can safetly ignore ping messages as Tungstenite will
87                        // handle the Pong response for us.
88                        //
89                        // https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocket.html#method.write
90                        log::debug!("Recieved ping message from Ring");
91
92                        continue;
93                    }
94
95                    let event = serde_json::from_str::<Event>(&message.to_string())
96                        .map_err(ApiError::InvalidResponse);
97
98                    if let Err(error) = event {
99                        log::error!("Error deserializing message: {:?}", error);
100
101                        return Some(Err(error));
102                    }
103
104                    log::debug!("Received event: {:?}", event);
105
106                    return Some(event);
107                }
108                Err(error) => {
109                    log::error!("Error receiving message: {:?}", error);
110
111                    return Some(Err(ApiError::WebsocketError(error)));
112                }
113            }
114        }
115
116        None
117    }
118
119    /// Sends a message to Ring immediately (no buffering).
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the sink has already been closed.
124    pub async fn send(&mut self, event: Event) -> Result<(), ApiError> {
125        self.sink
126            .send(event.try_into()?)
127            .await
128            .map_err(ApiError::WebsocketError)
129    }
130
131    /// Closes the connection to Ring gracefully.
132    pub async fn close(self) {
133        let stream = self.stream.reunite(self.sink);
134
135        match stream {
136            Ok(mut stream) => {
137                let closed = stream.close(None).await;
138
139                if let Err(error) = closed {
140                    log::error!("Error closing stream: {:?}", error);
141                    return;
142                }
143
144                log::info!("Shut down Websocket connection gracefully");
145            }
146            Err(_) => {
147                log::info!("Unable to reunite write and read pair into stream");
148            }
149        }
150    }
151}
152
153/// An event listener for a Location.
154#[derive(Debug)]
155pub struct Listener<'a> {
156    location: &'a Location<'a>,
157    connection: Connection,
158}
159
160impl<'a> Listener<'a> {
161    /// Create a brand new event listener for a location.
162    ///
163    /// This generally accepts a callback defined by the caller, which is triggered whenever an
164    /// event is triggered by Ring.
165    #[must_use]
166    pub fn new<'b>(
167        location: &'b Location<'_>,
168        stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
169    ) -> Listener<'b> {
170        Listener {
171            location,
172            connection: Connection::new(stream),
173        }
174    }
175
176    /// Listen for events in a particular location.
177    ///
178    /// # Example
179    ///
180    /// ```no_run
181    /// use ring_client::Client;
182    ///
183    /// use ring_client::authentication::Credentials;
184    /// use ring_client::OperatingSystem;
185    ///
186    /// # tokio_test::block_on(async {
187    /// let client = Client::new("Home Automation", "mock-system-id", OperatingSystem::Ios);
188    ///
189    /// // For brevity, a Refresh Token is being used here. However, the client can also
190    /// // be authenticated using a username and password.
191    /// //
192    /// // See `Client::login` for more information.
193    /// let refresh_token = Credentials::RefreshToken("".to_string());
194    ///
195    /// client.login(refresh_token)
196    ///      .await
197    ///      .expect("Logging in with a valid refresh token should not fail");
198    ///
199    /// let locations = client.get_locations()
200    ///      .await
201    ///      .expect("Getting locations should not fail");
202    ///
203    /// let location = locations
204    ///      .first()
205    ///      .expect("There should be at least one location");
206    ///
207    /// let mut listener = location.get_listener()
208    ///      .await
209    ///      .expect("Creating a listener should not fail");
210    ///
211    /// // Listen for events in the location and react to them using the provided closure.
212    /// listener.listen::<_, _, ()>(|event, location, mut connection| async move {
213    ///     // Connection can be used to send commands to the Ring API.
214    ///     println!("New event: {:#?}", event);
215    ///
216    ///     // The connection argument can be used to send events back to Ring in
217    ///     // response to the event.
218    ///
219    ///     // Return true or false to indicate whether the listener should continue listening
220    ///     Ok(true)
221    /// })
222    /// .await;
223    ///
224    /// # });
225    ///```
226    ///
227    /// # Errors
228    ///
229    /// Returns the error from the event handler if it returns an error when called.
230    pub async fn listen<EventHandler, EventHandlerFut, E>(
231        &'a mut self,
232        on_event: EventHandler,
233    ) -> Result<(), E>
234    where
235        EventHandler:
236            Fn(Event, &'a Location<'a>, Arc<Mutex<&'a mut Connection>>) -> EventHandlerFut,
237        EventHandlerFut: Future<Output = Result<bool, E>>,
238    {
239        let connection = Arc::new(Mutex::new(&mut self.connection));
240
241        loop {
242            // Wait for the next event from the connection and then drop the lock
243            // to allow any on_event calls to use the connection without blocking
244            let event = { connection.lock().await.next().await };
245
246            match event {
247                Some(Ok(event)) => {
248                    if event.message == Message::Unknown {
249                        log::warn!("Unknown message received: {:?}", event.message);
250                        continue;
251                    }
252
253                    log::debug!("Received event: {:?}", event);
254
255                    let outcome = on_event(event, self.location, Arc::clone(&connection)).await?;
256
257                    if !outcome {
258                        log::debug!("Event handler returned false, stopping listener");
259                        break;
260                    }
261                }
262                Some(Err(error)) => {
263                    log::error!("Error receiving event: {:?}", error);
264                }
265                None => {
266                    log::info!("Websocket stream closed, stopping listener");
267                    break;
268                }
269            }
270        }
271
272        Ok(())
273    }
274
275    /// Send an event to Ring.
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the connection is closed.
280    ///
281    /// # Example
282    ///
283    /// ```no_run
284    /// use serde_json::json;
285    /// use ring_client::Client;
286    ///
287    /// use ring_client::authentication::Credentials;
288    /// use ring_client::location::{Event, Message};
289    /// use ring_client::OperatingSystem;
290    ///
291    /// # tokio_test::block_on(async {
292    /// let client = Client::new("Home Automation", "mock-system-id", OperatingSystem::Ios);
293    ///
294    /// // For brevity, a Refresh Token is being used here. However, the client can also
295    /// // be authenticated using a username and password.
296    /// //
297    /// // See `Client::login` for more information.
298    /// let refresh_token = Credentials::RefreshToken("".to_string());
299    ///
300    /// client.login(refresh_token)
301    ///      .await
302    ///      .expect("Logging in with a valid refresh token should not fail");
303    ///
304    /// let locations = client.get_locations()
305    ///      .await
306    ///      .expect("Getting locations should not fail");
307    ///
308    /// let location = locations
309    ///      .first()
310    ///      .expect("There should be at least one location");
311    ///
312    /// let listener = location.get_listener().await;
313    ///
314    /// location.get_listener()
315    ///     .await
316    ///     .expect("Creating a listener should not fail")
317    ///     .send(
318    ///         Event::new(
319    ///             Message::DataUpdate(json!({}))
320    ///         )
321    ///     )
322    ///     .await
323    ///     .expect("Sending an event should not fail");
324    /// # });
325    ///```
326    pub async fn send(&mut self, event: Event) -> Result<(), ApiError> {
327        self.connection.send(event).await
328    }
329
330    /// Close the underlying connection to Ring.
331    pub async fn close(self) {
332        self.connection.close().await;
333    }
334}
335
336impl<'a> Location<'a> {
337    /// Get a listener for events in a location.
338    ///
339    /// # Errors
340    ///
341    /// Will return an error if a connection cannot be established with Ring.
342    pub async fn get_listener(&'a self) -> Result<Listener<'a>, ApiError> {
343        let (stream, _) = self.connect().await?;
344
345        Ok(Listener::new(self, stream))
346    }
347
348    /// Generate a ticket (credentials and URI for a Ring Websocket server) and connect to it.
349    async fn connect(
350        &self,
351    ) -> Result<
352        (
353            WebSocketStream<MaybeTlsStream<TcpStream>>,
354            tungstenite::handshake::client::Response,
355        ),
356        ApiError,
357    > {
358        let ticket = self.session.get_ticket(&self.data.id).await?;
359
360        let request = helper::url::get_base_url(&Url::Websocket {
361            host: &ticket.host,
362            auth_code: &ticket.id,
363        })
364        .into_client_request()?;
365
366        Ok(connect_async(request).await?)
367    }
368}