sos_net/account/listen.rs
1//! Adds functions for listening to change notifications using
2//! a websocket connection.
3use crate::{Error, NetworkAccount, Result};
4use sos_core::Origin;
5use sos_protocol::{
6 network_client::ListenOptions, NetworkChangeEvent, RemoteResult,
7 RemoteSync,
8};
9use sos_sync::SyncStorage;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12
13impl NetworkAccount {
14 /// Close all the websocket connections.
15 #[cfg(feature = "listen")]
16 pub(super) async fn shutdown_websockets(&self) {
17 tracing::debug!("listen::close_all_websockets");
18
19 let mut listeners = self.listeners.lock().await;
20 for (_, handle) in listeners.drain() {
21 handle.close().await;
22 }
23 }
24
25 /// Stop listening to a server websocket.
26 pub async fn stop_listening(&self, origin: &Origin) {
27 let mut listeners = self.listeners.lock().await;
28 if let Some(handle) = listeners.get(origin) {
29 tracing::debug!(
30 url = %origin.url(),
31 "listen::close_websocket");
32
33 handle.close().await;
34 listeners.remove(origin);
35 }
36 }
37 /// Listen for changes on a server websocket.
38 pub async fn listen(
39 &self,
40 origin: &Origin,
41 options: ListenOptions,
42 listener: Option<
43 mpsc::Sender<(NetworkChangeEvent, RemoteResult<crate::Error>)>,
44 >,
45 ) -> Result<()> {
46 let remotes = self.remotes.read().await;
47 if let Some(remote) = remotes.get(origin) {
48 self.stop_listening(&origin).await;
49
50 let remote = Arc::new(remote.clone());
51 let (tx, mut rx) = mpsc::channel::<NetworkChangeEvent>(32);
52
53 let local_account = Arc::clone(&self.account);
54 let sync_lock = Arc::clone(&self.sync_lock);
55 let sync_remote = Arc::clone(&remote);
56
57 tokio::task::spawn(async move {
58 while let Some(message) = rx.recv().await {
59 // If the change notification has changes
60 // then we attempt to sync with the remote
61 if message.outcome().changes > 0 {
62 // When multiple servers are configured and we
63 // are listening for notifications to multiple
64 // servers then this will fire for each server
65 // however it's likely the same change set is
66 // being applied to all servers. By comparing
67 // the cumulative root hashes against our local
68 // status we can drop change notifications that
69 // would not make any changes which will reduce
70 // network traffic and prevent multiple re-renders
71 // in the UI.
72 let differs = {
73 let account = local_account.lock().await;
74 let local_status = account.sync_status().await?;
75 &local_status.root != message.root()
76 };
77
78 if differs {
79 // Ensure we acquire the sync lock
80 // to prevent other changes to the storage
81 let _ = sync_lock.lock().await;
82
83 // Sync with the remote that notified us
84 let sync_result = sync_remote.sync().await;
85 if let Err(e) = &sync_result.result {
86 tracing::error!(
87 error = ?e,
88 "listen_sync",
89 );
90 }
91
92 // If we have a listener notify them with the
93 // change notification and a possible sync error
94 let tx = listener.clone();
95 if let Some(tx) = tx {
96 let _ = tx.send((message, sync_result)).await;
97 }
98 } else {
99 tracing::debug!(
100 root = %message.root(),
101 "drop_change_notification",
102 );
103 }
104 }
105 }
106
107 Ok::<(), Error>(())
108 });
109
110 let handle = remote.listen(options, tx);
111
112 // Store the listeners so we can
113 // close the connections on sign out
114 {
115 let mut listeners = self.listeners.lock().await;
116 listeners.insert(origin.clone(), handle);
117 }
118
119 Ok(())
120 } else {
121 Err(Error::OriginNotFound(origin.clone()))
122 }
123 }
124}