lb_rs/subscribers/
status.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::{Mutex, RwLock};
5use tokio::time::Instant;
6use uuid::Uuid;
7
8use crate::Lb;
9use crate::model::errors::{LbErrKind, LbResult, Unexpected};
10use crate::service::events::Event;
11use crate::service::sync::SyncIncrement;
12use crate::service::usage::UsageMetrics;
13
14#[derive(Clone, Default)]
15pub struct StatusUpdater {
16 current_status: Arc<RwLock<Status>>,
17 space_updated: Arc<Mutex<SpaceUpdater>>,
18}
19
20pub struct SpaceUpdater {
22 initialized: bool,
23 spawned: bool,
24 last_computed: Instant,
25}
26
27#[derive(Default, Clone, Debug)]
37pub struct Status {
38 pub offline: bool,
40
41 pub syncing: bool,
43
44 pub out_of_space: bool,
46
47 pub pending_shares: bool,
49
50 pub update_required: bool,
52
53 pub pushing_files: Vec<Uuid>,
55
56 pub dirty_locally: Vec<Uuid>,
58
59 pub pulling_files: Vec<Uuid>,
63
64 pub space_used: Option<UsageMetrics>,
67
68 pub sync_status: Option<String>,
71
72 pub unexpected_sync_problem: Option<String>,
73}
74
75impl Status {
76 pub fn msg(&self) -> Option<String> {
77 if self.syncing {
78 return Some("Syncing...".to_string());
79 }
80
81 if self.offline {
82 if !self.dirty_locally.is_empty() {
83 let len = self.dirty_locally.len();
84 return Some(format!(
85 "Offline, {} change{} unsynced.",
86 len,
87 if len > 1 { "s" } else { "" }
88 ));
89 }
90
91 if let Some(last_synced) = &self.sync_status {
92 return Some(format!("Offline, last synced: {last_synced}"));
93 }
94
95 return Some("Offline.".to_string());
96 }
97
98 if self.out_of_space {
99 return Some("You're out of space!".to_string());
100 }
101
102 if self.update_required {
103 return Some("An update is required to continue.".to_string());
104 }
105
106 if let Some(err) = &self.unexpected_sync_problem {
107 return Some(err.to_string());
108 }
109
110 if !self.dirty_locally.is_empty() {
111 let dirty_locally = self.dirty_locally.len();
112 return Some(format!("{dirty_locally} changes unsynced"));
113 }
114
115 if let Some(last_synced) = &self.sync_status {
116 return Some(format!("Last synced: {last_synced}"));
117 }
118
119 None
120 }
121}
122
123impl Lb {
124 pub async fn status(&self) -> Status {
125 self.status.current_status.read().await.clone()
126 }
127
128 pub async fn set_initial_state(&self) -> LbResult<()> {
129 if self.keychain.get_account().is_ok() {
130 self.spawn_compute_usage().await;
131 let mut current = self.status.current_status.write().await;
132 current.dirty_locally = self.local_changes().await;
133 if current.dirty_locally.is_empty() {
134 current.sync_status = self.get_last_synced_human().await.log_and_ignore();
135 }
136 current.pending_shares = !self.get_pending_shares().await?.is_empty();
137 }
138
139 Ok(())
140 }
141
142 pub async fn setup_status(&self) -> LbResult<()> {
143 self.set_initial_state().await?;
144 let mut rx = self.subscribe();
145 let bg = self.clone();
146
147 tokio::spawn(async move {
148 loop {
149 let evt = match rx.recv().await {
150 Ok(evt) => evt,
151 Err(err) => {
152 error!("failed to receive from a channel {err}");
153 return;
154 }
155 };
156 bg.process_event(evt).await.log_and_ignore();
157 }
158 });
159
160 Ok(())
161 }
162
163 async fn process_event(&self, e: Event) -> LbResult<()> {
164 let current = self.status.current_status.read().await.clone();
165 match e {
166 Event::MetadataChanged | Event::DocumentWritten(_, _) => {
167 self.compute_dirty_locally(current).await?;
168 }
169 Event::Sync(s) => self.update_sync(s, current).await?,
170 _ => {}
171 }
172 Ok(())
173 }
174
175 async fn set_status(&self, status: Status) -> LbResult<()> {
176 *self.status.current_status.write().await = status;
177 self.events.status_updated();
178 Ok(())
179 }
180
181 async fn compute_dirty_locally(&self, mut status: Status) -> LbResult<()> {
182 let new = self.local_changes().await;
183 if new != status.dirty_locally {
184 status.dirty_locally = self.local_changes().await;
185 self.set_status(status).await?;
186 }
187 Ok(())
188 }
189
190 async fn spawn_compute_usage(&self) {
191 let mut lock = self.status.space_updated.lock().await;
192 if lock.spawned {
193 return;
194 }
195 let initialized = lock.initialized;
196 lock.spawned = true;
197 lock.initialized = true;
198 let computed = lock.last_computed;
199 drop(lock);
200
201 let bg = self.clone();
202 tokio::spawn(async move {
203 if initialized && computed.elapsed() < Duration::from_secs(60) {
204 tokio::time::sleep(Duration::from_secs(60) - computed.elapsed()).await;
205 }
206 let usage = bg.get_usage().await.log_and_ignore();
207 let mut lock = bg.status.space_updated.lock().await;
208 lock.spawned = false;
209 lock.last_computed = Instant::now();
210 drop(lock);
211
212 bg.status.current_status.write().await.space_used = usage;
213 bg.events.status_updated();
214 });
215 }
216
217 async fn update_sync(&self, s: SyncIncrement, mut status: Status) -> LbResult<()> {
218 match s {
219 SyncIncrement::SyncStarted => {
220 self.reset_sync(&mut status);
221 status.syncing = true;
222 }
223 SyncIncrement::PullingDocument(id, in_progress) => {
224 if in_progress {
225 status.pulling_files.push(id);
226 } else {
227 status.pulling_files.retain(|fid| id != *fid);
228 }
229 }
230 SyncIncrement::PushingDocument(id, in_progress) => {
231 if in_progress {
232 status.pushing_files.push(id);
233 } else {
234 status.pushing_files.retain(|fid| id != *fid);
235 }
236 }
237 SyncIncrement::SyncFinished(maybe_problem) => {
238 self.reset_sync(&mut status);
239 self.spawn_compute_usage().await;
240 status.dirty_locally = self.local_changes().await;
241 if status.dirty_locally.is_empty() {
242 status.sync_status = self.get_last_synced_human().await.ok();
243 }
244 status.pending_shares = !self.get_pending_shares().await?.is_empty();
247 match maybe_problem {
248 Some(LbErrKind::ClientUpdateRequired) => {
249 status.update_required = true;
250 }
251 Some(LbErrKind::ServerUnreachable) => {
252 status.offline = true;
253 }
254 Some(LbErrKind::UsageIsOverDataCap) => {
255 status.out_of_space = true;
256 }
257 None => {}
258 Some(e) => {
259 status.unexpected_sync_problem =
260 Some(format!("unexpected error {e:?}: {e}"));
261 error!("unexpected error {e:?}: {e}");
262 }
263 }
264 }
265 }
266
267 self.set_status(status).await?;
268
269 Ok(())
270 }
271
272 fn reset_sync(&self, status: &mut Status) {
273 status.syncing = false;
274 status.pulling_files.clear();
275 status.pushing_files.clear();
276 status.offline = false;
277 status.update_required = false;
278 status.out_of_space = false;
279 status.sync_status = None;
280 status.unexpected_sync_problem = None;
281 }
282}
283
284impl Default for SpaceUpdater {
285 fn default() -> Self {
286 Self { spawned: false, last_computed: Instant::now(), initialized: false }
287 }
288}