streamdigest 0.1.0

Async file hashing and digest calculation with progress reporting
Documentation
// crates/core/synergy-sys/src/digest/hasher.rs
use anyhow::{Context, Result};
use std::{
    io::Read,
    path::Path,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::{Duration, Instant},
};
use tokio::{io::AsyncReadExt, sync::mpsc};
use tokio_util::sync::CancellationToken;

/// Snapshot of hashing progress handed to the stats callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HashingStats {
    /// Time elapsed since hashing started.
    pub elapsed: Duration,
    /// Total number of bytes read from the source so far.
    pub bytes_read: u64,
}

impl HashingStats {
    /// Bytes read so far, in decimal megabytes (1 MB = 1,000,000 bytes).
    pub fn total_read_mb(&self) -> f64 {
        self.bytes_read as f64 / 1_000_000.0
    }

    /// Average throughput in bytes per second.
    ///
    /// Returns `0.0` when no time has elapsed, instead of the `NaN` / infinity that a raw
    /// division by a zero duration would produce.
    pub fn bytes_per_sec(&self) -> f64 {
        let secs = self.elapsed.as_secs_f64();
        if secs > 0.0 {
            self.bytes_read as f64 / secs
        } else {
            0.0
        }
    }

    /// Average throughput in decimal megabytes per second.
    pub fn mb_per_sec(&self) -> f64 {
        self.bytes_per_sec() / 1_000_000.0
    }
}

/// Configurable file / stream hasher.
///
/// Build one with [`Hasher::new`] (or `Default`) and tune it with the `with_*` builder methods.
#[derive(Clone)]
pub struct Hasher {
    /// Optional observer that receives periodic [`HashingStats`].
    stats_callback_fn: Option<Arc<dyn Fn(HashingStats) + Send + Sync>>,
    /// Period between stats callback invocations.
    throughput_update_interval: Duration,
    /// Capacity, in bytes, of each read chunk.
    read_buffer_size: usize,
    /// Number of chunks that may be queued between the reader and the hasher.
    channel_buffer_size: usize,
    /// Whether files are opened with `O_DIRECT`.
    use_o_direct: bool,
}

/// Default capacity of each read chunk: 8 MiB (8 × 2^20 bytes).
const DEFAULT_BUFFER_SIZE: usize = 8 << 20;
/// Default number of chunks allowed to queue between the reader and the hasher.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1;
/// Default period between stats callback invocations.
const THROUGHPUT_UPDATE_INTERVAL: Duration = Duration::from_millis(1_000);

impl Hasher {
    /// Digests `bytes` in one shot with `algorithm` and hex-encodes the result.
    fn one_shot_hex(algorithm: &'static ring::digest::Algorithm, bytes: &[u8]) -> String {
        let digest = ring::digest::digest(algorithm, bytes);
        hex::encode(digest)
    }

    /// Returns the hex-encoded SHA-256 digest of an in-memory byte slice.
    pub fn sha256sum(data: impl AsRef<[u8]>) -> String {
        Self::one_shot_hex(&ring::digest::SHA256, data.as_ref())
    }

    /// Returns the hex-encoded SHA-512 digest of an in-memory byte slice.
    pub fn sha512sum(data: impl AsRef<[u8]>) -> String {
        Self::one_shot_hex(&ring::digest::SHA512, data.as_ref())
    }

    /// Returns the hex-encoded SHA-1 digest of an in-memory byte slice.
    ///
    /// Uses ring's `SHA1_FOR_LEGACY_USE_ONLY`; intended only for interoperating with
    /// existing SHA-1 checksums, not for new security-sensitive uses.
    pub fn sha1sum(data: impl AsRef<[u8]>) -> String {
        Self::one_shot_hex(&ring::digest::SHA1_FOR_LEGACY_USE_ONLY, data.as_ref())
    }
}

impl Hasher {
    /// Creates a hasher with the default configuration.
    ///
    /// The defaults are:
    /// - `DEFAULT_BUFFER_SIZE` read chunks;
    /// - a `DEFAULT_CHANNEL_BUFFER_SIZE`-slot channel between reader and hasher;
    /// - a `THROUGHPUT_UPDATE_INTERVAL` stats period;
    /// - no stats callback;
    /// - `O_DIRECT` disabled.
    #[must_use]
    pub fn new() -> Self {
        Hasher {
            read_buffer_size: DEFAULT_BUFFER_SIZE,
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
            throughput_update_interval: THROUGHPUT_UPDATE_INTERVAL,
            stats_callback_fn: None,
            use_o_direct: false,
        }
    }

    /// Opens files with `O_DIRECT` when hashing them, so the OS page cache is not filled
    /// with the file's contents.
    #[must_use]
    pub fn use_o_direct(mut self) -> Self {
        self.use_o_direct = true;
        self
    }

    /// Sets the capacity, in bytes, of each chunk read from the source.
    #[must_use]
    pub fn with_read_buffer_size(mut self, size: usize) -> Self {
        self.read_buffer_size = size;
        self
    }

    /// Sets how many read chunks may be queued between the reader and the hasher.
    #[must_use]
    pub fn with_channel_buffer_size(mut self, size: usize) -> Self {
        self.channel_buffer_size = size;
        self
    }

    /// Sets the period between stats callback invocations.
    #[must_use]
    pub fn with_throughput_update_interval(mut self, interval: Duration) -> Self {
        self.throughput_update_interval = interval;
        self
    }

    /// Registers a callback that receives [`HashingStats`] samples while a file or stream
    /// is being hashed.
    #[must_use]
    pub fn with_stats_callback<F: Fn(HashingStats) + Send + Sync + 'static>(
        mut self,
        f: F,
    ) -> Self {
        self.stats_callback_fn = Some(Arc::new(f));
        self
    }
}

impl Hasher {
    /// Synchronously calculate SHA256 hash for any type implementing Read
    pub fn sha256_reader<R: Read>(&self, mut reader: R) -> Result<String> {
        let mut context = ring::digest::Context::new(&ring::digest::SHA256);
        let mut buffer = vec![0; self.read_buffer_size];

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            context.update(&buffer[..bytes_read]);
        }

        let digest = context.finish();
        Ok(hex::encode(digest))
    }

    /// Asynchronously calculate SHA256 hash for a file
    pub async fn sha256_file<P: AsRef<Path>>(&self, path: P) -> Result<String> {
        self.digest_file(path, &ring::digest::SHA256).await
    }

    /// Asynchronously calculate SHA512 hash for a file
    pub async fn sha512_file<P: AsRef<Path>>(&self, path: P) -> Result<String> {
        self.digest_file(path, &ring::digest::SHA512).await
    }

    async fn digest_file<P: AsRef<Path>>(
        &self,
        path: P,
        algorithm: &'static ring::digest::Algorithm,
    ) -> Result<String> {
        let path = path.as_ref().to_owned();
        let use_o_direct = self.use_o_direct;

        let mut open_opt = tokio::fs::OpenOptions::new();
        open_opt.read(true);

        #[cfg(unix)]
        {
            if use_o_direct {
                // prevents linux page cache from filling up
                open_opt.custom_flags(libc::O_DIRECT);
            }
        }

        let file = open_opt.open(&path).await?;
        self.digest_stream(algorithm, file).await
    }

    /// Asynchronously calculate SHA256 hash for a stream
    pub async fn sha256_stream<S>(&self, stream: S) -> Result<String>
    where
        S: AsyncReadExt + Unpin + Send + 'static,
    {
        self.digest_stream(&ring::digest::SHA256, stream).await
    }

    async fn digest_stream<S>(
        &self,
        algorithm: &'static ring::digest::Algorithm,
        mut stream: S,
    ) -> Result<String>
    where
        S: AsyncReadExt + Unpin + Send + 'static,
    {
        let (buffer_tx, mut buffer_rx) = mpsc::channel(self.channel_buffer_size);
        let cancel_token = CancellationToken::new();
        let total_bytes_read = Arc::new(AtomicU64::new(0));
        let throughput_update_interval = self.throughput_update_interval;

        // Optionally spawn a task for monitoring and reporting throughput
        let throughput_task = self.stats_callback_fn.as_ref().map(|stats_callback| {
            tokio::spawn({
                let start_time = Instant::now();
                let total_bytes_read = Arc::clone(&total_bytes_read);
                let cancel_token = cancel_token.clone();
                let stats_callback = stats_callback.clone();
                async move {
                    let mut interval = tokio::time::interval(throughput_update_interval);
                    loop {
                        tokio::select! {
                            _ = interval.tick() => stats_callback(HashingStats {
                                elapsed: start_time.elapsed(),
                                bytes_read: total_bytes_read.load(Ordering::Relaxed)
                            }),

                            _ = cancel_token.cancelled() => break,
                        }
                    }
                }
            })
        });

        let mut buffer = Vec::with_capacity(self.read_buffer_size);
        // Spawn a task for reading from the stream source
        let read_task = tokio::spawn(async move {
            loop {
                buffer.clear();
                let bytes_read = stream.read_buf(&mut buffer).await?;
                if bytes_read == 0 {
                    break;
                }

                total_bytes_read.fetch_add(bytes_read as u64, Ordering::Relaxed);

                buffer_tx
                    .send(buffer[..bytes_read].to_vec())
                    .await
                    .context("Failed to send buffer through channel")?;
            }

            anyhow::Ok(())
        });

        // Spawn a blocking task for hashing
        let hash_task = tokio::task::spawn(async move {
            let mut context = ring::digest::Context::new(algorithm);
            while let Some(buffer) = buffer_rx.recv().await {
                context.update(&buffer);
            }
            anyhow::Ok(context.finish())
        });

        // Wait for stream reading and hashing to complete
        let (read_result, hash_result) = tokio::join!(read_task, hash_task);

        // Signal cancellation to the throughput logging task
        cancel_token.cancel();

        // Wait for throughput logging to complete if enabled
        if let Some(handle) = throughput_task {
            handle.await.context("Throughput logging task panicked")?;
        }

        read_result.context("Stream reading task panicked")??;
        let digest = hash_result.context("Hashing task panicked")??;

        Ok(hex::encode(digest))
    }
}

impl Default for Hasher {
    fn default() -> Self {
        Self::new()
    }
}