lb_rs/subscribers/
status.rs

1use std::{sync::Arc, time::Duration};
2
3use tokio::{
4    sync::{Mutex, RwLock},
5    time::Instant,
6};
7use uuid::Uuid;
8
9use crate::{
10    model::errors::{LbErrKind, LbResult, Unexpected},
11    service::{events::Event, sync::SyncIncrement, usage::UsageMetrics},
12    Lb,
13};
14
15#[derive(Clone, Default)]
16pub struct StatusUpdater {
17    current_status: Arc<RwLock<Status>>,
18    space_updated: Arc<Mutex<SpaceUpdater>>,
19}
20
21pub struct SpaceUpdater {
22    spawned: bool,
23    last_computed: Instant,
24}
25
26/// lb-rs may be used by multiple disconnected components which may
27/// not be able to seamlessly share state among one another. this struct
28/// provides a snapshot into what overall state of data and tasks are
29/// within lb-rs.
30///
31/// the fields are roughly in order of priority, if your UI has limited
32/// space to represent information (phones?) earlier fields are more
33/// important than later fields. Ideally anything with an ID is represented
34/// in the file tree itself.
35#[derive(Default, Clone)]
36pub struct Status {
37    /// some recent server interaction failed due to network conditions
38    pub offline: bool,
39
40    /// a sync is in progress
41    pub syncing: bool,
42
43    /// at-least one document cannot be pushed due to a data cap
44    pub out_of_space: bool,
45
46    /// there are pending shares
47    pub pending_shares: bool,
48
49    /// you must update to be able to sync, see update_available below
50    pub update_required: bool,
51
52    /// metadata or content for this id is being sent to the server
53    pub pushing_files: Vec<Uuid>,
54
55    /// following files need to be pushed
56    pub dirty_locally: Vec<Uuid>,
57
58    /// metadata or content for this id is being from the server
59    /// callers should be prepared to handle ids they don't know
60    /// about yet.
61    pub pulling_files: Vec<Uuid>,
62
63    /// a mix of human readable and precise data for
64    /// used, and available space
65    pub space_used: Option<UsageMetrics>,
66
67    /// if there is no pending work this will have a human readable
68    /// description of when we last synced successfully
69    pub sync_status: Option<String>,
70}
71
72impl Lb {
73    pub async fn status(&self) -> Status {
74        self.status.current_status.read().await.clone()
75    }
76
77    pub async fn set_initial_state(&self) -> LbResult<()> {
78        if self.keychain.get_account().is_ok() {
79            let mut current = self.status.current_status.write().await;
80            current.dirty_locally = self.local_changes().await;
81            if current.dirty_locally.is_empty() {
82                current.sync_status = self.get_last_synced_human().await.log_and_ignore();
83            }
84            current.pending_shares = !self.get_pending_shares().await?.is_empty();
85        }
86
87        Ok(())
88    }
89
90    pub async fn setup_status(&self) -> LbResult<()> {
91        self.set_initial_state().await?;
92        let mut rx = self.subscribe();
93        let bg = self.clone();
94
95        tokio::spawn(async move {
96            loop {
97                let evt = match rx.recv().await {
98                    Ok(evt) => evt,
99                    Err(err) => {
100                        error!("failed to receive from a channel {err}");
101                        return;
102                    }
103                };
104                bg.process_event(evt).await.log_and_ignore();
105            }
106        });
107
108        Ok(())
109    }
110
111    async fn process_event(&self, e: Event) -> LbResult<()> {
112        let mut current = self.status.current_status.read().await.clone();
113        match e {
114            Event::MetadataChanged | Event::DocumentWritten(_) => {
115                self.compute_dirty_locally(&mut current).await?;
116            }
117            Event::Sync(s) => self.update_sync(s, &mut current).await?,
118            _ => {}
119        }
120        Ok(())
121    }
122
123    async fn compute_dirty_locally(&self, status: &mut Status) -> LbResult<()> {
124        let new = self.local_changes().await;
125        if new != status.dirty_locally {
126            status.dirty_locally = self.local_changes().await;
127            self.events.status_updated();
128        }
129        Ok(())
130    }
131
132    async fn compute_usage(&self) {
133        let mut lock = self.status.space_updated.lock().await;
134        if lock.spawned {
135            return;
136        }
137        lock.spawned = true;
138        let computed = lock.last_computed;
139        drop(lock);
140
141        let bg = self.clone();
142        tokio::spawn(async move {
143            if computed.elapsed() < Duration::from_secs(60) {
144                tokio::time::sleep(Duration::from_secs(60) - computed.elapsed()).await;
145            }
146            let usage = bg.get_usage().await.log_and_ignore();
147            let mut lock = bg.status.space_updated.lock().await;
148            lock.spawned = false;
149            lock.last_computed = Instant::now();
150            drop(lock);
151
152            bg.status.current_status.write().await.space_used = usage;
153            bg.events.status_updated();
154        });
155    }
156
157    async fn update_sync(&self, s: SyncIncrement, status: &mut Status) -> LbResult<()> {
158        match s {
159            SyncIncrement::SyncStarted => {
160                self.reset_sync(status);
161                status.syncing = true;
162            }
163            SyncIncrement::PullingDocument(id, in_progress) => {
164                if in_progress {
165                    status.pulling_files.push(id);
166                } else {
167                    status.pulling_files.retain(|fid| id != *fid);
168                }
169            }
170            SyncIncrement::PushingDocument(id, in_progress) => {
171                if in_progress {
172                    status.pushing_files.push(id);
173                } else {
174                    status.pushing_files.retain(|fid| id != *fid);
175                }
176            }
177            SyncIncrement::SyncFinished(maybe_problem) => {
178                self.reset_sync(status);
179                self.compute_usage().await;
180                match maybe_problem {
181                    Some(LbErrKind::ClientUpdateRequired) => {
182                        status.update_required = true;
183                    }
184                    Some(LbErrKind::ServerUnreachable) => {
185                        status.offline = true;
186                    }
187                    Some(LbErrKind::UsageIsOverDataCap) => {
188                        status.out_of_space = true;
189                    }
190                    None => {
191                        status.dirty_locally = self.local_changes().await;
192                        if status.dirty_locally.is_empty() {
193                            status.sync_status = self.get_last_synced_human().await.ok();
194                        }
195                        status.pending_shares = !self.get_pending_shares().await?.is_empty();
196                    }
197                    _ => {
198                        error!("unexpected sync problem found {maybe_problem:?}");
199                    }
200                }
201            }
202        }
203
204        self.events.status_updated();
205
206        Ok(())
207    }
208
209    fn reset_sync(&self, status: &mut Status) {
210        status.syncing = false;
211        status.pulling_files.clear();
212        status.pushing_files.clear();
213        status.offline = false;
214        status.update_required = false;
215        status.out_of_space = false;
216        status.sync_status = None;
217    }
218}
219
220impl Default for SpaceUpdater {
221    fn default() -> Self {
222        Self { spawned: false, last_computed: Instant::now() }
223    }
224}