Skip to main content

lb_rs/subscribers/
status.rs

1use basic_human_duration::ChronoHumanDuration;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use time::Duration;
5
6use tokio::sync::{Mutex, RwLock};
7use uuid::Uuid;
8use web_time::Instant;
9
10use crate::model::clock;
11use crate::model::errors::{LbErrKind, LbResult, Unexpected};
12use crate::service::events::{Event, SyncIncrement};
13use crate::service::usage::UsageMetrics;
14use crate::{LocalLb, tokio_spawn};
15
16#[derive(Clone, Default)]
17pub struct StatusUpdater {
18    current_status: Arc<RwLock<Status>>,
19    space_updated: Arc<Mutex<SpaceUpdater>>,
20}
21
22/// rate limit get_usage calls to once every 60 seconds or so
23pub struct SpaceUpdater {
24    initialized: bool,
25    spawned: bool,
26    last_computed: Instant,
27}
28
29/// lb-rs may be used by multiple disconnected components which may
30/// not be able to seamlessly share state among one another. this struct
31/// provides a snapshot into what overall state of data and tasks are
32/// within lb-rs.
33///
34/// the fields are roughly in order of priority, if your UI has limited
35/// space to represent information (phones?) earlier fields are more
36/// important than later fields. Ideally anything with an ID is represented
37/// in the file tree itself.
38#[derive(Default, Clone, Debug, Serialize, Deserialize)]
39pub struct Status {
40    /// some recent server interaction failed due to network conditions
41    pub offline: bool,
42
43    /// a sync is in progress
44    pub syncing: bool,
45
46    /// at-least one document cannot be pushed due to a data cap
47    pub out_of_space: bool,
48
49    /// there are pending shares
50    pub pending_shares: bool,
51
52    /// you must update to be able to sync, see update_available below
53    pub update_required: bool,
54
55    /// metadata or content for this id is being sent to the server
56    pub pushing_files: Vec<Uuid>,
57
58    /// following files need to be pushed
59    pub dirty_locally: Vec<Uuid>,
60
61    /// metadata or content for this id is being from the server
62    /// callers should be prepared to handle ids they don't know
63    /// about yet.
64    pub pulling_files: Vec<Uuid>,
65
66    /// a mix of human readable and precise data for
67    /// used, and available space
68    pub space_used: Option<UsageMetrics>,
69
70    /// if there is no pending work this will have a human readable
71    /// description of when we last synced successfully
72    pub sync_status: Option<String>,
73
74    pub unexpected_sync_problem: Option<String>,
75}
76
77impl Status {
78    pub fn msg(&self) -> Option<String> {
79        if self.syncing {
80            return Some("Syncing...".to_string());
81        }
82
83        if self.offline {
84            if !self.dirty_locally.is_empty() {
85                let len = self.dirty_locally.len();
86                return Some(format!(
87                    "Offline, {} change{} unsynced.",
88                    len,
89                    if len > 1 { "s" } else { "" }
90                ));
91            }
92
93            if let Some(last_synced) = &self.sync_status {
94                return Some(format!("Offline, last synced: {last_synced}"));
95            }
96
97            return Some("Offline.".to_string());
98        }
99
100        if self.out_of_space {
101            return Some("You're out of space!".to_string());
102        }
103
104        if self.update_required {
105            return Some("An update is required to continue.".to_string());
106        }
107
108        if let Some(err) = &self.unexpected_sync_problem {
109            return Some(err.to_string());
110        }
111
112        if !self.dirty_locally.is_empty() {
113            let dirty_locally = self.dirty_locally.len();
114            return Some(format!("{dirty_locally} changes unsynced"));
115        }
116
117        if let Some(last_synced) = &self.sync_status {
118            return Some(format!("Last synced: {last_synced}"));
119        }
120
121        None
122    }
123}
124
125impl LocalLb {
126    pub async fn status(&self) -> Status {
127        self.status.current_status.read().await.clone()
128    }
129
130    pub async fn set_initial_state(&self) -> LbResult<()> {
131        if self.keychain.get_account().is_ok() {
132            self.spawn_compute_usage().await;
133            let mut current = self.status.current_status.write().await;
134            current.dirty_locally = self.local_changes().await;
135            if current.dirty_locally.is_empty() {
136                current.sync_status = self.get_last_synced_human().await.log_and_ignore();
137            }
138            current.pending_shares = !self.get_pending_shares().await?.is_empty();
139        }
140
141        Ok(())
142    }
143
144    pub async fn setup_status(&self) -> LbResult<()> {
145        self.set_initial_state().await?;
146        let mut rx = self.subscribe();
147        let bg = self.clone();
148
149        tokio_spawn!(async move {
150            loop {
151                let evt = match rx.recv().await {
152                    Ok(evt) => evt,
153                    Err(err) => {
154                        error!("failed to receive from a channel {err}");
155                        return;
156                    }
157                };
158                bg.process_event(evt).await.log_and_ignore();
159            }
160        });
161
162        Ok(())
163    }
164
165    pub async fn get_last_synced(&self) -> LbResult<i64> {
166        let tx = self.ro_tx().await;
167        let db = tx.db();
168        Ok(db.last_synced.get().copied().unwrap_or(0))
169    }
170
171    pub async fn get_last_synced_human(&self) -> LbResult<String> {
172        let last_synced = self.get_last_synced().await?;
173        Ok(self.get_timestamp_human_string(last_synced))
174    }
175
176    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
177        if timestamp != 0 {
178            Duration::milliseconds(clock::get_time().0 - timestamp)
179                .format_human()
180                .to_string()
181        } else {
182            "never".to_string()
183        }
184    }
185
186    async fn process_event(&self, e: Event) -> LbResult<()> {
187        let current = self.status.current_status.read().await.clone();
188        match e {
189            Event::MetadataChanged(_) | Event::DocumentWritten(_, _) => {
190                self.compute_dirty_locally(current).await?;
191            }
192            Event::Sync(s) => self.update_sync(s, current).await?,
193            _ => {}
194        }
195        Ok(())
196    }
197
198    async fn set_status(&self, status: Status) -> LbResult<()> {
199        *self.status.current_status.write().await = status;
200        self.events.status_updated();
201        Ok(())
202    }
203
204    async fn compute_dirty_locally(&self, mut status: Status) -> LbResult<()> {
205        let new = self.local_changes().await;
206        if new != status.dirty_locally {
207            status.dirty_locally = self.local_changes().await;
208            self.set_status(status).await?;
209        }
210        Ok(())
211    }
212
213    async fn spawn_compute_usage(&self) {
214        let mut lock = self.status.space_updated.lock().await;
215        if lock.spawned {
216            return;
217        }
218        let initialized = lock.initialized;
219        lock.spawned = true;
220        lock.initialized = true;
221        let computed = lock.last_computed;
222        drop(lock);
223
224        let bg = self.clone();
225        tokio_spawn!(async move {
226            if initialized && computed.elapsed() < web_time::Duration::from_secs(60) {
227                tokio::time::sleep(web_time::Duration::from_secs(60) - computed.elapsed()).await;
228            }
229            let usage = bg.get_usage().await.log_and_ignore();
230            let mut lock = bg.status.space_updated.lock().await;
231            lock.spawned = false;
232            lock.last_computed = Instant::now();
233            drop(lock);
234
235            bg.status.current_status.write().await.space_used = usage;
236            bg.events.status_updated();
237        });
238    }
239
240    async fn update_sync(&self, s: SyncIncrement, mut status: Status) -> LbResult<()> {
241        match s {
242            SyncIncrement::SyncStarted => {
243                self.reset_in_flight_sync(&mut status);
244                status.syncing = true;
245            }
246            SyncIncrement::PullingDocument(id, in_progress) => {
247                if in_progress {
248                    status.pulling_files.push(id);
249                } else {
250                    status.pulling_files.retain(|fid| id != *fid);
251                }
252            }
253            SyncIncrement::PushingDocument(id, in_progress) => {
254                if in_progress {
255                    status.pushing_files.push(id);
256                } else {
257                    status.pushing_files.retain(|fid| id != *fid);
258                }
259            }
260            SyncIncrement::SyncFinished(maybe_problem) => {
261                // Clear prior sync outcomes here (not at start) so errors persist until a new completed sync
262                self.reset_sync_outcome(&mut status);
263                self.reset_in_flight_sync(&mut status);
264
265                self.spawn_compute_usage().await;
266                status.dirty_locally = self.local_changes().await;
267                if status.dirty_locally.is_empty() {
268                    status.sync_status = self.get_last_synced_human().await.ok();
269                }
270                // @smailbarkouch has requested that this be a Vec<Uuid> instead of a bool
271                // we also could consume the PendingSharesChanged event
272                status.pending_shares = !self.get_pending_shares().await?.is_empty();
273                match maybe_problem {
274                    Some(LbErrKind::ClientUpdateRequired) => {
275                        status.update_required = true;
276                    }
277                    Some(LbErrKind::ServerUnreachable) => {
278                        status.offline = true;
279                    }
280                    Some(LbErrKind::UsageIsOverDataCap) => {
281                        status.out_of_space = true;
282                    }
283                    None => {}
284                    Some(e) => {
285                        status.unexpected_sync_problem =
286                            Some(format!("unexpected error {e:?}: {e}"));
287                        error!("unexpected error {e:?}: {e}");
288                    }
289                }
290            }
291        }
292
293        self.set_status(status).await?;
294
295        Ok(())
296    }
297
298    fn reset_in_flight_sync(&self, status: &mut Status) {
299        status.syncing = false;
300        status.pulling_files.clear();
301        status.pushing_files.clear();
302        status.sync_status = None;
303        status.unexpected_sync_problem = None;
304    }
305
306    fn reset_sync_outcome(&self, status: &mut Status) {
307        status.offline = false;
308        status.update_required = false;
309        status.out_of_space = false;
310    }
311}
312
313impl Default for SpaceUpdater {
314    fn default() -> Self {
315        Self { spawned: false, last_computed: Instant::now(), initialized: false }
316    }
317}