aruna_file/transformers/
hashing_transformer.rs

1use crate::transformer::{Transformer, TransformerType};
2use anyhow::anyhow;
3use anyhow::Result;
4use async_channel::{Receiver, Sender};
5use digest::{Digest, FixedOutputReset};
6use tracing::error;
7
8pub struct HashingTransformer<T: Digest + Send + FixedOutputReset> {
9    hasher: T,
10    sender: Sender<String>,
11}
12
13impl<T> HashingTransformer<T>
14where
15    T: Digest + Send + Sync + FixedOutputReset,
16{
17    #[tracing::instrument(level = "trace", skip(hasher))]
18    #[allow(dead_code)]
19    pub fn new(hasher: T) -> (HashingTransformer<T>, Receiver<String>) {
20        let (sender, receiver) = async_channel::bounded(1);
21        (HashingTransformer { hasher, sender }, receiver)
22    }
23}
24
25#[async_trait::async_trait]
26impl<T> Transformer for HashingTransformer<T>
27where
28    T: Digest + Send + Sync + FixedOutputReset,
29{
30    #[tracing::instrument(level = "trace", skip(self, buf, finished))]
31    async fn process_bytes(
32        &mut self,
33        buf: &mut bytes::BytesMut,
34        finished: bool,
35        _: bool,
36    ) -> Result<bool> {
37        Digest::update(&mut self.hasher, &buf);
38
39        if finished && buf.is_empty() {
40            match self
41                .sender
42                .try_send(hex::encode(self.hasher.finalize_reset()).to_string())
43            {
44                Ok(_) => {}
45                Err(e) => match e {
46                    async_channel::TrySendError::Full(_) => {}
47                    async_channel::TrySendError::Closed(_) => {
48                        error!("Sending in closed channel");
49                        return Err(anyhow!("HashingTransformer: Channel closed"));
50                    }
51                },
52            }
53        }
54        Ok(true)
55    }
56
57    #[tracing::instrument(level = "trace", skip(self))]
58    fn get_type(&self) -> TransformerType {
59        TransformerType::Hashing
60    }
61}