sos_net/account/
listen.rs

1//! Adds functions for listening to change notifications using
2//! a websocket connection.
3use crate::{Error, NetworkAccount, Result};
4use sos_core::Origin;
5use sos_protocol::{
6    network_client::ListenOptions, NetworkChangeEvent, RemoteResult,
7    RemoteSync,
8};
9use sos_sync::SyncStorage;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12
13impl NetworkAccount {
14    /// Close all the websocket connections.
15    #[cfg(feature = "listen")]
16    pub(super) async fn shutdown_websockets(&self) {
17        tracing::debug!("listen::close_all_websockets");
18
19        let mut listeners = self.listeners.lock().await;
20        for (_, handle) in listeners.drain() {
21            handle.close().await;
22        }
23    }
24
25    /// Stop listening to a server websocket.
26    pub async fn stop_listening(&self, origin: &Origin) {
27        let mut listeners = self.listeners.lock().await;
28        if let Some(handle) = listeners.get(origin) {
29            tracing::debug!(
30                url = %origin.url(),
31                "listen::close_websocket");
32
33            handle.close().await;
34            listeners.remove(origin);
35        }
36    }
37    /// Listen for changes on a server websocket.
38    pub async fn listen(
39        &self,
40        origin: &Origin,
41        options: ListenOptions,
42        listener: Option<
43            mpsc::Sender<(NetworkChangeEvent, RemoteResult<crate::Error>)>,
44        >,
45    ) -> Result<()> {
46        let remotes = self.remotes.read().await;
47        if let Some(remote) = remotes.get(origin) {
48            self.stop_listening(&origin).await;
49
50            let remote = Arc::new(remote.clone());
51            let (tx, mut rx) = mpsc::channel::<NetworkChangeEvent>(32);
52
53            let local_account = Arc::clone(&self.account);
54            let sync_lock = Arc::clone(&self.sync_lock);
55            let sync_remote = Arc::clone(&remote);
56
57            tokio::task::spawn(async move {
58                while let Some(message) = rx.recv().await {
59                    // If the change notification has changes
60                    // then we attempt to sync with the remote
61                    if message.outcome().changes > 0 {
62                        // When multiple servers are configured and we
63                        // are listening for notifications to multiple
64                        // servers then this will fire for each server
65                        // however it's likely the same change set is
66                        // being applied to all servers. By comparing
67                        // the cumulative root hashes against our local
68                        // status we can drop change notifications that
69                        // would not make any changes which will reduce
70                        // network traffic and prevent multiple re-renders
71                        // in the UI.
72                        let differs = {
73                            let account = local_account.lock().await;
74                            let local_status = account.sync_status().await?;
75                            &local_status.root != message.root()
76                        };
77
78                        if differs {
79                            // Ensure we acquire the sync lock
80                            // to prevent other changes to the storage
81                            let _ = sync_lock.lock().await;
82
83                            // Sync with the remote that notified us
84                            let sync_result = sync_remote.sync().await;
85                            if let Err(e) = &sync_result.result {
86                                tracing::error!(
87                                    error = ?e,
88                                    "listen_sync",
89                                );
90                            }
91
92                            // If we have a listener notify them with the
93                            // change notification and a possible sync error
94                            let tx = listener.clone();
95                            if let Some(tx) = tx {
96                                let _ = tx.send((message, sync_result)).await;
97                            }
98                        } else {
99                            tracing::debug!(
100                              root = %message.root(),
101                              "drop_change_notification",
102                            );
103                        }
104                    }
105                }
106
107                Ok::<(), Error>(())
108            });
109
110            let handle = remote.listen(options, tx);
111
112            // Store the listeners so we can
113            // close the connections on sign out
114            {
115                let mut listeners = self.listeners.lock().await;
116                listeners.insert(origin.clone(), handle);
117            }
118
119            Ok(())
120        } else {
121            Err(Error::OriginNotFound(origin.clone()))
122        }
123    }
124}