lb_rs/subscribers/
status.rs1use std::{sync::Arc, time::Duration};
2
3use tokio::{
4 sync::{Mutex, RwLock},
5 time::Instant,
6};
7use uuid::Uuid;
8
9use crate::{
10 model::errors::{LbErrKind, LbResult, Unexpected},
11 service::{events::Event, sync::SyncIncrement, usage::UsageMetrics},
12 Lb,
13};
14
15#[derive(Clone, Default)]
16pub struct StatusUpdater {
17 current_status: Arc<RwLock<Status>>,
18 space_updated: Arc<Mutex<SpaceUpdater>>,
19}
20
21pub struct SpaceUpdater {
23 initialized: bool,
24 spawned: bool,
25 last_computed: Instant,
26}
27
28#[derive(Default, Clone, Debug)]
38pub struct Status {
39 pub offline: bool,
41
42 pub syncing: bool,
44
45 pub out_of_space: bool,
47
48 pub pending_shares: bool,
50
51 pub update_required: bool,
53
54 pub pushing_files: Vec<Uuid>,
56
57 pub dirty_locally: Vec<Uuid>,
59
60 pub pulling_files: Vec<Uuid>,
64
65 pub space_used: Option<UsageMetrics>,
68
69 pub sync_status: Option<String>,
72
73 pub unexpected_sync_problem: Option<String>,
74}
75
76impl Status {
77 pub fn msg(&self) -> Option<String> {
78 if self.syncing {
79 return Some("Syncing...".to_string());
80 }
81
82 if self.offline {
83 if !self.dirty_locally.is_empty() {
84 let len = self.dirty_locally.len();
85 return Some(format!(
86 "Offline, {} change{} unsynced.",
87 len,
88 if len > 1 { "s" } else { "" }
89 ));
90 }
91
92 if let Some(last_synced) = &self.sync_status {
93 return Some(format!("Offline, last synced: {}", last_synced));
94 }
95
96 return Some("Offline.".to_string());
97 }
98
99 if self.out_of_space {
100 return Some("You're out of space!".to_string());
101 }
102
103 if self.update_required {
104 return Some("An update is required to continue.".to_string());
105 }
106
107 if let Some(err) = &self.unexpected_sync_problem {
108 return Some(err.to_string());
109 }
110
111 if !self.dirty_locally.is_empty() {
112 let dirty_locally = self.dirty_locally.len();
113 return Some(format!("{dirty_locally} changes unsynced"));
114 }
115
116 if let Some(last_synced) = &self.sync_status {
117 return Some(format!("Last synced: {}", last_synced));
118 }
119
120 None
121 }
122}
123
124impl Lb {
125 pub async fn status(&self) -> Status {
126 self.status.current_status.read().await.clone()
127 }
128
129 pub async fn set_initial_state(&self) -> LbResult<()> {
130 if self.keychain.get_account().is_ok() {
131 self.spawn_compute_usage().await;
132 let mut current = self.status.current_status.write().await;
133 current.dirty_locally = self.local_changes().await;
134 if current.dirty_locally.is_empty() {
135 current.sync_status = self.get_last_synced_human().await.log_and_ignore();
136 }
137 current.pending_shares = !self.get_pending_shares().await?.is_empty();
138 }
139
140 Ok(())
141 }
142
143 pub async fn setup_status(&self) -> LbResult<()> {
144 self.set_initial_state().await?;
145 let mut rx = self.subscribe();
146 let bg = self.clone();
147
148 tokio::spawn(async move {
149 loop {
150 let evt = match rx.recv().await {
151 Ok(evt) => evt,
152 Err(err) => {
153 error!("failed to receive from a channel {err}");
154 return;
155 }
156 };
157 bg.process_event(evt).await.log_and_ignore();
158 }
159 });
160
161 Ok(())
162 }
163
164 async fn process_event(&self, e: Event) -> LbResult<()> {
165 let current = self.status.current_status.read().await.clone();
166 match e {
167 Event::MetadataChanged | Event::DocumentWritten(_, _) => {
168 self.compute_dirty_locally(current).await?;
169 }
170 Event::Sync(s) => self.update_sync(s, current).await?,
171 _ => {}
172 }
173 Ok(())
174 }
175
176 async fn set_status(&self, status: Status) -> LbResult<()> {
177 *self.status.current_status.write().await = status;
178 self.events.status_updated();
179 Ok(())
180 }
181
182 async fn compute_dirty_locally(&self, mut status: Status) -> LbResult<()> {
183 let new = self.local_changes().await;
184 if new != status.dirty_locally {
185 status.dirty_locally = self.local_changes().await;
186 self.set_status(status).await?;
187 }
188 Ok(())
189 }
190
191 async fn spawn_compute_usage(&self) {
192 let mut lock = self.status.space_updated.lock().await;
193 if lock.spawned {
194 return;
195 }
196 let initialized = lock.initialized;
197 lock.spawned = true;
198 lock.initialized = true;
199 let computed = lock.last_computed;
200 drop(lock);
201
202 let bg = self.clone();
203 tokio::spawn(async move {
204 if initialized && computed.elapsed() < Duration::from_secs(60) {
205 tokio::time::sleep(Duration::from_secs(60) - computed.elapsed()).await;
206 }
207 let usage = bg.get_usage().await.log_and_ignore();
208 let mut lock = bg.status.space_updated.lock().await;
209 lock.spawned = false;
210 lock.last_computed = Instant::now();
211 drop(lock);
212
213 bg.status.current_status.write().await.space_used = usage;
214 bg.events.status_updated();
215 });
216 }
217
218 async fn update_sync(&self, s: SyncIncrement, mut status: Status) -> LbResult<()> {
219 match s {
220 SyncIncrement::SyncStarted => {
221 self.reset_sync(&mut status);
222 status.syncing = true;
223 }
224 SyncIncrement::PullingDocument(id, in_progress) => {
225 if in_progress {
226 status.pulling_files.push(id);
227 } else {
228 status.pulling_files.retain(|fid| id != *fid);
229 }
230 }
231 SyncIncrement::PushingDocument(id, in_progress) => {
232 if in_progress {
233 status.pushing_files.push(id);
234 } else {
235 status.pushing_files.retain(|fid| id != *fid);
236 }
237 }
238 SyncIncrement::SyncFinished(maybe_problem) => {
239 self.reset_sync(&mut status);
240 self.spawn_compute_usage().await;
241 status.dirty_locally = self.local_changes().await;
242 if status.dirty_locally.is_empty() {
243 status.sync_status = self.get_last_synced_human().await.ok();
244 }
245 status.pending_shares = !self.get_pending_shares().await?.is_empty();
246 match maybe_problem {
247 Some(LbErrKind::ClientUpdateRequired) => {
248 status.update_required = true;
249 }
250 Some(LbErrKind::ServerUnreachable) => {
251 status.offline = true;
252 }
253 Some(LbErrKind::UsageIsOverDataCap) => {
254 status.out_of_space = true;
255 }
256 None => {}
257 Some(e) => {
258 status.unexpected_sync_problem =
259 Some(format!("unexpected error {e:?}: {e}"));
260 error!("unexpected error {e:?}: {e}");
261 }
262 }
263 }
264 }
265
266 self.set_status(status).await?;
267
268 Ok(())
269 }
270
271 fn reset_sync(&self, status: &mut Status) {
272 status.syncing = false;
273 status.pulling_files.clear();
274 status.pushing_files.clear();
275 status.offline = false;
276 status.update_required = false;
277 status.out_of_space = false;
278 status.sync_status = None;
279 status.unexpected_sync_problem = None;
280 }
281}
282
283impl Default for SpaceUpdater {
284 fn default() -> Self {
285 Self { spawned: false, last_computed: Instant::now(), initialized: false }
286 }
287}