sos_integrity/
account_integrity.rs

1//! Check integrity of the folders in an account.
2use crate::{
3    event_integrity, vault_integrity, Error, IntegrityFailure, Result,
4};
5use futures::{pin_mut, StreamExt};
6use sos_backend::BackendTarget;
7use sos_core::{
8    commit::CommitHash, events::EventRecord, AccountId, SecretId, VaultId,
9};
10use sos_database::entity::FolderEntity;
11use sos_vault::Summary;
12use sos_vfs as vfs;
13use std::sync::Arc;
14use tokio::sync::{
15    mpsc::{self, Receiver, Sender},
16    watch, Mutex, Semaphore,
17};
18
19/// Event dispatched whilst generating an integrity report.
20#[derive(Debug)]
21pub enum FolderIntegrityEvent {
22    /// Begin processing the given number of folders.
23    Begin(usize),
24    /// Integrity check failed.
25    Failure(VaultId, IntegrityFailure),
26    /// Started integrity check on a folder.
27    OpenFolder(VaultId),
28    /// Read a record in a vault.
29    VaultRecord(VaultId, (SecretId, CommitHash)),
30    /// Read a record in an event log.
31    EventRecord(VaultId, EventRecord),
32    /// Finished integrity check on a folder.
33    ///
34    /// This event is only sent when a folder integrity
35    /// check completes successfully.
36    ///
37    /// Errors are reported as a failure event.
38    CloseFolder(VaultId),
39    /// Folder integrity check completed.
40    Complete,
41}
42
43/// Generate an integrity report for the folders in an account.
44pub async fn account_integrity(
45    target: &BackendTarget,
46    account_id: &AccountId,
47    folders: Vec<Summary>,
48    concurrency: usize,
49) -> Result<(Receiver<FolderIntegrityEvent>, watch::Sender<bool>)> {
50    let (mut event_tx, event_rx) = mpsc::channel::<FolderIntegrityEvent>(64);
51    let (cancel_tx, mut cancel_rx) = watch::channel(false);
52
53    notify_listeners(
54        &mut event_tx,
55        FolderIntegrityEvent::Begin(folders.len()),
56    )
57    .await;
58
59    let num_folders = folders.len();
60    let semaphore = Arc::new(Semaphore::new(concurrency));
61    let cancel = cancel_tx.clone();
62    let account_id = *account_id;
63    let target = target.clone();
64    tokio::task::spawn(async move {
65        let mut stream = futures::stream::iter(folders);
66        let completed = Arc::new(Mutex::new(0));
67        loop {
68            tokio::select! {
69              biased;
70              _ = cancel_rx.changed() => {
71                break;
72              }
73              Some(folder) = stream.next() => {
74                let semaphore = semaphore.clone();
75                let cancel_tx = cancel.clone();
76                let cancel_rx = cancel_rx.clone();
77                let event_tx = event_tx.clone();
78                let completed = completed.clone();
79                let target = target.clone();
80                tokio::task::spawn(async move {
81                  let _permit = semaphore.acquire().await;
82
83                  check_folder(
84                    target,
85                    &account_id,
86                    folder.id(),
87                    event_tx.clone(),
88                    cancel_rx).await?;
89
90                  let mut writer = completed.lock().await;
91                  *writer += 1;
92                  if *writer == num_folders {
93                    // Signal the shutdown event on the cancel channel
94                    // to break out of this loop and cancel any existing
95                    // file reader streams
96                    if let Err(error) = cancel_tx.send(true) {
97                      tracing::error!(error = ?error);
98                    }
99                  }
100                  Ok::<_, crate::Error>(())
101                });
102              }
103            }
104        }
105
106        notify_listeners(&mut event_tx, FolderIntegrityEvent::Complete).await;
107
108        Ok::<_, crate::Error>(())
109    });
110
111    Ok((event_rx, cancel_tx))
112}
113
114async fn check_folder(
115    target: BackendTarget,
116    account_id: &AccountId,
117    folder_id: &VaultId,
118    mut integrity_tx: Sender<FolderIntegrityEvent>,
119    mut cancel_rx: watch::Receiver<bool>,
120) -> Result<()> {
121    notify_listeners(
122        &mut integrity_tx,
123        FolderIntegrityEvent::OpenFolder(*folder_id),
124    )
125    .await;
126
127    let vault_id = *folder_id;
128    let event_id = *folder_id;
129    let mut vault_tx = integrity_tx.clone();
130    let mut event_tx = integrity_tx.clone();
131    let mut vault_cancel_rx = cancel_rx.clone();
132
133    let vault_target = target.clone();
134    let event_target = target.clone();
135
136    let account_id = *account_id;
137    let folder_id = *folder_id;
138
139    // Check folders exist
140    match &target {
141        BackendTarget::FileSystem(paths) => {
142            let vault_path = paths.vault_path(&folder_id);
143            let events_path = paths.event_log_path(&folder_id);
144            if !vfs::try_exists(&vault_path).await?
145                || !vfs::try_exists(&events_path).await?
146            {
147                notify_listeners(
148                    &mut vault_tx,
149                    FolderIntegrityEvent::Failure(
150                        folder_id,
151                        IntegrityFailure::MissingFolder(folder_id),
152                    ),
153                )
154                .await;
155
156                return Ok(());
157            }
158        }
159        BackendTarget::Database(_, client) => {
160            let db_folder_id = folder_id;
161            let folder_row = client
162                .conn(move |conn| {
163                    let folder_entity = FolderEntity::new(&conn);
164                    folder_entity.find_optional(&db_folder_id)
165                })
166                .await?;
167
168            if folder_row.is_none() {
169                notify_listeners(
170                    &mut vault_tx,
171                    FolderIntegrityEvent::Failure(
172                        folder_id,
173                        IntegrityFailure::MissingFolder(folder_id),
174                    ),
175                )
176                .await;
177                return Ok(());
178            }
179        }
180    }
181
182    let v_jh = tokio::task::spawn(async move {
183        let vault_stream =
184            vault_integrity(&vault_target, &account_id, &folder_id);
185        pin_mut!(vault_stream);
186        loop {
187            tokio::select! {
188              biased;
189              _ = vault_cancel_rx.changed() => {
190                break;
191              }
192              event = vault_stream.next() => {
193                if let Some(record) = event {
194                  match record {
195                    Ok(record) => {
196                      notify_listeners(
197                          &mut vault_tx,
198                          FolderIntegrityEvent::VaultRecord(
199                            vault_id, record),
200                      )
201                      .await;
202                    }
203                    Err(e) => {
204                      match e {
205                        Error::VaultHashMismatch { commit, value, .. } => {
206                          notify_listeners(
207                              &mut vault_tx,
208                              FolderIntegrityEvent::Failure(
209                                vault_id, IntegrityFailure::CorruptedFolder {
210                                  folder_id: vault_id,
211                                  expected: commit,
212                                  actual: value,
213                                }),
214                          )
215                          .await;
216                        }
217                        _ => {
218                          notify_listeners(
219                              &mut vault_tx,
220                              FolderIntegrityEvent::Failure(
221                                vault_id, IntegrityFailure::Error(e)),
222                          )
223                          .await;
224                        }
225                      }
226                    }
227                  }
228                } else {
229                  break;
230                }
231              }
232            }
233        }
234
235        Ok::<_, Error>(())
236    });
237
238    let e_jh = tokio::task::spawn(async move {
239        let event_stream =
240            event_integrity(&event_target, &account_id, &folder_id);
241        pin_mut!(event_stream);
242
243        loop {
244            tokio::select! {
245              biased;
246              _ = cancel_rx.changed() => {
247                break;
248              }
249              event = event_stream.next() => {
250                if let Some(record) = event {
251                  match record {
252                    Ok(record) => {
253                      notify_listeners(
254                          &mut event_tx,
255                          FolderIntegrityEvent::EventRecord(event_id, record),
256                      )
257                      .await;
258                    }
259                    Err(e) => {
260                      match e {
261                        Error::HashMismatch { commit, value, .. } => {
262                          notify_listeners(
263                              &mut event_tx,
264                              FolderIntegrityEvent::Failure(
265                                vault_id, IntegrityFailure::CorruptedFolder {
266                                  folder_id: vault_id,
267                                  expected: commit,
268                                  actual: value,
269                                }),
270                          )
271                          .await;
272                        }
273                        _ => {
274                          notify_listeners(
275                              &mut event_tx,
276                              FolderIntegrityEvent::Failure(
277                                vault_id, IntegrityFailure::Error(e)),
278                          )
279                          .await;
280                        }
281                      }
282                    }
283                  }
284                } else {
285                  break;
286                }
287              }
288            }
289        }
290
291        Ok::<_, Error>(())
292    });
293
294    let results = futures::future::try_join_all(vec![v_jh, e_jh]).await?;
295    let is_ok = results.iter().all(|r| r.is_ok());
296    for result in results {
297        if let Err(e) = result {
298            notify_listeners(
299                &mut integrity_tx,
300                FolderIntegrityEvent::Failure(
301                    folder_id,
302                    IntegrityFailure::Error(e),
303                ),
304            )
305            .await;
306        }
307    }
308
309    if is_ok {
310        notify_listeners(
311            &mut integrity_tx,
312            FolderIntegrityEvent::CloseFolder(folder_id),
313        )
314        .await;
315    }
316
317    Ok(())
318}
319
320async fn notify_listeners(
321    tx: &mut Sender<FolderIntegrityEvent>,
322    event: FolderIntegrityEvent,
323) {
324    if let Err(error) = tx.send(event).await {
325        tracing::warn!(error = ?error.0, "account_integrity::send");
326    }
327}