lb_rs/subscribers/
status.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::{Mutex, RwLock};
5use tokio::time::Instant;
6use uuid::Uuid;
7
8use crate::Lb;
9use crate::model::errors::{LbErrKind, LbResult, Unexpected};
10use crate::service::events::Event;
11use crate::service::sync::SyncIncrement;
12use crate::service::usage::UsageMetrics;
13
14#[derive(Clone, Default)]
15pub struct StatusUpdater {
16    current_status: Arc<RwLock<Status>>,
17    space_updated: Arc<Mutex<SpaceUpdater>>,
18}
19
20/// rate limit get_usage calls to once every 60 seconds or so
21pub struct SpaceUpdater {
22    initialized: bool,
23    spawned: bool,
24    last_computed: Instant,
25}
26
27/// lb-rs may be used by multiple disconnected components which may
28/// not be able to seamlessly share state among one another. this struct
29/// provides a snapshot into what overall state of data and tasks are
30/// within lb-rs.
31///
32/// the fields are roughly in order of priority, if your UI has limited
33/// space to represent information (phones?) earlier fields are more
34/// important than later fields. Ideally anything with an ID is represented
35/// in the file tree itself.
36#[derive(Default, Clone, Debug)]
37pub struct Status {
38    /// some recent server interaction failed due to network conditions
39    pub offline: bool,
40
41    /// a sync is in progress
42    pub syncing: bool,
43
44    /// at-least one document cannot be pushed due to a data cap
45    pub out_of_space: bool,
46
47    /// there are pending shares
48    pub pending_shares: bool,
49
50    /// you must update to be able to sync, see update_available below
51    pub update_required: bool,
52
53    /// metadata or content for this id is being sent to the server
54    pub pushing_files: Vec<Uuid>,
55
56    /// following files need to be pushed
57    pub dirty_locally: Vec<Uuid>,
58
59    /// metadata or content for this id is being from the server
60    /// callers should be prepared to handle ids they don't know
61    /// about yet.
62    pub pulling_files: Vec<Uuid>,
63
64    /// a mix of human readable and precise data for
65    /// used, and available space
66    pub space_used: Option<UsageMetrics>,
67
68    /// if there is no pending work this will have a human readable
69    /// description of when we last synced successfully
70    pub sync_status: Option<String>,
71
72    pub unexpected_sync_problem: Option<String>,
73}
74
75impl Status {
76    pub fn msg(&self) -> Option<String> {
77        if self.syncing {
78            return Some("Syncing...".to_string());
79        }
80
81        if self.offline {
82            if !self.dirty_locally.is_empty() {
83                let len = self.dirty_locally.len();
84                return Some(format!(
85                    "Offline, {} change{} unsynced.",
86                    len,
87                    if len > 1 { "s" } else { "" }
88                ));
89            }
90
91            if let Some(last_synced) = &self.sync_status {
92                return Some(format!("Offline, last synced: {last_synced}"));
93            }
94
95            return Some("Offline.".to_string());
96        }
97
98        if self.out_of_space {
99            return Some("You're out of space!".to_string());
100        }
101
102        if self.update_required {
103            return Some("An update is required to continue.".to_string());
104        }
105
106        if let Some(err) = &self.unexpected_sync_problem {
107            return Some(err.to_string());
108        }
109
110        if !self.dirty_locally.is_empty() {
111            let dirty_locally = self.dirty_locally.len();
112            return Some(format!("{dirty_locally} changes unsynced"));
113        }
114
115        if let Some(last_synced) = &self.sync_status {
116            return Some(format!("Last synced: {last_synced}"));
117        }
118
119        None
120    }
121}
122
123impl Lb {
124    pub async fn status(&self) -> Status {
125        self.status.current_status.read().await.clone()
126    }
127
128    pub async fn set_initial_state(&self) -> LbResult<()> {
129        if self.keychain.get_account().is_ok() {
130            self.spawn_compute_usage().await;
131            let mut current = self.status.current_status.write().await;
132            current.dirty_locally = self.local_changes().await;
133            if current.dirty_locally.is_empty() {
134                current.sync_status = self.get_last_synced_human().await.log_and_ignore();
135            }
136            current.pending_shares = !self.get_pending_shares().await?.is_empty();
137        }
138
139        Ok(())
140    }
141
142    pub async fn setup_status(&self) -> LbResult<()> {
143        self.set_initial_state().await?;
144        let mut rx = self.subscribe();
145        let bg = self.clone();
146
147        tokio::spawn(async move {
148            loop {
149                let evt = match rx.recv().await {
150                    Ok(evt) => evt,
151                    Err(err) => {
152                        error!("failed to receive from a channel {err}");
153                        return;
154                    }
155                };
156                bg.process_event(evt).await.log_and_ignore();
157            }
158        });
159
160        Ok(())
161    }
162
163    async fn process_event(&self, e: Event) -> LbResult<()> {
164        let current = self.status.current_status.read().await.clone();
165        match e {
166            Event::MetadataChanged | Event::DocumentWritten(_, _) => {
167                self.compute_dirty_locally(current).await?;
168            }
169            Event::Sync(s) => self.update_sync(s, current).await?,
170            _ => {}
171        }
172        Ok(())
173    }
174
175    async fn set_status(&self, status: Status) -> LbResult<()> {
176        *self.status.current_status.write().await = status;
177        self.events.status_updated();
178        Ok(())
179    }
180
181    async fn compute_dirty_locally(&self, mut status: Status) -> LbResult<()> {
182        let new = self.local_changes().await;
183        if new != status.dirty_locally {
184            status.dirty_locally = self.local_changes().await;
185            self.set_status(status).await?;
186        }
187        Ok(())
188    }
189
190    async fn spawn_compute_usage(&self) {
191        let mut lock = self.status.space_updated.lock().await;
192        if lock.spawned {
193            return;
194        }
195        let initialized = lock.initialized;
196        lock.spawned = true;
197        lock.initialized = true;
198        let computed = lock.last_computed;
199        drop(lock);
200
201        let bg = self.clone();
202        tokio::spawn(async move {
203            if initialized && computed.elapsed() < Duration::from_secs(60) {
204                tokio::time::sleep(Duration::from_secs(60) - computed.elapsed()).await;
205            }
206            let usage = bg.get_usage().await.log_and_ignore();
207            let mut lock = bg.status.space_updated.lock().await;
208            lock.spawned = false;
209            lock.last_computed = Instant::now();
210            drop(lock);
211
212            bg.status.current_status.write().await.space_used = usage;
213            bg.events.status_updated();
214        });
215    }
216
217    async fn update_sync(&self, s: SyncIncrement, mut status: Status) -> LbResult<()> {
218        match s {
219            SyncIncrement::SyncStarted => {
220                self.reset_sync(&mut status);
221                status.syncing = true;
222            }
223            SyncIncrement::PullingDocument(id, in_progress) => {
224                if in_progress {
225                    status.pulling_files.push(id);
226                } else {
227                    status.pulling_files.retain(|fid| id != *fid);
228                }
229            }
230            SyncIncrement::PushingDocument(id, in_progress) => {
231                if in_progress {
232                    status.pushing_files.push(id);
233                } else {
234                    status.pushing_files.retain(|fid| id != *fid);
235                }
236            }
237            SyncIncrement::SyncFinished(maybe_problem) => {
238                self.reset_sync(&mut status);
239                self.spawn_compute_usage().await;
240                status.dirty_locally = self.local_changes().await;
241                if status.dirty_locally.is_empty() {
242                    status.sync_status = self.get_last_synced_human().await.ok();
243                }
244                // @smailbarkouch has requested that this be a Vec<Uuid> instead of a bool
245                // we also could consume the PendingSharesChanged event
246                status.pending_shares = !self.get_pending_shares().await?.is_empty();
247                match maybe_problem {
248                    Some(LbErrKind::ClientUpdateRequired) => {
249                        status.update_required = true;
250                    }
251                    Some(LbErrKind::ServerUnreachable) => {
252                        status.offline = true;
253                    }
254                    Some(LbErrKind::UsageIsOverDataCap) => {
255                        status.out_of_space = true;
256                    }
257                    None => {}
258                    Some(e) => {
259                        status.unexpected_sync_problem =
260                            Some(format!("unexpected error {e:?}: {e}"));
261                        error!("unexpected error {e:?}: {e}");
262                    }
263                }
264            }
265        }
266
267        self.set_status(status).await?;
268
269        Ok(())
270    }
271
272    fn reset_sync(&self, status: &mut Status) {
273        status.syncing = false;
274        status.pulling_files.clear();
275        status.pushing_files.clear();
276        status.offline = false;
277        status.update_required = false;
278        status.out_of_space = false;
279        status.sync_status = None;
280        status.unexpected_sync_problem = None;
281    }
282}
283
284impl Default for SpaceUpdater {
285    fn default() -> Self {
286        Self { spawned: false, last_computed: Instant::now(), initialized: false }
287    }
288}