sos_integrity/
file_integrity.rs

1//! Check integrity of external files.
2use crate::IntegrityFailure;
3use crate::Result;
4use futures::StreamExt;
5use indexmap::IndexSet;
6use sha2::{Digest, Sha256};
7use sos_backend::BackendTarget;
8use sos_core::{commit::CommitHash, ExternalFile};
9use sos_vfs as vfs;
10use std::{path::PathBuf, sync::Arc};
11use tokio::sync::{
12    mpsc::{self, Receiver, Sender},
13    watch, Mutex, Semaphore,
14};
15use tokio_util::io::ReaderStream;
16
17/// Event dispatched whilst generating an integrity report.
18#[derive(Debug)]
19pub enum FileIntegrityEvent {
20    /// Begin processing the given number of files.
21    Begin(usize),
22    /// Integrity check failed.
23    Failure(ExternalFile, IntegrityFailure),
24    /// File was opened.
25    OpenFile(ExternalFile, u64),
26    /// Read file buffer.
27    ReadFile(ExternalFile, usize),
28    /// File was closed.
29    ///
30    /// This event is only sent when a file integrity
31    /// check completes successfully.
32    ///
33    /// Errors are reported as a failure event.
34    CloseFile(ExternalFile),
35    /// File integrity check completed.
36    Complete,
37}
38
39/// Iterate a collection of external files and verify the integrity
40/// by checking the files exist on disc and the checksum of the disc
41/// contents matches the expected checksum.
42pub async fn file_integrity(
43    target: &BackendTarget,
44    external_files: IndexSet<ExternalFile>,
45    concurrency: usize,
46) -> Result<(Receiver<FileIntegrityEvent>, watch::Sender<bool>)> {
47    let paths = target.paths();
48
49    let (mut event_tx, event_rx) = mpsc::channel::<FileIntegrityEvent>(64);
50    let (cancel_tx, mut cancel_rx) = watch::channel(false);
51
52    notify_listeners(
53        &mut event_tx,
54        FileIntegrityEvent::Begin(external_files.len()),
55    )
56    .await;
57
58    let paths: Vec<_> = external_files
59        .into_iter()
60        .map(|file| (file, paths.into_file_path(&file)))
61        .collect();
62    let num_files = paths.len();
63    let semaphore = Arc::new(Semaphore::new(concurrency));
64    let cancel = cancel_tx.clone();
65    tokio::task::spawn(async move {
66        let mut stream = futures::stream::iter(paths);
67        let completed = Arc::new(Mutex::new(0));
68        loop {
69            tokio::select! {
70              biased;
71              _ = cancel_rx.changed() => {
72                break;
73              }
74              Some((file, path)) = stream.next() => {
75                let semaphore = semaphore.clone();
76                let cancel_tx = cancel.clone();
77                let mut cancel_rx = cancel_rx.clone();
78                let mut event_tx = event_tx.clone();
79                let completed = completed.clone();
80                tokio::task::spawn(async move {
81                  let _permit = semaphore.acquire().await;
82                  check_file(file, path, &mut event_tx, &mut cancel_rx).await?;
83                  let mut writer = completed.lock().await;
84                  *writer += 1;
85                  if *writer == num_files {
86                    // Signal the shutdown event on the cancel channel
87                    // to break out of this loop and cancel any existing
88                    // file reader streams
89                    if let Err(error) = cancel_tx.send(true) {
90                      tracing::error!(error = ?error);
91                    }
92                  }
93                  Ok::<_, crate::Error>(())
94                });
95              }
96            }
97        }
98
99        notify_listeners(&mut event_tx, FileIntegrityEvent::Complete).await;
100
101        Ok::<_, crate::Error>(())
102    });
103
104    Ok((event_rx, cancel_tx))
105}
106
107async fn check_file(
108    file: ExternalFile,
109    path: PathBuf,
110    tx: &mut Sender<FileIntegrityEvent>,
111    cancel_rx: &mut watch::Receiver<bool>,
112) -> Result<()> {
113    if vfs::try_exists(&path).await? {
114        let metadata = vfs::metadata(&path).await?;
115        notify_listeners(
116            tx,
117            FileIntegrityEvent::OpenFile(file, metadata.len()),
118        )
119        .await;
120
121        match compare_file(&file, path, tx, cancel_rx).await {
122            Ok(result) => {
123                if let Some(failure) = result {
124                    notify_listeners(
125                        tx,
126                        FileIntegrityEvent::Failure(file, failure),
127                    )
128                    .await;
129                }
130                notify_listeners(tx, FileIntegrityEvent::CloseFile(file))
131                    .await;
132            }
133            Err(e) => {
134                notify_listeners(
135                    tx,
136                    FileIntegrityEvent::Failure(
137                        file,
138                        IntegrityFailure::Error(e),
139                    ),
140                )
141                .await;
142            }
143        }
144    } else {
145        notify_listeners(
146            tx,
147            FileIntegrityEvent::Failure(
148                file,
149                IntegrityFailure::MissingFile(file),
150            ),
151        )
152        .await;
153    }
154    Ok(())
155}
156
157async fn compare_file(
158    external_file: &ExternalFile,
159    path: PathBuf,
160    tx: &mut Sender<FileIntegrityEvent>,
161    cancel_rx: &mut watch::Receiver<bool>,
162) -> Result<Option<IntegrityFailure>> {
163    let mut hasher = Sha256::new();
164
165    let file = vfs::File::open(&path).await?;
166    let metadata = vfs::metadata(&path).await?;
167    let bytes_total = metadata.len();
168    let mut bytes_read = 0;
169    let mut reader_stream = ReaderStream::new(file);
170    loop {
171        tokio::select! {
172          biased;
173          _ = cancel_rx.changed() => {
174            break;
175          }
176          chunk = reader_stream.next() => {
177            if let Some(chunk) = chunk {
178              let chunk = chunk?;
179              hasher.update(&chunk);
180              bytes_read += chunk.len();
181              notify_listeners(
182                  tx,
183                  FileIntegrityEvent::ReadFile(*external_file, chunk.len()),
184              )
185              .await;
186            } else {
187              break;
188            }
189          }
190        }
191    }
192
193    let digest = hasher.finalize();
194
195    let is_completed = bytes_read as u64 == bytes_total;
196
197    // Only check for checksum mismatch if we actually
198    // read all the bytes; if we receive a cancellation
199    // then we don't want to send an integrity failure.
200    if is_completed && digest.as_slice() != external_file.file_name().as_ref()
201    {
202        let slice: [u8; 32] = digest.as_slice().try_into()?;
203        Ok(Some(IntegrityFailure::CorruptedFile {
204            external_file: *external_file,
205            expected: external_file.file_name().into(),
206            actual: CommitHash(slice),
207        }))
208    } else {
209        Ok(None)
210    }
211}
212
213async fn notify_listeners(
214    tx: &mut Sender<FileIntegrityEvent>,
215    event: FileIntegrityEvent,
216) {
217    if let Err(error) = tx.send(event).await {
218        tracing::warn!(error = ?error.0, "file_integrity::send");
219    }
220}