Skip to main content

lb_rs/subscribers/
status.rs

1use basic_human_duration::ChronoHumanDuration;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use time::Duration;
5
6use tokio::sync::{Mutex, RwLock};
7use uuid::Uuid;
8use web_time::Instant;
9
10use crate::model::clock;
11use crate::model::errors::{LbErrKind, LbResult, Unexpected};
12use crate::service::events::{Event, SyncIncrement};
13use crate::service::usage::UsageMetrics;
14use crate::{Lb, tokio_spawn};
15
/// Shared handles to the status state used by the `Lb` methods in this file.
/// Both fields are `Arc`s, so clones of this struct refer to the same state.
#[derive(Clone, Default)]
pub struct StatusUpdater {
    /// the most recently published [`Status`]
    current_status: Arc<RwLock<Status>>,
    /// bookkeeping used to throttle usage recomputation, see [`SpaceUpdater`]
    space_updated: Arc<Mutex<SpaceUpdater>>,
}
21
/// rate limit get_usage calls to once every 60 seconds or so
pub struct SpaceUpdater {
    /// false until a usage computation has been requested once; the first
    /// computation is not delayed
    initialized: bool,
    /// true while a usage computation task is pending or running, so that
    /// at most one is in flight at a time
    spawned: bool,
    /// when the last usage computation finished (starts out as the time this
    /// struct was constructed)
    last_computed: Instant,
}
28
/// lb-rs may be used by multiple disconnected components which may
/// not be able to seamlessly share state among one another. this struct
/// provides a snapshot of the overall state of data and tasks
/// within lb-rs.
///
/// the fields are roughly in order of priority: if your UI has limited
/// space to represent information (phones?), earlier fields are more
/// important than later fields. Ideally anything with an ID is represented
/// in the file tree itself.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct Status {
    /// some recent server interaction failed due to network conditions
    pub offline: bool,

    /// a sync is in progress
    pub syncing: bool,

    /// at least one document cannot be pushed due to a data cap
    pub out_of_space: bool,

    /// there are pending shares
    pub pending_shares: bool,

    /// you must update to be able to sync, see update_available below
    /// (NOTE(review): no `update_available` field is defined on this struct)
    pub update_required: bool,

    /// metadata or content for this id is being sent to the server
    pub pushing_files: Vec<Uuid>,

    /// following files need to be pushed
    pub dirty_locally: Vec<Uuid>,

    /// metadata or content for this id is being pulled from the server.
    /// callers should be prepared to handle ids they don't know
    /// about yet.
    pub pulling_files: Vec<Uuid>,

    /// a mix of human readable and precise data for
    /// used, and available space
    pub space_used: Option<UsageMetrics>,

    /// if there is no pending work this will have a human readable
    /// description of when we last synced successfully
    pub sync_status: Option<String>,

    /// formatted description of an error reported at the end of a sync that
    /// is not covered by `offline`, `out_of_space` or `update_required`
    pub unexpected_sync_problem: Option<String>,
}
76
77impl Status {
78    pub fn msg(&self) -> Option<String> {
79        if self.syncing {
80            return Some("Syncing...".to_string());
81        }
82
83        if self.offline {
84            if !self.dirty_locally.is_empty() {
85                let len = self.dirty_locally.len();
86                return Some(format!(
87                    "Offline, {} change{} unsynced.",
88                    len,
89                    if len > 1 { "s" } else { "" }
90                ));
91            }
92
93            if let Some(last_synced) = &self.sync_status {
94                return Some(format!("Offline, last synced: {last_synced}"));
95            }
96
97            return Some("Offline.".to_string());
98        }
99
100        if self.out_of_space {
101            return Some("You're out of space!".to_string());
102        }
103
104        if self.update_required {
105            return Some("An update is required to continue.".to_string());
106        }
107
108        if let Some(err) = &self.unexpected_sync_problem {
109            return Some(err.to_string());
110        }
111
112        if !self.dirty_locally.is_empty() {
113            let dirty_locally = self.dirty_locally.len();
114            return Some(format!("{dirty_locally} changes unsynced"));
115        }
116
117        if let Some(last_synced) = &self.sync_status {
118            return Some(format!("Last synced: {last_synced}"));
119        }
120
121        None
122    }
123}
124
125impl Lb {
126    pub async fn status(&self) -> Status {
127        self.status.current_status.read().await.clone()
128    }
129
130    pub async fn set_initial_state(&self) -> LbResult<()> {
131        if self.keychain.get_account().is_ok() {
132            self.spawn_compute_usage().await;
133            let mut current = self.status.current_status.write().await;
134            current.dirty_locally = self.local_changes().await;
135            if current.dirty_locally.is_empty() {
136                current.sync_status = self.get_last_synced_human().await.log_and_ignore();
137            }
138            current.pending_shares = !self.get_pending_shares().await?.is_empty();
139        }
140
141        Ok(())
142    }
143
144    pub async fn setup_status(&self) -> LbResult<()> {
145        self.set_initial_state().await?;
146        let mut rx = self.subscribe();
147        let bg = self.clone();
148
149        tokio_spawn!(async move {
150            loop {
151                let evt = match rx.recv().await {
152                    Ok(evt) => evt,
153                    Err(err) => {
154                        error!("failed to receive from a channel {err}");
155                        return;
156                    }
157                };
158                bg.process_event(evt).await.log_and_ignore();
159            }
160        });
161
162        Ok(())
163    }
164
165    pub async fn get_last_synced_human(&self) -> LbResult<String> {
166        let tx = self.ro_tx().await;
167        let db = tx.db();
168        let last_synced = db.last_synced.get().copied().unwrap_or(0);
169
170        Ok(self.get_timestamp_human_string(last_synced))
171    }
172
173    pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
174        if timestamp != 0 {
175            Duration::milliseconds(clock::get_time().0 - timestamp)
176                .format_human()
177                .to_string()
178        } else {
179            "never".to_string()
180        }
181    }
182
183    async fn process_event(&self, e: Event) -> LbResult<()> {
184        let current = self.status.current_status.read().await.clone();
185        match e {
186            Event::MetadataChanged(_) | Event::DocumentWritten(_, _) => {
187                self.compute_dirty_locally(current).await?;
188            }
189            Event::Sync(s) => self.update_sync(s, current).await?,
190            _ => {}
191        }
192        Ok(())
193    }
194
195    async fn set_status(&self, status: Status) -> LbResult<()> {
196        *self.status.current_status.write().await = status;
197        self.events.status_updated();
198        Ok(())
199    }
200
201    async fn compute_dirty_locally(&self, mut status: Status) -> LbResult<()> {
202        let new = self.local_changes().await;
203        if new != status.dirty_locally {
204            status.dirty_locally = self.local_changes().await;
205            self.set_status(status).await?;
206        }
207        Ok(())
208    }
209
210    async fn spawn_compute_usage(&self) {
211        let mut lock = self.status.space_updated.lock().await;
212        if lock.spawned {
213            return;
214        }
215        let initialized = lock.initialized;
216        lock.spawned = true;
217        lock.initialized = true;
218        let computed = lock.last_computed;
219        drop(lock);
220
221        let bg = self.clone();
222        tokio_spawn!(async move {
223            if initialized && computed.elapsed() < web_time::Duration::from_secs(60) {
224                tokio::time::sleep(web_time::Duration::from_secs(60) - computed.elapsed()).await;
225            }
226            let usage = bg.get_usage().await.log_and_ignore();
227            let mut lock = bg.status.space_updated.lock().await;
228            lock.spawned = false;
229            lock.last_computed = Instant::now();
230            drop(lock);
231
232            bg.status.current_status.write().await.space_used = usage;
233            bg.events.status_updated();
234        });
235    }
236
237    async fn update_sync(&self, s: SyncIncrement, mut status: Status) -> LbResult<()> {
238        match s {
239            SyncIncrement::SyncStarted => {
240                self.reset_in_flight_sync(&mut status);
241                status.syncing = true;
242            }
243            SyncIncrement::PullingDocument(id, in_progress) => {
244                if in_progress {
245                    status.pulling_files.push(id);
246                } else {
247                    status.pulling_files.retain(|fid| id != *fid);
248                }
249            }
250            SyncIncrement::PushingDocument(id, in_progress) => {
251                if in_progress {
252                    status.pushing_files.push(id);
253                } else {
254                    status.pushing_files.retain(|fid| id != *fid);
255                }
256            }
257            SyncIncrement::SyncFinished(maybe_problem) => {
258                // Clear prior sync outcomes here (not at start) so errors persist until a new completed sync
259                self.reset_sync_outcome(&mut status);
260                self.reset_in_flight_sync(&mut status);
261
262                self.spawn_compute_usage().await;
263                status.dirty_locally = self.local_changes().await;
264                if status.dirty_locally.is_empty() {
265                    status.sync_status = self.get_last_synced_human().await.ok();
266                }
267                // @smailbarkouch has requested that this be a Vec<Uuid> instead of a bool
268                // we also could consume the PendingSharesChanged event
269                status.pending_shares = !self.get_pending_shares().await?.is_empty();
270                match maybe_problem {
271                    Some(LbErrKind::ClientUpdateRequired) => {
272                        status.update_required = true;
273                    }
274                    Some(LbErrKind::ServerUnreachable) => {
275                        status.offline = true;
276                    }
277                    Some(LbErrKind::UsageIsOverDataCap) => {
278                        status.out_of_space = true;
279                    }
280                    None => {}
281                    Some(e) => {
282                        status.unexpected_sync_problem =
283                            Some(format!("unexpected error {e:?}: {e}"));
284                        error!("unexpected error {e:?}: {e}");
285                    }
286                }
287            }
288        }
289
290        self.set_status(status).await?;
291
292        Ok(())
293    }
294
295    fn reset_in_flight_sync(&self, status: &mut Status) {
296        status.syncing = false;
297        status.pulling_files.clear();
298        status.pushing_files.clear();
299        status.sync_status = None;
300        status.unexpected_sync_problem = None;
301    }
302
303    fn reset_sync_outcome(&self, status: &mut Status) {
304        status.offline = false;
305        status.update_required = false;
306        status.out_of_space = false;
307    }
308}
309
310impl Default for SpaceUpdater {
311    fn default() -> Self {
312        Self { spawned: false, last_computed: Instant::now(), initialized: false }
313    }
314}