1use crate::{
3 event_integrity, vault_integrity, Error, IntegrityFailure, Result,
4};
5use futures::{pin_mut, StreamExt};
6use sos_backend::BackendTarget;
7use sos_core::{
8 commit::CommitHash, events::EventRecord, AccountId, SecretId, VaultId,
9};
10use sos_database::entity::FolderEntity;
11use sos_vault::Summary;
12use sos_vfs as vfs;
13use std::sync::Arc;
14use tokio::sync::{
15 mpsc::{self, Receiver, Sender},
16 watch, Mutex, Semaphore,
17};
18
19#[derive(Debug)]
21pub enum FolderIntegrityEvent {
22 Begin(usize),
24 Failure(VaultId, IntegrityFailure),
26 OpenFolder(VaultId),
28 VaultRecord(VaultId, (SecretId, CommitHash)),
30 EventRecord(VaultId, EventRecord),
32 CloseFolder(VaultId),
39 Complete,
41}
42
43pub async fn account_integrity(
45 target: &BackendTarget,
46 account_id: &AccountId,
47 folders: Vec<Summary>,
48 concurrency: usize,
49) -> Result<(Receiver<FolderIntegrityEvent>, watch::Sender<bool>)> {
50 let (mut event_tx, event_rx) = mpsc::channel::<FolderIntegrityEvent>(64);
51 let (cancel_tx, mut cancel_rx) = watch::channel(false);
52
53 notify_listeners(
54 &mut event_tx,
55 FolderIntegrityEvent::Begin(folders.len()),
56 )
57 .await;
58
59 let num_folders = folders.len();
60 let semaphore = Arc::new(Semaphore::new(concurrency));
61 let cancel = cancel_tx.clone();
62 let account_id = *account_id;
63 let target = target.clone();
64 tokio::task::spawn(async move {
65 let mut stream = futures::stream::iter(folders);
66 let completed = Arc::new(Mutex::new(0));
67 loop {
68 tokio::select! {
69 biased;
70 _ = cancel_rx.changed() => {
71 break;
72 }
73 Some(folder) = stream.next() => {
74 let semaphore = semaphore.clone();
75 let cancel_tx = cancel.clone();
76 let cancel_rx = cancel_rx.clone();
77 let event_tx = event_tx.clone();
78 let completed = completed.clone();
79 let target = target.clone();
80 tokio::task::spawn(async move {
81 let _permit = semaphore.acquire().await;
82
83 check_folder(
84 target,
85 &account_id,
86 folder.id(),
87 event_tx.clone(),
88 cancel_rx).await?;
89
90 let mut writer = completed.lock().await;
91 *writer += 1;
92 if *writer == num_folders {
93 if let Err(error) = cancel_tx.send(true) {
97 tracing::error!(error = ?error);
98 }
99 }
100 Ok::<_, crate::Error>(())
101 });
102 }
103 }
104 }
105
106 notify_listeners(&mut event_tx, FolderIntegrityEvent::Complete).await;
107
108 Ok::<_, crate::Error>(())
109 });
110
111 Ok((event_rx, cancel_tx))
112}
113
114async fn check_folder(
115 target: BackendTarget,
116 account_id: &AccountId,
117 folder_id: &VaultId,
118 mut integrity_tx: Sender<FolderIntegrityEvent>,
119 mut cancel_rx: watch::Receiver<bool>,
120) -> Result<()> {
121 notify_listeners(
122 &mut integrity_tx,
123 FolderIntegrityEvent::OpenFolder(*folder_id),
124 )
125 .await;
126
127 let vault_id = *folder_id;
128 let event_id = *folder_id;
129 let mut vault_tx = integrity_tx.clone();
130 let mut event_tx = integrity_tx.clone();
131 let mut vault_cancel_rx = cancel_rx.clone();
132
133 let vault_target = target.clone();
134 let event_target = target.clone();
135
136 let account_id = *account_id;
137 let folder_id = *folder_id;
138
139 match &target {
141 BackendTarget::FileSystem(paths) => {
142 let vault_path = paths.vault_path(&folder_id);
143 let events_path = paths.event_log_path(&folder_id);
144 if !vfs::try_exists(&vault_path).await?
145 || !vfs::try_exists(&events_path).await?
146 {
147 notify_listeners(
148 &mut vault_tx,
149 FolderIntegrityEvent::Failure(
150 folder_id,
151 IntegrityFailure::MissingFolder(folder_id),
152 ),
153 )
154 .await;
155
156 return Ok(());
157 }
158 }
159 BackendTarget::Database(_, client) => {
160 let db_folder_id = folder_id;
161 let folder_row = client
162 .conn(move |conn| {
163 let folder_entity = FolderEntity::new(&conn);
164 folder_entity.find_optional(&db_folder_id)
165 })
166 .await?;
167
168 if folder_row.is_none() {
169 notify_listeners(
170 &mut vault_tx,
171 FolderIntegrityEvent::Failure(
172 folder_id,
173 IntegrityFailure::MissingFolder(folder_id),
174 ),
175 )
176 .await;
177 return Ok(());
178 }
179 }
180 }
181
182 let v_jh = tokio::task::spawn(async move {
183 let vault_stream =
184 vault_integrity(&vault_target, &account_id, &folder_id);
185 pin_mut!(vault_stream);
186 loop {
187 tokio::select! {
188 biased;
189 _ = vault_cancel_rx.changed() => {
190 break;
191 }
192 event = vault_stream.next() => {
193 if let Some(record) = event {
194 match record {
195 Ok(record) => {
196 notify_listeners(
197 &mut vault_tx,
198 FolderIntegrityEvent::VaultRecord(
199 vault_id, record),
200 )
201 .await;
202 }
203 Err(e) => {
204 match e {
205 Error::VaultHashMismatch { commit, value, .. } => {
206 notify_listeners(
207 &mut vault_tx,
208 FolderIntegrityEvent::Failure(
209 vault_id, IntegrityFailure::CorruptedFolder {
210 folder_id: vault_id,
211 expected: commit,
212 actual: value,
213 }),
214 )
215 .await;
216 }
217 _ => {
218 notify_listeners(
219 &mut vault_tx,
220 FolderIntegrityEvent::Failure(
221 vault_id, IntegrityFailure::Error(e)),
222 )
223 .await;
224 }
225 }
226 }
227 }
228 } else {
229 break;
230 }
231 }
232 }
233 }
234
235 Ok::<_, Error>(())
236 });
237
238 let e_jh = tokio::task::spawn(async move {
239 let event_stream =
240 event_integrity(&event_target, &account_id, &folder_id);
241 pin_mut!(event_stream);
242
243 loop {
244 tokio::select! {
245 biased;
246 _ = cancel_rx.changed() => {
247 break;
248 }
249 event = event_stream.next() => {
250 if let Some(record) = event {
251 match record {
252 Ok(record) => {
253 notify_listeners(
254 &mut event_tx,
255 FolderIntegrityEvent::EventRecord(event_id, record),
256 )
257 .await;
258 }
259 Err(e) => {
260 match e {
261 Error::HashMismatch { commit, value, .. } => {
262 notify_listeners(
263 &mut event_tx,
264 FolderIntegrityEvent::Failure(
265 vault_id, IntegrityFailure::CorruptedFolder {
266 folder_id: vault_id,
267 expected: commit,
268 actual: value,
269 }),
270 )
271 .await;
272 }
273 _ => {
274 notify_listeners(
275 &mut event_tx,
276 FolderIntegrityEvent::Failure(
277 vault_id, IntegrityFailure::Error(e)),
278 )
279 .await;
280 }
281 }
282 }
283 }
284 } else {
285 break;
286 }
287 }
288 }
289 }
290
291 Ok::<_, Error>(())
292 });
293
294 let results = futures::future::try_join_all(vec![v_jh, e_jh]).await?;
295 let is_ok = results.iter().all(|r| r.is_ok());
296 for result in results {
297 if let Err(e) = result {
298 notify_listeners(
299 &mut integrity_tx,
300 FolderIntegrityEvent::Failure(
301 folder_id,
302 IntegrityFailure::Error(e),
303 ),
304 )
305 .await;
306 }
307 }
308
309 if is_ok {
310 notify_listeners(
311 &mut integrity_tx,
312 FolderIntegrityEvent::CloseFolder(folder_id),
313 )
314 .await;
315 }
316
317 Ok(())
318}
319
320async fn notify_listeners(
321 tx: &mut Sender<FolderIntegrityEvent>,
322 event: FolderIntegrityEvent,
323) {
324 if let Err(error) = tx.send(event).await {
325 tracing::warn!(error = ?error.0, "account_integrity::send");
326 }
327}