aruna_file/transformers/
tar.rs1use crate::notifications::Message;
2use crate::notifications::Response;
3use crate::transformer::FileContext;
4use crate::transformer::Transformer;
5use crate::transformer::TransformerType;
6use anyhow::bail;
7use anyhow::Result;
8use bytes::BufMut;
9use std::time::Duration;
10use std::time::SystemTime;
11use tar::Header;
12use tracing::debug;
13use tracing::error;
14
15pub struct TarEnc {
16 header: Option<Header>,
17 padding: usize,
18 finished: bool,
19 init: bool,
20}
21
22impl TryFrom<FileContext> for Header {
23 type Error = anyhow::Error;
24
25 #[tracing::instrument(level = "trace", skip(value))]
26 fn try_from(value: FileContext) -> Result<Self> {
27 let mut header = Header::new_gnu();
28
29 let path = match value.file_path {
30 Some(p) => p + &value.file_name,
31 None => value.file_name,
32 };
33 header.set_path(path)?;
34 header.set_mode(value.mode.unwrap_or(0o644));
35 header.set_mtime(value.mtime.unwrap_or_else(|| {
36 SystemTime::now()
37 .duration_since(SystemTime::UNIX_EPOCH)
38 .unwrap_or_else(|_| Duration::from_secs(0))
39 .as_secs()
40 }));
41 header.set_uid(value.gid.unwrap_or(1000));
42 header.set_gid(value.gid.unwrap_or(1000));
43 header.set_size(value.file_size);
44 header.set_cksum();
45 Ok(header)
46 }
47}
48
49impl TarEnc {
50 #[tracing::instrument(level = "trace", skip())]
51 pub fn new() -> TarEnc {
52 TarEnc {
53 header: None,
54 padding: 0,
55 finished: false,
56 init: true,
57 }
58 }
59}
60
61impl Default for TarEnc {
62 #[tracing::instrument(level = "trace", skip())]
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68#[async_trait::async_trait]
69impl Transformer for TarEnc {
70 #[tracing::instrument(level = "trace", skip(self, buf, finished, should_flush))]
71 async fn process_bytes(
72 &mut self,
73 buf: &mut bytes::BytesMut,
74 finished: bool,
75 should_flush: bool,
76 ) -> Result<bool> {
77 if should_flush {
78 if self.padding > 0 {
79 buf.put(vec![0u8; self.padding].as_ref());
80 }
81 self.padding = 0;
82 return Ok(finished);
83 }
84 if let Some(header) = &self.header {
85 let temp = buf.split();
86 if self.init {
87 self.init = false;
88 }
89 buf.put(header.as_bytes().as_slice());
90 buf.put(temp);
91 self.header = None;
92 }
93
94 if finished && !self.finished {
95 buf.put(vec![0u8; self.padding].as_ref());
96 buf.put([0u8; 1024].as_slice());
97 self.finished = true;
98 }
99 Ok(self.finished)
100 }
101
102 #[tracing::instrument(level = "trace", skip(self))]
103 fn get_type(&self) -> TransformerType {
104 TransformerType::TarEncoder
105 }
106
107 #[tracing::instrument(level = "trace", skip(self, message))]
108 async fn notify(&mut self, message: &Message) -> Result<Response> {
109 if message.target == TransformerType::All {
110 if let crate::notifications::MessageData::NextFile(nfile) = &message.data {
111 debug!("received next file message");
112 if self.header.is_none() {
113 if nfile.context.is_dir || nfile.context.is_symlink {
114 self.padding = 0;
115 } else {
116 self.padding = 512 - nfile.context.file_size as usize % 512;
117 }
118 self.header = Some(TryInto::<Header>::try_into(nfile.context.clone())?);
119 } else {
120 error!("A Header is still present");
121 bail!("[TAR] A Header is still present")
122 }
123 self.finished = false;
124 }
125 }
126
127 Ok(Response::Ok)
128 }
129}