sos_integrity/
file_integrity.rs1use crate::IntegrityFailure;
3use crate::Result;
4use futures::StreamExt;
5use indexmap::IndexSet;
6use sha2::{Digest, Sha256};
7use sos_backend::BackendTarget;
8use sos_core::{commit::CommitHash, ExternalFile};
9use sos_vfs as vfs;
10use std::{path::PathBuf, sync::Arc};
11use tokio::sync::{
12 mpsc::{self, Receiver, Sender},
13 watch, Mutex, Semaphore,
14};
15use tokio_util::io::ReaderStream;
16
17#[derive(Debug)]
19pub enum FileIntegrityEvent {
20 Begin(usize),
22 Failure(ExternalFile, IntegrityFailure),
24 OpenFile(ExternalFile, u64),
26 ReadFile(ExternalFile, usize),
28 CloseFile(ExternalFile),
35 Complete,
37}
38
39pub async fn file_integrity(
43 target: &BackendTarget,
44 external_files: IndexSet<ExternalFile>,
45 concurrency: usize,
46) -> Result<(Receiver<FileIntegrityEvent>, watch::Sender<bool>)> {
47 let paths = target.paths();
48
49 let (mut event_tx, event_rx) = mpsc::channel::<FileIntegrityEvent>(64);
50 let (cancel_tx, mut cancel_rx) = watch::channel(false);
51
52 notify_listeners(
53 &mut event_tx,
54 FileIntegrityEvent::Begin(external_files.len()),
55 )
56 .await;
57
58 let paths: Vec<_> = external_files
59 .into_iter()
60 .map(|file| (file, paths.into_file_path(&file)))
61 .collect();
62 let num_files = paths.len();
63 let semaphore = Arc::new(Semaphore::new(concurrency));
64 let cancel = cancel_tx.clone();
65 tokio::task::spawn(async move {
66 let mut stream = futures::stream::iter(paths);
67 let completed = Arc::new(Mutex::new(0));
68 loop {
69 tokio::select! {
70 biased;
71 _ = cancel_rx.changed() => {
72 break;
73 }
74 Some((file, path)) = stream.next() => {
75 let semaphore = semaphore.clone();
76 let cancel_tx = cancel.clone();
77 let mut cancel_rx = cancel_rx.clone();
78 let mut event_tx = event_tx.clone();
79 let completed = completed.clone();
80 tokio::task::spawn(async move {
81 let _permit = semaphore.acquire().await;
82 check_file(file, path, &mut event_tx, &mut cancel_rx).await?;
83 let mut writer = completed.lock().await;
84 *writer += 1;
85 if *writer == num_files {
86 if let Err(error) = cancel_tx.send(true) {
90 tracing::error!(error = ?error);
91 }
92 }
93 Ok::<_, crate::Error>(())
94 });
95 }
96 }
97 }
98
99 notify_listeners(&mut event_tx, FileIntegrityEvent::Complete).await;
100
101 Ok::<_, crate::Error>(())
102 });
103
104 Ok((event_rx, cancel_tx))
105}
106
107async fn check_file(
108 file: ExternalFile,
109 path: PathBuf,
110 tx: &mut Sender<FileIntegrityEvent>,
111 cancel_rx: &mut watch::Receiver<bool>,
112) -> Result<()> {
113 if vfs::try_exists(&path).await? {
114 let metadata = vfs::metadata(&path).await?;
115 notify_listeners(
116 tx,
117 FileIntegrityEvent::OpenFile(file, metadata.len()),
118 )
119 .await;
120
121 match compare_file(&file, path, tx, cancel_rx).await {
122 Ok(result) => {
123 if let Some(failure) = result {
124 notify_listeners(
125 tx,
126 FileIntegrityEvent::Failure(file, failure),
127 )
128 .await;
129 }
130 notify_listeners(tx, FileIntegrityEvent::CloseFile(file))
131 .await;
132 }
133 Err(e) => {
134 notify_listeners(
135 tx,
136 FileIntegrityEvent::Failure(
137 file,
138 IntegrityFailure::Error(e),
139 ),
140 )
141 .await;
142 }
143 }
144 } else {
145 notify_listeners(
146 tx,
147 FileIntegrityEvent::Failure(
148 file,
149 IntegrityFailure::MissingFile(file),
150 ),
151 )
152 .await;
153 }
154 Ok(())
155}
156
157async fn compare_file(
158 external_file: &ExternalFile,
159 path: PathBuf,
160 tx: &mut Sender<FileIntegrityEvent>,
161 cancel_rx: &mut watch::Receiver<bool>,
162) -> Result<Option<IntegrityFailure>> {
163 let mut hasher = Sha256::new();
164
165 let file = vfs::File::open(&path).await?;
166 let metadata = vfs::metadata(&path).await?;
167 let bytes_total = metadata.len();
168 let mut bytes_read = 0;
169 let mut reader_stream = ReaderStream::new(file);
170 loop {
171 tokio::select! {
172 biased;
173 _ = cancel_rx.changed() => {
174 break;
175 }
176 chunk = reader_stream.next() => {
177 if let Some(chunk) = chunk {
178 let chunk = chunk?;
179 hasher.update(&chunk);
180 bytes_read += chunk.len();
181 notify_listeners(
182 tx,
183 FileIntegrityEvent::ReadFile(*external_file, chunk.len()),
184 )
185 .await;
186 } else {
187 break;
188 }
189 }
190 }
191 }
192
193 let digest = hasher.finalize();
194
195 let is_completed = bytes_read as u64 == bytes_total;
196
197 if is_completed && digest.as_slice() != external_file.file_name().as_ref()
201 {
202 let slice: [u8; 32] = digest.as_slice().try_into()?;
203 Ok(Some(IntegrityFailure::CorruptedFile {
204 external_file: *external_file,
205 expected: external_file.file_name().into(),
206 actual: CommitHash(slice),
207 }))
208 } else {
209 Ok(None)
210 }
211}
212
213async fn notify_listeners(
214 tx: &mut Sender<FileIntegrityEvent>,
215 event: FileIntegrityEvent,
216) {
217 if let Err(error) = tx.send(event).await {
218 tracing::warn!(error = ?error.0, "file_integrity::send");
219 }
220}