lb_rs/service/
integrity.rs1use std::num::NonZeroUsize;
2use std::thread;
3
4use futures::{StreamExt, stream};
5
6use crate::model::file_metadata::Owner;
7use crate::model::filename::DocumentType;
8use crate::model::tree_like::TreeLike;
9
10use crate::Lb;
11use crate::model::errors::{LbErrKind, LbResult, Warning};
12
13impl Lb {
14 #[instrument(level = "debug", skip(self), err(Debug))]
15 pub async fn test_repo_integrity(&self) -> LbResult<Vec<Warning>> {
16 let tx = self.ro_tx().await;
17 let db = tx.db();
18
19 let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
20
21 if db.last_synced.get().unwrap_or(&0) != &0 && db.root.get().is_none() {
22 return Err(LbErrKind::RootNonexistent)?;
23 }
24
25 tree.validate(Owner(self.keychain.get_pk()?))?;
26
27 for id in tree.ids() {
28 let name = tree.name(&id, &self.keychain)?;
29 if name.is_empty() {
30 return Err(LbErrKind::FileNameEmpty)?; }
32 if name.contains('/') {
33 return Err(LbErrKind::FileNameContainsSlash)?; }
35 }
36
37 drop(tx);
38
39 let mut warnings = Vec::new();
40 let mut tasks = vec![];
41 for file in self.list_metadatas().await? {
42 if file.is_document() {
43 let is_text =
44 DocumentType::from_file_name_using_extension(&file.name) == DocumentType::Text;
45
46 if is_text {
47 tasks.push(async move { (file.id, self.read_document(file.id, false).await) });
48 }
49 }
50 }
51
52 let mut results = stream::iter(tasks).buffer_unordered(
53 thread::available_parallelism()
54 .unwrap_or(NonZeroUsize::new(4).unwrap())
55 .into(),
56 );
57
58 while let Some((id, res)) = results.next().await {
59 let doc = res?;
60 if doc.is_empty() {
61 warnings.push(Warning::EmptyFile(id));
62 continue;
63 }
64
65 if String::from_utf8(doc).is_err() {
66 warnings.push(Warning::InvalidUTF8(id));
67 continue;
68 }
69 }
70
71 Ok(warnings)
72 }
73}