lb_rs/service/
integrity.rs1use std::num::NonZeroUsize;
2use std::thread;
3
4use futures::{stream, StreamExt};
5
6use crate::model::file_metadata::Owner;
7use crate::model::filename::DocumentType;
8use crate::model::tree_like::TreeLike;
9
10use crate::model::errors::{LbErrKind, LbResult, Warning};
11use crate::Lb;
12
13impl Lb {
14 #[instrument(level = "debug", skip(self), err(Debug))]
16 pub async fn test_repo_integrity(&self) -> LbResult<Vec<Warning>> {
17 let tx = self.ro_tx().await;
18 let db = tx.db();
19
20 let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
21
22 if db.last_synced.get().unwrap_or(&0) != &0 && db.root.get().is_none() {
23 return Err(LbErrKind::RootNonexistent)?;
24 }
25
26 tree.validate(Owner(self.keychain.get_pk()?))?;
27
28 for id in tree.ids() {
29 let name = tree.name(&id, &self.keychain)?;
30 if name.is_empty() {
31 return Err(LbErrKind::FileNameEmpty)?; }
33 if name.contains('/') {
34 return Err(LbErrKind::FileNameContainsSlash)?; }
36 }
37
38 drop(tree);
39
40 let mut warnings = Vec::new();
41 let mut tasks = vec![];
42 for file in self.list_metadatas().await? {
43 if file.is_document() {
44 let is_text =
45 DocumentType::from_file_name_using_extension(&file.name) == DocumentType::Text;
46
47 if is_text {
48 tasks.push(async move { (file.id, self.read_document(file.id, false).await) });
49 }
50 }
51 }
52
53 let mut results = stream::iter(tasks).buffer_unordered(
54 thread::available_parallelism()
55 .unwrap_or(NonZeroUsize::new(4).unwrap())
56 .into(),
57 );
58
59 while let Some((id, res)) = results.next().await {
60 let doc = res?;
61 if doc.is_empty() {
62 warnings.push(Warning::EmptyFile(id));
63 continue;
64 }
65
66 if String::from_utf8(doc).is_err() {
67 warnings.push(Warning::InvalidUTF8(id));
68 continue;
69 }
70 }
71
72 Ok(warnings)
73 }
74}