lb_rs/subscribers/
status.rs1use basic_human_duration::ChronoHumanDuration;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use time::Duration;
5
6use tokio::sync::{Mutex, RwLock};
7use uuid::Uuid;
8use web_time::Instant;
9
10use crate::model::clock;
11use crate::model::errors::{LbErrKind, LbResult, Unexpected};
12use crate::service::events::{Event, SyncIncrement};
13use crate::service::usage::UsageMetrics;
14use crate::{Lb, tokio_spawn};
15
16#[derive(Clone, Default)]
17pub struct StatusUpdater {
18 current_status: Arc<RwLock<Status>>,
19 space_updated: Arc<Mutex<SpaceUpdater>>,
20}
21
/// Bookkeeping for the background task that refreshes `Status::space_used`.
pub struct SpaceUpdater {
    /// `true` while a usage task is spawned and has not yet finished; used to
    /// avoid spawning a second one concurrently.
    spawned: bool,

    /// Set to `true` the first time a usage task is spawned. Until then
    /// `last_computed` only holds its construction-time placeholder and is not
    /// used for throttling.
    initialized: bool,

    /// When the most recent usage computation completed.
    last_computed: Instant,
}
28
29#[derive(Default, Clone, Debug, Serialize, Deserialize)]
39pub struct Status {
40 pub offline: bool,
42
43 pub syncing: bool,
45
46 pub out_of_space: bool,
48
49 pub pending_shares: bool,
51
52 pub update_required: bool,
54
55 pub pushing_files: Vec<Uuid>,
57
58 pub dirty_locally: Vec<Uuid>,
60
61 pub pulling_files: Vec<Uuid>,
65
66 pub space_used: Option<UsageMetrics>,
69
70 pub sync_status: Option<String>,
73
74 pub unexpected_sync_problem: Option<String>,
75}
76
77impl Status {
78 pub fn msg(&self) -> Option<String> {
79 if self.syncing {
80 return Some("Syncing...".to_string());
81 }
82
83 if self.offline {
84 if !self.dirty_locally.is_empty() {
85 let len = self.dirty_locally.len();
86 return Some(format!(
87 "Offline, {} change{} unsynced.",
88 len,
89 if len > 1 { "s" } else { "" }
90 ));
91 }
92
93 if let Some(last_synced) = &self.sync_status {
94 return Some(format!("Offline, last synced: {last_synced}"));
95 }
96
97 return Some("Offline.".to_string());
98 }
99
100 if self.out_of_space {
101 return Some("You're out of space!".to_string());
102 }
103
104 if self.update_required {
105 return Some("An update is required to continue.".to_string());
106 }
107
108 if let Some(err) = &self.unexpected_sync_problem {
109 return Some(err.to_string());
110 }
111
112 if !self.dirty_locally.is_empty() {
113 let dirty_locally = self.dirty_locally.len();
114 return Some(format!("{dirty_locally} changes unsynced"));
115 }
116
117 if let Some(last_synced) = &self.sync_status {
118 return Some(format!("Last synced: {last_synced}"));
119 }
120
121 None
122 }
123}
124
125impl Lb {
126 pub async fn status(&self) -> Status {
127 self.status.current_status.read().await.clone()
128 }
129
130 pub async fn set_initial_state(&self) -> LbResult<()> {
131 if self.keychain.get_account().is_ok() {
132 self.spawn_compute_usage().await;
133 let mut current = self.status.current_status.write().await;
134 current.dirty_locally = self.local_changes().await;
135 if current.dirty_locally.is_empty() {
136 current.sync_status = self.get_last_synced_human().await.log_and_ignore();
137 }
138 current.pending_shares = !self.get_pending_shares().await?.is_empty();
139 }
140
141 Ok(())
142 }
143
144 pub async fn setup_status(&self) -> LbResult<()> {
145 self.set_initial_state().await?;
146 let mut rx = self.subscribe();
147 let bg = self.clone();
148
149 tokio_spawn!(async move {
150 loop {
151 let evt = match rx.recv().await {
152 Ok(evt) => evt,
153 Err(err) => {
154 error!("failed to receive from a channel {err}");
155 return;
156 }
157 };
158 bg.process_event(evt).await.log_and_ignore();
159 }
160 });
161
162 Ok(())
163 }
164
165 pub async fn get_last_synced_human(&self) -> LbResult<String> {
166 let tx = self.ro_tx().await;
167 let db = tx.db();
168 let last_synced = db.last_synced.get().copied().unwrap_or(0);
169
170 Ok(self.get_timestamp_human_string(last_synced))
171 }
172
173 pub fn get_timestamp_human_string(&self, timestamp: i64) -> String {
174 if timestamp != 0 {
175 Duration::milliseconds(clock::get_time().0 - timestamp)
176 .format_human()
177 .to_string()
178 } else {
179 "never".to_string()
180 }
181 }
182
183 async fn process_event(&self, e: Event) -> LbResult<()> {
184 let current = self.status.current_status.read().await.clone();
185 match e {
186 Event::MetadataChanged(_) | Event::DocumentWritten(_, _) => {
187 self.compute_dirty_locally(current).await?;
188 }
189 Event::Sync(s) => self.update_sync(s, current).await?,
190 _ => {}
191 }
192 Ok(())
193 }
194
195 async fn set_status(&self, status: Status) -> LbResult<()> {
196 *self.status.current_status.write().await = status;
197 self.events.status_updated();
198 Ok(())
199 }
200
201 async fn compute_dirty_locally(&self, mut status: Status) -> LbResult<()> {
202 let new = self.local_changes().await;
203 if new != status.dirty_locally {
204 status.dirty_locally = self.local_changes().await;
205 self.set_status(status).await?;
206 }
207 Ok(())
208 }
209
210 async fn spawn_compute_usage(&self) {
211 let mut lock = self.status.space_updated.lock().await;
212 if lock.spawned {
213 return;
214 }
215 let initialized = lock.initialized;
216 lock.spawned = true;
217 lock.initialized = true;
218 let computed = lock.last_computed;
219 drop(lock);
220
221 let bg = self.clone();
222 tokio_spawn!(async move {
223 if initialized && computed.elapsed() < web_time::Duration::from_secs(60) {
224 tokio::time::sleep(web_time::Duration::from_secs(60) - computed.elapsed()).await;
225 }
226 let usage = bg.get_usage().await.log_and_ignore();
227 let mut lock = bg.status.space_updated.lock().await;
228 lock.spawned = false;
229 lock.last_computed = Instant::now();
230 drop(lock);
231
232 bg.status.current_status.write().await.space_used = usage;
233 bg.events.status_updated();
234 });
235 }
236
237 async fn update_sync(&self, s: SyncIncrement, mut status: Status) -> LbResult<()> {
238 match s {
239 SyncIncrement::SyncStarted => {
240 self.reset_in_flight_sync(&mut status);
241 status.syncing = true;
242 }
243 SyncIncrement::PullingDocument(id, in_progress) => {
244 if in_progress {
245 status.pulling_files.push(id);
246 } else {
247 status.pulling_files.retain(|fid| id != *fid);
248 }
249 }
250 SyncIncrement::PushingDocument(id, in_progress) => {
251 if in_progress {
252 status.pushing_files.push(id);
253 } else {
254 status.pushing_files.retain(|fid| id != *fid);
255 }
256 }
257 SyncIncrement::SyncFinished(maybe_problem) => {
258 self.reset_sync_outcome(&mut status);
260 self.reset_in_flight_sync(&mut status);
261
262 self.spawn_compute_usage().await;
263 status.dirty_locally = self.local_changes().await;
264 if status.dirty_locally.is_empty() {
265 status.sync_status = self.get_last_synced_human().await.ok();
266 }
267 status.pending_shares = !self.get_pending_shares().await?.is_empty();
270 match maybe_problem {
271 Some(LbErrKind::ClientUpdateRequired) => {
272 status.update_required = true;
273 }
274 Some(LbErrKind::ServerUnreachable) => {
275 status.offline = true;
276 }
277 Some(LbErrKind::UsageIsOverDataCap) => {
278 status.out_of_space = true;
279 }
280 None => {}
281 Some(e) => {
282 status.unexpected_sync_problem =
283 Some(format!("unexpected error {e:?}: {e}"));
284 error!("unexpected error {e:?}: {e}");
285 }
286 }
287 }
288 }
289
290 self.set_status(status).await?;
291
292 Ok(())
293 }
294
295 fn reset_in_flight_sync(&self, status: &mut Status) {
296 status.syncing = false;
297 status.pulling_files.clear();
298 status.pushing_files.clear();
299 status.sync_status = None;
300 status.unexpected_sync_problem = None;
301 }
302
303 fn reset_sync_outcome(&self, status: &mut Status) {
304 status.offline = false;
305 status.update_required = false;
306 status.out_of_space = false;
307 }
308}
309
310impl Default for SpaceUpdater {
311 fn default() -> Self {
312 Self { spawned: false, last_computed: Instant::now(), initialized: false }
313 }
314}