use anyhow::{Context, Result};
use std::{
io::Read,
path::Path,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, Instant},
};
use tokio::{io::AsyncReadExt, sync::mpsc};
use tokio_util::sync::CancellationToken;
/// Progress snapshot handed to the stats callback during async hashing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HashingStats {
    /// Time elapsed since the hashing operation started.
    pub elapsed: Duration,
    /// Total number of bytes read from the input so far.
    pub bytes_read: u64,
}

impl HashingStats {
    /// Total bytes read so far, in decimal megabytes (1 MB = 1_000_000 bytes).
    pub fn total_read_mb(&self) -> f64 {
        self.bytes_read as f64 / 1_000_000.0
    }

    /// Average throughput in bytes per second since hashing started.
    ///
    /// Returns `0.0` when no measurable time has elapsed yet (e.g. the very
    /// first progress report), instead of producing `NaN` or `inf`.
    pub fn bytes_per_sec(&self) -> f64 {
        let secs = self.elapsed.as_secs_f64();
        if secs > 0.0 {
            self.bytes_read as f64 / secs
        } else {
            0.0
        }
    }

    /// Average throughput in decimal megabytes per second.
    ///
    /// Like [`HashingStats::bytes_per_sec`], this is `0.0` when no time has elapsed.
    pub fn mb_per_sec(&self) -> f64 {
        self.bytes_per_sec() / 1_000_000.0
    }
}
/// Configurable hasher for in-memory data, readers, files and async streams.
///
/// Cloning is cheap: every field is a plain value except the optional stats
/// callback, which is shared behind an [`Arc`].
#[derive(Clone)]
pub struct Hasher {
    /// Size, in bytes, of the buffer used for each read from the input.
    read_buffer_size: usize,
    /// Capacity of the channel between the async reading and hashing tasks.
    channel_buffer_size: usize,
    /// Optional progress callback invoked periodically during async hashing.
    stats_callback_fn: Option<Arc<dyn Fn(HashingStats) + Send + Sync>>,
    /// Whether files should be opened with `O_DIRECT` (platform permitting).
    use_o_direct: bool,
    /// Period between invocations of `stats_callback_fn`.
    throughput_update_interval: Duration,
}

// `#[derive(Debug)]` is impossible because `dyn Fn` is not `Debug`, so the
// callback is rendered as a placeholder while all plain settings are shown.
impl std::fmt::Debug for Hasher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Hasher")
            .field("read_buffer_size", &self.read_buffer_size)
            .field("channel_buffer_size", &self.channel_buffer_size)
            .field(
                "stats_callback_fn",
                &self.stats_callback_fn.as_ref().map(|_| "<callback>"),
            )
            .field("use_o_direct", &self.use_o_direct)
            .field("throughput_update_interval", &self.throughput_update_interval)
            .finish()
    }
}
/// Default size, in bytes, of each read from the input (8 MiB).
const DEFAULT_BUFFER_SIZE: usize = 8 * 1024 * 1024;
/// Default number of read chunks that may be queued for the hashing task.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1;
/// Default period between stats-callback invocations.
const THROUGHPUT_UPDATE_INTERVAL: Duration = Duration::from_millis(1_000);
impl Hasher {
    /// Returns the hex-encoded SHA-256 digest of `data`, computed in one shot.
    pub fn sha256sum(data: impl AsRef<[u8]>) -> String {
        Self::one_shot_hex_digest(&ring::digest::SHA256, data.as_ref())
    }

    /// Returns the hex-encoded SHA-512 digest of `data`, computed in one shot.
    pub fn sha512sum(data: impl AsRef<[u8]>) -> String {
        Self::one_shot_hex_digest(&ring::digest::SHA512, data.as_ref())
    }

    /// Returns the hex-encoded SHA-1 digest of `data`, computed in one shot.
    ///
    /// SHA-1 is exposed only through ring's legacy algorithm and should not be
    /// relied on for security-sensitive purposes.
    pub fn sha1sum(data: impl AsRef<[u8]>) -> String {
        Self::one_shot_hex_digest(&ring::digest::SHA1_FOR_LEGACY_USE_ONLY, data.as_ref())
    }

    /// Digests `bytes` with `algorithm` and hex-encodes the result.
    fn one_shot_hex_digest(algorithm: &'static ring::digest::Algorithm, bytes: &[u8]) -> String {
        let digest = ring::digest::digest(algorithm, bytes);
        hex::encode(digest)
    }
}
impl Hasher {
    /// Creates a hasher with the default configuration.
    ///
    /// The defaults are: `DEFAULT_BUFFER_SIZE` read buffer,
    /// `DEFAULT_CHANNEL_BUFFER_SIZE` channel capacity,
    /// `THROUGHPUT_UPDATE_INTERVAL` between progress reports, no stats
    /// callback, and `O_DIRECT` disabled.
    #[must_use]
    pub fn new() -> Self {
        Hasher {
            read_buffer_size: DEFAULT_BUFFER_SIZE,
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
            throughput_update_interval: THROUGHPUT_UPDATE_INTERVAL,
            stats_callback_fn: None,
            use_o_direct: false,
        }
    }

    /// Opens files hashed via `sha256_file` / `sha512_file` with the
    /// `O_DIRECT` flag.
    ///
    /// The flag is only applied on Unix targets that support it; elsewhere
    /// (e.g. Windows) this setting has no effect.
    #[must_use = "builder methods return the updated hasher; the original is consumed"]
    pub fn use_o_direct(mut self) -> Self {
        self.use_o_direct = true;
        self
    }

    /// Sets the size, in bytes, of the buffer used for each read from the input.
    #[must_use = "builder methods return the updated hasher; the original is consumed"]
    pub fn with_read_buffer_size(mut self, size: usize) -> Self {
        self.read_buffer_size = size;
        self
    }

    /// Sets how many read chunks may be queued between the reading and
    /// hashing tasks of the async hashing methods.
    #[must_use = "builder methods return the updated hasher; the original is consumed"]
    pub fn with_channel_buffer_size(mut self, size: usize) -> Self {
        self.channel_buffer_size = size;
        self
    }

    /// Sets how often the stats callback is invoked during async hashing.
    #[must_use = "builder methods return the updated hasher; the original is consumed"]
    pub fn with_throughput_update_interval(mut self, interval: Duration) -> Self {
        self.throughput_update_interval = interval;
        self
    }

    /// Registers a callback that receives periodic [`HashingStats`] while a
    /// file or async stream is being hashed.
    #[must_use = "builder methods return the updated hasher; the original is consumed"]
    pub fn with_stats_callback<F: Fn(HashingStats) + Send + Sync + 'static>(
        mut self,
        f: F,
    ) -> Self {
        self.stats_callback_fn = Some(Arc::new(f));
        self
    }
}
impl Hasher {
    /// Computes the hex-encoded SHA-256 digest of everything `reader` yields.
    ///
    /// Reads synchronously (blocking the current thread) in chunks of the
    /// configured read buffer size until `reader` reports end of input.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error from `reader`, except
    /// [`std::io::ErrorKind::Interrupted`], which is retried.
    pub fn sha256_reader<R: Read>(&self, reader: R) -> Result<String> {
        self.digest_reader(&ring::digest::SHA256, reader)
    }

    /// Computes the hex-encoded SHA-256 digest of the file at `path`.
    ///
    /// The file is read on background tasks; progress is reported through
    /// the stats callback, if one is configured.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or read.
    pub async fn sha256_file<P: AsRef<Path>>(&self, path: P) -> Result<String> {
        self.digest_file(path, &ring::digest::SHA256).await
    }

    /// Computes the hex-encoded SHA-512 digest of the file at `path`.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or read.
    pub async fn sha512_file<P: AsRef<Path>>(&self, path: P) -> Result<String> {
        self.digest_file(path, &ring::digest::SHA512).await
    }

    /// Blocking digest of a `Read` source with a single reusable buffer.
    fn digest_reader<R: Read>(
        &self,
        algorithm: &'static ring::digest::Algorithm,
        mut reader: R,
    ) -> Result<String> {
        let mut context = ring::digest::Context::new(algorithm);
        // A zero-length buffer makes `read` return Ok(0) immediately, which is
        // indistinguishable from EOF and would silently hash nothing.
        let mut buffer = vec![0u8; self.read_buffer_size.max(1)];
        loop {
            let bytes_read = match reader.read(&mut buffer) {
                Ok(0) => break,
                Ok(n) => n,
                // Interrupted reads are transient and must be retried.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e.into()),
            };
            context.update(&buffer[..bytes_read]);
        }
        Ok(hex::encode(context.finish()))
    }

    /// Opens `path` for reading (optionally with `O_DIRECT`) and hashes it
    /// via `digest_stream`.
    async fn digest_file<P: AsRef<Path>>(
        &self,
        path: P,
        algorithm: &'static ring::digest::Algorithm,
    ) -> Result<String> {
        let path = path.as_ref();
        let mut open_opt = tokio::fs::OpenOptions::new();
        open_opt.read(true);
        // `libc::O_DIRECT` is not defined on every Unix (e.g. macOS), so gating
        // on `cfg(unix)` breaks the build there; elsewhere the flag is ignored.
        // NOTE(review): O_DIRECT usually requires block-aligned buffers, lengths
        // and offsets, which neither this code nor tokio's File guarantees;
        // reads may fail with EINVAL on some filesystems — confirm.
        #[cfg(any(target_os = "linux", target_os = "android"))]
        {
            if self.use_o_direct {
                open_opt.custom_flags(libc::O_DIRECT);
            }
        }
        let file = open_opt
            .open(path)
            .await
            .with_context(|| format!("Failed to open {}", path.display()))?;
        self.digest_stream(algorithm, file).await
    }

    /// Computes the hex-encoded SHA-256 digest of an async byte stream.
    ///
    /// # Errors
    ///
    /// Fails if reading from `stream` fails or a background task panics.
    pub async fn sha256_stream<S>(&self, stream: S) -> Result<String>
    where
        S: AsyncReadExt + Unpin + Send + 'static,
    {
        self.digest_stream(&ring::digest::SHA256, stream).await
    }

    /// Hashes `stream` with `algorithm` using spawned tasks.
    ///
    /// A reader task fills buffers and sends them through a bounded channel.
    /// A hashing task feeds them into the digest context. If a stats callback
    /// is configured, a third task reports progress every
    /// `throughput_update_interval`.
    async fn digest_stream<S>(
        &self,
        algorithm: &'static ring::digest::Algorithm,
        mut stream: S,
    ) -> Result<String>
    where
        S: AsyncReadExt + Unpin + Send + 'static,
    {
        // `mpsc::channel` panics on a zero capacity.
        let (buffer_tx, mut buffer_rx) =
            mpsc::channel::<Vec<u8>>(self.channel_buffer_size.max(1));
        let cancel_token = CancellationToken::new();
        // If this future is dropped before completion, cancel the token so the
        // spawned tasks stop instead of running detached (the throughput task
        // would otherwise loop forever).
        let _cancel_on_drop = cancel_token.clone().drop_guard();
        let total_bytes_read = Arc::new(AtomicU64::new(0));
        // `tokio::time::interval` panics on a zero period; tokio timers work at
        // millisecond granularity, so clamp to that.
        let throughput_update_interval = self
            .throughput_update_interval
            .max(Duration::from_millis(1));

        let throughput_task = self.stats_callback_fn.as_ref().map(|stats_callback| {
            tokio::spawn({
                let start_time = Instant::now();
                let total_bytes_read = Arc::clone(&total_bytes_read);
                let cancel_token = cancel_token.clone();
                let stats_callback = Arc::clone(stats_callback);
                async move {
                    let mut interval = tokio::time::interval(throughput_update_interval);
                    loop {
                        tokio::select! {
                            _ = interval.tick() => stats_callback(HashingStats {
                                elapsed: start_time.elapsed(),
                                bytes_read: total_bytes_read.load(Ordering::Relaxed),
                            }),
                            _ = cancel_token.cancelled() => break,
                        }
                    }
                }
            })
        });

        let read_buffer_size = self.read_buffer_size;
        let read_task = tokio::spawn({
            let cancel_token = cancel_token.clone();
            async move {
                let mut buffer: Vec<u8> = Vec::with_capacity(read_buffer_size);
                loop {
                    let bytes_read = tokio::select! {
                        biased;
                        _ = cancel_token.cancelled() => anyhow::bail!("Hashing was cancelled"),
                        result = stream.read_buf(&mut buffer) => result?,
                    };
                    if bytes_read == 0 {
                        break;
                    }
                    total_bytes_read.fetch_add(bytes_read as u64, Ordering::Relaxed);
                    // Move the filled buffer to the hasher and start a fresh one,
                    // rather than copying the chunk into a new Vec.
                    let chunk =
                        std::mem::replace(&mut buffer, Vec::with_capacity(read_buffer_size));
                    buffer_tx
                        .send(chunk)
                        .await
                        .context("Failed to send buffer through channel")?;
                }
                anyhow::Ok(())
            }
        });

        let hash_task = tokio::spawn(async move {
            let mut context = ring::digest::Context::new(algorithm);
            while let Some(chunk) = buffer_rx.recv().await {
                context.update(&chunk);
            }
            context.finish()
        });

        let (read_result, hash_result) = tokio::join!(read_task, hash_task);
        cancel_token.cancel();
        if let Some(handle) = throughput_task {
            handle.await.context("Throughput logging task panicked")?;
        }
        // Check the hasher first: if it panicked, the channel closed and the
        // reader only reports a less informative send failure.
        let digest = hash_result.context("Hashing task panicked")?;
        read_result.context("Stream reading task panicked")??;
        Ok(hex::encode(digest))
    }
}
impl Default for Hasher {
fn default() -> Self {
Self::new()
}
}