lb_rs/subscribers/
status.rs1use basic_human_duration::ChronoHumanDuration;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use time::Duration;
5
6use tokio::sync::{Mutex, RwLock};
7use uuid::Uuid;
8use web_time::Instant;
9
10use crate::model::clock;
11use crate::model::errors::{LbErrKind, LbResult, Unexpected};
12use crate::service::events::{Event, SyncIncrement};
13use crate::service::usage::UsageMetrics;
14use crate::{LocalLb, tokio_spawn};
15
16#[derive(Clone, Default)]
17pub struct StatusUpdater {
18 current_status: Arc<RwLock<Status>>,
19 space_updated: Arc<Mutex<SpaceUpdater>>,
20}
21
22pub struct SpaceUpdater {
24 initialized: bool,
25 spawned: bool,
26 last_computed: Instant,
27}
28
29#[derive(Default, Clone, Debug, Serialize, Deserialize)]
39pub struct Status {
40 pub offline: bool,
42
43 pub syncing: bool,
45
46 pub out_of_space: bool,
48
49 pub pending_shares: bool,
51
52 pub update_required: bool,
54
55 pub pushing_files: Vec<Uuid>,
57
58 pub dirty_locally: Vec<Uuid>,
60
61 pub pulling_files: Vec<Uuid>,
65
66 pub space_used: Option<UsageMetrics>,
69
70 pub sync_status: Option<String>,
73
74 pub unexpected_sync_problem: Option<String>,
75}
76
77impl Status {
78 pub fn msg(&self) -> Option<String> {
79 if self.syncing {
80 return Some("Syncing...".to_string());
81 }
82
83 if self.offline {
84 if !self.dirty_locally.is_empty() {
85 let len = self.dirty_locally.len();
86 return Some(format!(
87 "Offline, {} change{} unsynced.",
88 len,
89 if len > 1 { "s" } else { "" }
90 ));
91 }
92
93 if let Some(last_synced) = &self.sync_status {
94 return Some(format!("Offline, last synced: {last_synced}"));
95 }
96
97 return Some("Offline.".to_string());
98 }
99
100 if self.out_of_space {
101 return Some("You're out of space!".to_string());
102 }
103
104 if self.update_required {
105 return Some("An update is required to continue.".to_string());
106 }
107
108 if let Some(err) = &self.unexpected_sync_problem {
109 return Some(err.to_string());
110 }
111
112 if !self.dirty_locally.is_empty() {
113 let dirty_locally = self.dirty_locally.len();
114 return Some(format!("{dirty_locally} changes unsynced"));
115 }
116
117 if let Some(last_synced) = &self.sync_status {
118 return Some(format!("Last synced: {last_synced}"));
119 }
120
121 None
122 }
123}
124
125impl LocalLb {
126 pub async fn status(&self) -> Status {
127 self.status.current_status.read().await.clone()
128 }
129
130 pub async fn set_initial_state(&self) -> LbResult<()> {
131 if self.keychain.get_account().is_ok() {
132 self.spawn_compute_usage().await;
133 let mut current = self.status.current_status.write().await;
134 current.dirty_locally = self.local_changes().await;
135 if current.dirty_locally.is_empty() {
136 current.sync_status = self.get_last_synced_human().await.log_and_ignore();
137 }
138 current.pending_shares = !self.get_pending_shares().await?.is_empty();
139 }
140
141 Ok(())
142 }
143
144 pub async fn setup_status(&self) -> LbResult<()> {
145 self.set_initial_state().await?;
146 let mut rx = self.subscribe();
147 let bg = self.clone();
148
149 tokio_spawn!(async move {
150 loop {
151 let evt = match rx.recv().await {
152 Ok(evt) => evt,
153 Err(err) => {
154 error!("failed to receive from a channel {err}");
155 return;
156 }
157 };
158 bg.process_event(evt).await.log_and_ignore();
159 }
160 });
161
162 Ok(())
163 }
164
165 pub async fn get_last_synced(&self) -> LbResult<i64> {
166 let tx = self.ro_tx().await;
167 let db = tx.db();
168 Ok(db.last_synced.get().copied().unwrap_or(0))
169 }
170
171 pub async fn get_last_synced_human(&self) -> LbResult<String> {
172 let last_synced = self.get_last_synced().await?;
173 Ok(self.get_timestamp_human_string(last_synced))
174 }
175
176 pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
177 if timestamp != 0 {
178 Duration::milliseconds(clock::get_time().0 - timestamp)
179 .format_human()
180 .to_string()
181 } else {
182 "never".to_string()
183 }
184 }
185
186 async fn process_event(&self, e: Event) -> LbResult<()> {
187 let current = self.status.current_status.read().await.clone();
188 match e {
189 Event::MetadataChanged(_) | Event::DocumentWritten(_, _) => {
190 self.compute_dirty_locally(current).await?;
191 }
192 Event::Sync(s) => self.update_sync(s, current).await?,
193 _ => {}
194 }
195 Ok(())
196 }
197
198 async fn set_status(&self, status: Status) -> LbResult<()> {
199 *self.status.current_status.write().await = status;
200 self.events.status_updated();
201 Ok(())
202 }
203
204 async fn compute_dirty_locally(&self, mut status: Status) -> LbResult<()> {
205 let new = self.local_changes().await;
206 if new != status.dirty_locally {
207 status.dirty_locally = self.local_changes().await;
208 self.set_status(status).await?;
209 }
210 Ok(())
211 }
212
213 async fn spawn_compute_usage(&self) {
214 let mut lock = self.status.space_updated.lock().await;
215 if lock.spawned {
216 return;
217 }
218 let initialized = lock.initialized;
219 lock.spawned = true;
220 lock.initialized = true;
221 let computed = lock.last_computed;
222 drop(lock);
223
224 let bg = self.clone();
225 tokio_spawn!(async move {
226 if initialized && computed.elapsed() < web_time::Duration::from_secs(60) {
227 tokio::time::sleep(web_time::Duration::from_secs(60) - computed.elapsed()).await;
228 }
229 let usage = bg.get_usage().await.log_and_ignore();
230 let mut lock = bg.status.space_updated.lock().await;
231 lock.spawned = false;
232 lock.last_computed = Instant::now();
233 drop(lock);
234
235 bg.status.current_status.write().await.space_used = usage;
236 bg.events.status_updated();
237 });
238 }
239
240 async fn update_sync(&self, s: SyncIncrement, mut status: Status) -> LbResult<()> {
241 match s {
242 SyncIncrement::SyncStarted => {
243 self.reset_in_flight_sync(&mut status);
244 status.syncing = true;
245 }
246 SyncIncrement::PullingDocument(id, in_progress) => {
247 if in_progress {
248 status.pulling_files.push(id);
249 } else {
250 status.pulling_files.retain(|fid| id != *fid);
251 }
252 }
253 SyncIncrement::PushingDocument(id, in_progress) => {
254 if in_progress {
255 status.pushing_files.push(id);
256 } else {
257 status.pushing_files.retain(|fid| id != *fid);
258 }
259 }
260 SyncIncrement::SyncFinished(maybe_problem) => {
261 self.reset_sync_outcome(&mut status);
263 self.reset_in_flight_sync(&mut status);
264
265 self.spawn_compute_usage().await;
266 status.dirty_locally = self.local_changes().await;
267 if status.dirty_locally.is_empty() {
268 status.sync_status = self.get_last_synced_human().await.ok();
269 }
270 status.pending_shares = !self.get_pending_shares().await?.is_empty();
273 match maybe_problem {
274 Some(LbErrKind::ClientUpdateRequired) => {
275 status.update_required = true;
276 }
277 Some(LbErrKind::ServerUnreachable) => {
278 status.offline = true;
279 }
280 Some(LbErrKind::UsageIsOverDataCap) => {
281 status.out_of_space = true;
282 }
283 None => {}
284 Some(e) => {
285 status.unexpected_sync_problem =
286 Some(format!("unexpected error {e:?}: {e}"));
287 error!("unexpected error {e:?}: {e}");
288 }
289 }
290 }
291 }
292
293 self.set_status(status).await?;
294
295 Ok(())
296 }
297
298 fn reset_in_flight_sync(&self, status: &mut Status) {
299 status.syncing = false;
300 status.pulling_files.clear();
301 status.pushing_files.clear();
302 status.sync_status = None;
303 status.unexpected_sync_problem = None;
304 }
305
306 fn reset_sync_outcome(&self, status: &mut Status) {
307 status.offline = false;
308 status.update_required = false;
309 status.out_of_space = false;
310 }
311}
312
313impl Default for SpaceUpdater {
314 fn default() -> Self {
315 Self { spawned: false, last_computed: Instant::now(), initialized: false }
316 }
317}