aruna_file/transformers/
tar.rs

1use crate::notifications::Message;
2use crate::notifications::Response;
3use crate::transformer::FileContext;
4use crate::transformer::Transformer;
5use crate::transformer::TransformerType;
6use anyhow::bail;
7use anyhow::Result;
8use bytes::BufMut;
9use std::time::Duration;
10use std::time::SystemTime;
11use tar::Header;
12use tracing::debug;
13use tracing::error;
14
15pub struct TarEnc {
16    header: Option<Header>,
17    padding: usize,
18    finished: bool,
19    init: bool,
20}
21
22impl TryFrom<FileContext> for Header {
23    type Error = anyhow::Error;
24
25    #[tracing::instrument(level = "trace", skip(value))]
26    fn try_from(value: FileContext) -> Result<Self> {
27        let mut header = Header::new_gnu();
28
29        let path = match value.file_path {
30            Some(p) => p + &value.file_name,
31            None => value.file_name,
32        };
33        header.set_path(path)?;
34        header.set_mode(value.mode.unwrap_or(0o644));
35        header.set_mtime(value.mtime.unwrap_or_else(|| {
36            SystemTime::now()
37                .duration_since(SystemTime::UNIX_EPOCH)
38                .unwrap_or_else(|_| Duration::from_secs(0))
39                .as_secs()
40        }));
41        header.set_uid(value.gid.unwrap_or(1000));
42        header.set_gid(value.gid.unwrap_or(1000));
43        header.set_size(value.file_size);
44        header.set_cksum();
45        Ok(header)
46    }
47}
48
49impl TarEnc {
50    #[tracing::instrument(level = "trace", skip())]
51    pub fn new() -> TarEnc {
52        TarEnc {
53            header: None,
54            padding: 0,
55            finished: false,
56            init: true,
57        }
58    }
59}
60
61impl Default for TarEnc {
62    #[tracing::instrument(level = "trace", skip())]
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68#[async_trait::async_trait]
69impl Transformer for TarEnc {
70    #[tracing::instrument(level = "trace", skip(self, buf, finished, should_flush))]
71    async fn process_bytes(
72        &mut self,
73        buf: &mut bytes::BytesMut,
74        finished: bool,
75        should_flush: bool,
76    ) -> Result<bool> {
77        if should_flush {
78            if self.padding > 0 {
79                buf.put(vec![0u8; self.padding].as_ref());
80            }
81            self.padding = 0;
82            return Ok(finished);
83        }
84        if let Some(header) = &self.header {
85            let temp = buf.split();
86            if self.init {
87                self.init = false;
88            }
89            buf.put(header.as_bytes().as_slice());
90            buf.put(temp);
91            self.header = None;
92        }
93
94        if finished && !self.finished {
95            buf.put(vec![0u8; self.padding].as_ref());
96            buf.put([0u8; 1024].as_slice());
97            self.finished = true;
98        }
99        Ok(self.finished)
100    }
101
102    #[tracing::instrument(level = "trace", skip(self))]
103    fn get_type(&self) -> TransformerType {
104        TransformerType::TarEncoder
105    }
106
107    #[tracing::instrument(level = "trace", skip(self, message))]
108    async fn notify(&mut self, message: &Message) -> Result<Response> {
109        if message.target == TransformerType::All {
110            if let crate::notifications::MessageData::NextFile(nfile) = &message.data {
111                debug!("received next file message");
112                if self.header.is_none() {
113                    if nfile.context.is_dir || nfile.context.is_symlink {
114                        self.padding = 0;
115                    } else {
116                        self.padding = 512 - nfile.context.file_size as usize % 512;
117                    }
118                    self.header = Some(TryInto::<Header>::try_into(nfile.context.clone())?);
119                } else {
120                    error!("A Header is still present");
121                    bail!("[TAR] A Header is still present")
122                }
123                self.finished = false;
124            }
125        }
126
127        Ok(Response::Ok)
128    }
129}