aruna_file/transformers/
hashing_transformer.rs1use crate::transformer::{Transformer, TransformerType};
2use anyhow::anyhow;
3use anyhow::Result;
4use async_channel::{Receiver, Sender};
5use digest::{Digest, FixedOutputReset};
6use tracing::error;
7
8pub struct HashingTransformer<T: Digest + Send + FixedOutputReset> {
9 hasher: T,
10 sender: Sender<String>,
11}
12
13impl<T> HashingTransformer<T>
14where
15 T: Digest + Send + Sync + FixedOutputReset,
16{
17 #[tracing::instrument(level = "trace", skip(hasher))]
18 #[allow(dead_code)]
19 pub fn new(hasher: T) -> (HashingTransformer<T>, Receiver<String>) {
20 let (sender, receiver) = async_channel::bounded(1);
21 (HashingTransformer { hasher, sender }, receiver)
22 }
23}
24
25#[async_trait::async_trait]
26impl<T> Transformer for HashingTransformer<T>
27where
28 T: Digest + Send + Sync + FixedOutputReset,
29{
30 #[tracing::instrument(level = "trace", skip(self, buf, finished))]
31 async fn process_bytes(
32 &mut self,
33 buf: &mut bytes::BytesMut,
34 finished: bool,
35 _: bool,
36 ) -> Result<bool> {
37 Digest::update(&mut self.hasher, &buf);
38
39 if finished && buf.is_empty() {
40 match self
41 .sender
42 .try_send(hex::encode(self.hasher.finalize_reset()).to_string())
43 {
44 Ok(_) => {}
45 Err(e) => match e {
46 async_channel::TrySendError::Full(_) => {}
47 async_channel::TrySendError::Closed(_) => {
48 error!("Sending in closed channel");
49 return Err(anyhow!("HashingTransformer: Channel closed"));
50 }
51 },
52 }
53 }
54 Ok(true)
55 }
56
57 #[tracing::instrument(level = "trace", skip(self))]
58 fn get_type(&self) -> TransformerType {
59 TransformerType::Hashing
60 }
61}