lb_rs/subscribers/
status.rs

1use std::{sync::Arc, time::Duration};
2
3use tokio::{
4    sync::{Mutex, RwLock},
5    time::Instant,
6};
7use uuid::Uuid;
8
9use crate::{
10    model::errors::{LbErrKind, LbResult, Unexpected},
11    service::{events::Event, sync::SyncIncrement, usage::UsageMetrics},
12    Lb,
13};
14
15#[derive(Clone, Default)]
16pub struct StatusUpdater {
17    current_status: Arc<RwLock<Status>>,
18    space_updated: Arc<Mutex<SpaceUpdater>>,
19}
20
21/// rate limit get_usage calls to once every 60 seconds or so
22pub struct SpaceUpdater {
23    initialized: bool,
24    spawned: bool,
25    last_computed: Instant,
26}
27
28/// lb-rs may be used by multiple disconnected components which may
29/// not be able to seamlessly share state among one another. this struct
30/// provides a snapshot into what overall state of data and tasks are
31/// within lb-rs.
32///
33/// the fields are roughly in order of priority, if your UI has limited
34/// space to represent information (phones?) earlier fields are more
35/// important than later fields. Ideally anything with an ID is represented
36/// in the file tree itself.
37#[derive(Default, Clone, Debug)]
38pub struct Status {
39    /// some recent server interaction failed due to network conditions
40    pub offline: bool,
41
42    /// a sync is in progress
43    pub syncing: bool,
44
45    /// at-least one document cannot be pushed due to a data cap
46    pub out_of_space: bool,
47
48    /// there are pending shares
49    pub pending_shares: bool,
50
51    /// you must update to be able to sync, see update_available below
52    pub update_required: bool,
53
54    /// metadata or content for this id is being sent to the server
55    pub pushing_files: Vec<Uuid>,
56
57    /// following files need to be pushed
58    pub dirty_locally: Vec<Uuid>,
59
60    /// metadata or content for this id is being from the server
61    /// callers should be prepared to handle ids they don't know
62    /// about yet.
63    pub pulling_files: Vec<Uuid>,
64
65    /// a mix of human readable and precise data for
66    /// used, and available space
67    pub space_used: Option<UsageMetrics>,
68
69    /// if there is no pending work this will have a human readable
70    /// description of when we last synced successfully
71    pub sync_status: Option<String>,
72
73    pub unexpected_sync_problem: Option<String>,
74}
75
76impl Status {
77    pub fn msg(&self) -> Option<String> {
78        if self.syncing {
79            return Some("Syncing...".to_string());
80        }
81
82        if self.offline {
83            if !self.dirty_locally.is_empty() {
84                let len = self.dirty_locally.len();
85                return Some(format!(
86                    "Offline, {} change{} unsynced.",
87                    len,
88                    if len > 1 { "s" } else { "" }
89                ));
90            }
91
92            if let Some(last_synced) = &self.sync_status {
93                return Some(format!("Offline, last synced: {}", last_synced));
94            }
95
96            return Some("Offline.".to_string());
97        }
98
99        if self.out_of_space {
100            return Some("You're out of space!".to_string());
101        }
102
103        if self.update_required {
104            return Some("An update is required to continue.".to_string());
105        }
106
107        if let Some(err) = &self.unexpected_sync_problem {
108            return Some(err.to_string());
109        }
110
111        if !self.dirty_locally.is_empty() {
112            let dirty_locally = self.dirty_locally.len();
113            return Some(format!("{dirty_locally} changes unsynced"));
114        }
115
116        if let Some(last_synced) = &self.sync_status {
117            return Some(format!("Last synced: {}", last_synced));
118        }
119
120        None
121    }
122}
123
124impl Lb {
125    pub async fn status(&self) -> Status {
126        self.status.current_status.read().await.clone()
127    }
128
129    pub async fn set_initial_state(&self) -> LbResult<()> {
130        if self.keychain.get_account().is_ok() {
131            self.spawn_compute_usage().await;
132            let mut current = self.status.current_status.write().await;
133            current.dirty_locally = self.local_changes().await;
134            if current.dirty_locally.is_empty() {
135                current.sync_status = self.get_last_synced_human().await.log_and_ignore();
136            }
137            current.pending_shares = !self.get_pending_shares().await?.is_empty();
138        }
139
140        Ok(())
141    }
142
143    pub async fn setup_status(&self) -> LbResult<()> {
144        self.set_initial_state().await?;
145        let mut rx = self.subscribe();
146        let bg = self.clone();
147
148        tokio::spawn(async move {
149            loop {
150                let evt = match rx.recv().await {
151                    Ok(evt) => evt,
152                    Err(err) => {
153                        error!("failed to receive from a channel {err}");
154                        return;
155                    }
156                };
157                bg.process_event(evt).await.log_and_ignore();
158            }
159        });
160
161        Ok(())
162    }
163
164    async fn process_event(&self, e: Event) -> LbResult<()> {
165        let current = self.status.current_status.read().await.clone();
166        match e {
167            Event::MetadataChanged | Event::DocumentWritten(_, _) => {
168                self.compute_dirty_locally(current).await?;
169            }
170            Event::Sync(s) => self.update_sync(s, current).await?,
171            _ => {}
172        }
173        Ok(())
174    }
175
176    async fn set_status(&self, status: Status) -> LbResult<()> {
177        *self.status.current_status.write().await = status;
178        self.events.status_updated();
179        Ok(())
180    }
181
182    async fn compute_dirty_locally(&self, mut status: Status) -> LbResult<()> {
183        let new = self.local_changes().await;
184        if new != status.dirty_locally {
185            status.dirty_locally = self.local_changes().await;
186            self.set_status(status).await?;
187        }
188        Ok(())
189    }
190
191    async fn spawn_compute_usage(&self) {
192        let mut lock = self.status.space_updated.lock().await;
193        if lock.spawned {
194            return;
195        }
196        let initialized = lock.initialized;
197        lock.spawned = true;
198        lock.initialized = true;
199        let computed = lock.last_computed;
200        drop(lock);
201
202        let bg = self.clone();
203        tokio::spawn(async move {
204            if initialized && computed.elapsed() < Duration::from_secs(60) {
205                tokio::time::sleep(Duration::from_secs(60) - computed.elapsed()).await;
206            }
207            let usage = bg.get_usage().await.log_and_ignore();
208            let mut lock = bg.status.space_updated.lock().await;
209            lock.spawned = false;
210            lock.last_computed = Instant::now();
211            drop(lock);
212
213            bg.status.current_status.write().await.space_used = usage;
214            bg.events.status_updated();
215        });
216    }
217
218    async fn update_sync(&self, s: SyncIncrement, mut status: Status) -> LbResult<()> {
219        match s {
220            SyncIncrement::SyncStarted => {
221                self.reset_sync(&mut status);
222                status.syncing = true;
223            }
224            SyncIncrement::PullingDocument(id, in_progress) => {
225                if in_progress {
226                    status.pulling_files.push(id);
227                } else {
228                    status.pulling_files.retain(|fid| id != *fid);
229                }
230            }
231            SyncIncrement::PushingDocument(id, in_progress) => {
232                if in_progress {
233                    status.pushing_files.push(id);
234                } else {
235                    status.pushing_files.retain(|fid| id != *fid);
236                }
237            }
238            SyncIncrement::SyncFinished(maybe_problem) => {
239                self.reset_sync(&mut status);
240                self.spawn_compute_usage().await;
241                status.dirty_locally = self.local_changes().await;
242                if status.dirty_locally.is_empty() {
243                    status.sync_status = self.get_last_synced_human().await.ok();
244                }
245                status.pending_shares = !self.get_pending_shares().await?.is_empty();
246                match maybe_problem {
247                    Some(LbErrKind::ClientUpdateRequired) => {
248                        status.update_required = true;
249                    }
250                    Some(LbErrKind::ServerUnreachable) => {
251                        status.offline = true;
252                    }
253                    Some(LbErrKind::UsageIsOverDataCap) => {
254                        status.out_of_space = true;
255                    }
256                    None => {}
257                    Some(e) => {
258                        status.unexpected_sync_problem =
259                            Some(format!("unexpected error {e:?}: {e}"));
260                        error!("unexpected error {e:?}: {e}");
261                    }
262                }
263            }
264        }
265
266        self.set_status(status).await?;
267
268        Ok(())
269    }
270
271    fn reset_sync(&self, status: &mut Status) {
272        status.syncing = false;
273        status.pulling_files.clear();
274        status.pushing_files.clear();
275        status.offline = false;
276        status.update_required = false;
277        status.out_of_space = false;
278        status.sync_status = None;
279        status.unexpected_sync_problem = None;
280    }
281}
282
283impl Default for SpaceUpdater {
284    fn default() -> Self {
285        Self { spawned: false, last_computed: Instant::now(), initialized: false }
286    }
287}