lb_rs/subscribers/
status.rs1use std::{sync::Arc, time::Duration};
2
3use tokio::{
4 sync::{Mutex, RwLock},
5 time::Instant,
6};
7use uuid::Uuid;
8
9use crate::{
10 model::errors::{LbErrKind, LbResult, Unexpected},
11 service::{events::Event, sync::SyncIncrement, usage::UsageMetrics},
12 Lb,
13};
14
15#[derive(Clone, Default)]
16pub struct StatusUpdater {
17 current_status: Arc<RwLock<Status>>,
18 space_updated: Arc<Mutex<SpaceUpdater>>,
19}
20
21pub struct SpaceUpdater {
22 spawned: bool,
23 last_computed: Instant,
24}
25
26#[derive(Default, Clone)]
36pub struct Status {
37 pub offline: bool,
39
40 pub syncing: bool,
42
43 pub out_of_space: bool,
45
46 pub pending_shares: bool,
48
49 pub update_required: bool,
51
52 pub pushing_files: Vec<Uuid>,
54
55 pub dirty_locally: Vec<Uuid>,
57
58 pub pulling_files: Vec<Uuid>,
62
63 pub space_used: Option<UsageMetrics>,
66
67 pub sync_status: Option<String>,
70}
71
72impl Lb {
73 pub async fn status(&self) -> Status {
74 self.status.current_status.read().await.clone()
75 }
76
77 pub async fn set_initial_state(&self) -> LbResult<()> {
78 if self.keychain.get_account().is_ok() {
79 let mut current = self.status.current_status.write().await;
80 current.dirty_locally = self.local_changes().await;
81 if current.dirty_locally.is_empty() {
82 current.sync_status = self.get_last_synced_human().await.log_and_ignore();
83 }
84 current.pending_shares = !self.get_pending_shares().await?.is_empty();
85 }
86
87 Ok(())
88 }
89
90 pub async fn setup_status(&self) -> LbResult<()> {
91 self.set_initial_state().await?;
92 let mut rx = self.subscribe();
93 let bg = self.clone();
94
95 tokio::spawn(async move {
96 loop {
97 let evt = match rx.recv().await {
98 Ok(evt) => evt,
99 Err(err) => {
100 error!("failed to receive from a channel {err}");
101 return;
102 }
103 };
104 bg.process_event(evt).await.log_and_ignore();
105 }
106 });
107
108 Ok(())
109 }
110
111 async fn process_event(&self, e: Event) -> LbResult<()> {
112 let mut current = self.status.current_status.read().await.clone();
113 match e {
114 Event::MetadataChanged | Event::DocumentWritten(_) => {
115 self.compute_dirty_locally(&mut current).await?;
116 }
117 Event::Sync(s) => self.update_sync(s, &mut current).await?,
118 _ => {}
119 }
120 Ok(())
121 }
122
123 async fn compute_dirty_locally(&self, status: &mut Status) -> LbResult<()> {
124 let new = self.local_changes().await;
125 if new != status.dirty_locally {
126 status.dirty_locally = self.local_changes().await;
127 self.events.status_updated();
128 }
129 Ok(())
130 }
131
132 async fn compute_usage(&self) {
133 let mut lock = self.status.space_updated.lock().await;
134 if lock.spawned {
135 return;
136 }
137 lock.spawned = true;
138 let computed = lock.last_computed;
139 drop(lock);
140
141 let bg = self.clone();
142 tokio::spawn(async move {
143 if computed.elapsed() < Duration::from_secs(60) {
144 tokio::time::sleep(Duration::from_secs(60) - computed.elapsed()).await;
145 }
146 let usage = bg.get_usage().await.log_and_ignore();
147 let mut lock = bg.status.space_updated.lock().await;
148 lock.spawned = false;
149 lock.last_computed = Instant::now();
150 drop(lock);
151
152 bg.status.current_status.write().await.space_used = usage;
153 bg.events.status_updated();
154 });
155 }
156
157 async fn update_sync(&self, s: SyncIncrement, status: &mut Status) -> LbResult<()> {
158 match s {
159 SyncIncrement::SyncStarted => {
160 self.reset_sync(status);
161 status.syncing = true;
162 }
163 SyncIncrement::PullingDocument(id, in_progress) => {
164 if in_progress {
165 status.pulling_files.push(id);
166 } else {
167 status.pulling_files.retain(|fid| id != *fid);
168 }
169 }
170 SyncIncrement::PushingDocument(id, in_progress) => {
171 if in_progress {
172 status.pushing_files.push(id);
173 } else {
174 status.pushing_files.retain(|fid| id != *fid);
175 }
176 }
177 SyncIncrement::SyncFinished(maybe_problem) => {
178 self.reset_sync(status);
179 self.compute_usage().await;
180 match maybe_problem {
181 Some(LbErrKind::ClientUpdateRequired) => {
182 status.update_required = true;
183 }
184 Some(LbErrKind::ServerUnreachable) => {
185 status.offline = true;
186 }
187 Some(LbErrKind::UsageIsOverDataCap) => {
188 status.out_of_space = true;
189 }
190 None => {
191 status.dirty_locally = self.local_changes().await;
192 if status.dirty_locally.is_empty() {
193 status.sync_status = self.get_last_synced_human().await.ok();
194 }
195 status.pending_shares = !self.get_pending_shares().await?.is_empty();
196 }
197 _ => {
198 error!("unexpected sync problem found {maybe_problem:?}");
199 }
200 }
201 }
202 }
203
204 self.events.status_updated();
205
206 Ok(())
207 }
208
209 fn reset_sync(&self, status: &mut Status) {
210 status.syncing = false;
211 status.pulling_files.clear();
212 status.pushing_files.clear();
213 status.offline = false;
214 status.update_required = false;
215 status.out_of_space = false;
216 status.sync_status = None;
217 }
218}
219
220impl Default for SpaceUpdater {
221 fn default() -> Self {
222 Self { spawned: false, last_computed: Instant::now() }
223 }
224}