1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//! Currently unimplemented: the ZIP encoder transformer below is kept commented out until it is finished.
// use std::mem;
// use crate::notifications::Message;
// use crate::notifications::Response;
// use crate::transformer::FileContext;
// use crate::transformer::Transformer;
// use crate::transformer::TransformerType;
// use anyhow::bail;
// use anyhow::Result;
// use async_zip::base::write::EntryStreamWriter;
// use async_zip::base::write::ZipFileWriter;
// use async_zip::Compression;
// use async_zip::ZipEntry;
// use async_zip::ZipEntryBuilder;
// use bytes::BufMut;
// use futures::AsyncWriteExt;
// pub struct ZipEnc<'a> {
// writer: &'a ZipFileWriter<Vec<u8>>,
// current_file: Option<EntryStreamWriter<'a, Vec<u8>>>,
// finished: bool,
// }
// impl From<FileContext> for ZipEntry {
// #[tracing::instrument(level = "trace", skip(ctx))]
// fn from(ctx: FileContext) -> Self {
// ZipEntryBuilder::new(ctx.file_name.into(), Compression::Deflate).build()
// }
// }
// impl<'a> ZipEnc<'a> {
// #[tracing::instrument(level = "trace", skip(writer))]
// pub fn new(writer: &'a ZipFileWriter<Vec<u8>>) -> ZipEnc<'a> {
// ZipEnc {
// writer,
// current_file: None,
// finished: false,
// }
// }
// }
// #[async_trait::async_trait]
// impl<'a> Transformer for ZipEnc<'a> {
// #[tracing::instrument(level = "trace", skip(self, buf, finished, should_flush))]
// async fn process_bytes(
// &mut self,
// buf: &mut bytes::BytesMut,
// finished: bool,
// should_flush: bool,
// ) -> Result<bool> {
// if let Some(current_file) = &mut self.current_file {
// current_file.write_all(buf).await?;
// if finished && buf.is_empty() {
// current_file.close().await?;
// let data = self.writer.close().await?;
// buf.put(self.writer.close().await?.as_ref());
// }
// return Ok(finished);
// } else {
// return Err(anyhow::anyhow!("No current file"));
// }
// }
// #[tracing::instrument(level = "trace", skip(self))]
// fn get_type(&self) -> TransformerType {
// TransformerType::ZipEncoder
// }
// #[tracing::instrument(level = "trace", skip(self, message))]
// async fn notify(&mut self, message: &Message) -> Result<Response> {
// if message.target == TransformerType::All {
// if let crate::notifications::MessageData::NextFile(nfile) = &message.data {
// if self.current_file.is_none() {
// let entry = ZipEntry::from(nfile.context.clone());
// let new_stream = self.writer.write_entry_stream(entry).await?;
// mem::replace(&mut self.current_file, Some(new_stream));
// } else {
// error!("a header is still present");
// bail!("[TAR] A Header is still present")
// }
// self.finished = false;
// }
// }
// Ok(Response::Ok)
// }
// }