1use crate::notifications::{FileMessage, Message};
2use crate::transformer::{FileContext, ReadWriter, Sink, Transformer, TransformerType};
3use crate::transformers::writer_sink::WriterSink;
4use anyhow::{bail, Result};
5use async_channel::{Receiver, Sender};
6use bytes::{BufMut, Bytes, BytesMut};
7use futures::{Stream, StreamExt};
8use std::mem;
9use tokio::io::{AsyncWrite, BufWriter};
10use tracing::{debug, error};
11
12pub struct ArunaStreamReadWriter<
13 'a,
14 R: Stream<Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync + 'static>>>
15 + Unpin
16 + Send
17 + Sync,
18> {
19 input_stream: R,
20 transformers: Vec<(TransformerType, Box<dyn Transformer + Send + Sync + 'a>)>,
21 sink: Box<dyn Sink + Send + Sync + 'a>,
22 receiver: Receiver<Message>,
23 sender: Sender<Message>,
24 size_counter: usize,
25 current_file_context: Option<(FileContext, bool)>,
26 file_ctx_rx: Option<Receiver<(FileContext, bool)>>,
27}
28
29impl<
30 'a,
31 R: Stream<Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync + 'static>>>
32 + Unpin
33 + Send
34 + Sync,
35 > ArunaStreamReadWriter<'a, R>
36{
37 #[tracing::instrument(level = "trace", skip(input_stream, transformer))]
38 pub fn new_with_sink<T: Transformer + Sink + Send + Sync + 'a>(
39 input_stream: R,
40 transformer: T,
41 ) -> Self {
42 let (sx, rx) = async_channel::unbounded();
43 ArunaStreamReadWriter {
44 input_stream,
45 sink: Box::new(transformer),
46 transformers: Vec::new(),
47 sender: sx,
48 receiver: rx,
49 size_counter: 0,
50 current_file_context: None,
51 file_ctx_rx: None,
52 }
53 }
54
55 #[tracing::instrument(level = "trace", skip(input_stream, writer))]
56 pub fn new_with_writer<W: AsyncWrite + Send + Sync + 'a>(input_stream: R, writer: W) -> Self {
57 let (sx, rx) = async_channel::unbounded();
58 ArunaStreamReadWriter {
59 input_stream,
60 sink: Box::new(WriterSink::new(BufWriter::new(Box::pin(writer)))),
61 transformers: Vec::new(),
62 sender: sx,
63 receiver: rx,
64 size_counter: 0,
65 current_file_context: None,
66 file_ctx_rx: None,
67 }
68 }
69
70 #[tracing::instrument(level = "trace", skip(self, transformer))]
71 pub fn add_transformer<T: Transformer + Send + Sync + 'a>(
72 mut self,
73 mut transformer: T,
74 ) -> ArunaStreamReadWriter<'a, R> {
75 transformer.add_sender(self.sender.clone());
76 self.transformers
77 .push((transformer.get_type(), Box::new(transformer)));
78 self
79 }
80}
81
82#[async_trait::async_trait]
83impl<
84 'a,
85 R: Stream<Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync + 'static>>>
86 + Unpin
87 + Send
88 + Sync,
89 > ReadWriter for ArunaStreamReadWriter<'a, R>
90{
91 #[tracing::instrument(err, level = "trace", skip(self))]
92 async fn process(&mut self) -> Result<()> {
93 let mut read_buf = BytesMut::with_capacity(65_536 * 2);
95 let mut hold_buffer = BytesMut::with_capacity(65536);
96 let mut finished;
97 let mut maybe_msg: Vec<Message> = vec![];
98 let mut data;
99 let mut read_bytes: usize = 0;
100 let mut next_file = false;
101 let mut context_zero_file = false;
102
103 if let Some(rx) = &self.file_ctx_rx {
104 let (context, is_last) = rx.try_recv()?;
105 if context.is_dir || context.is_symlink {
106 context_zero_file = true;
107 }
108 debug!(?context, ?is_last, "received file context");
109 self.current_file_context = Some((context.clone(), is_last));
110 self.announce_all(Message {
111 target: TransformerType::All,
112 data: crate::notifications::MessageData::NextFile(FileMessage { context, is_last }),
113 })
114 .await?;
115 }
116
117 loop {
118 if !context_zero_file {
119 if hold_buffer.is_empty() {
120 if read_buf.is_empty() {
121 data = self
122 .input_stream
123 .next()
124 .await
125 .unwrap_or_else(|| Ok(Bytes::new()))
126 .unwrap_or_default();
127 read_bytes = data.len();
128 read_buf.put(data);
129 }
130 } else if read_buf.is_empty() {
131 mem::swap(&mut hold_buffer, &mut read_buf);
132 }
133 }
134
135 if let Some((context, is_last)) = &self.current_file_context {
136 self.size_counter += read_bytes;
137 if self.size_counter > context.input_size as usize
138 && !context.is_dir
139 && !context.is_symlink
140 {
141 let mut diff = read_bytes - (self.size_counter - context.input_size as usize);
142 if diff >= context.input_size as usize {
143 diff = context.input_size as usize
144 }
145 hold_buffer = read_buf.split_to(diff);
146 mem::swap(&mut read_buf, &mut hold_buffer);
147 self.size_counter -= context.input_size as usize;
148 next_file = !is_last;
149 }
150 if context.is_dir || context.is_symlink {
151 next_file = !is_last;
152 }
153 finished = read_buf.is_empty() && read_bytes == 0 && *is_last;
154 } else {
155 finished = read_buf.is_empty() && read_bytes == 0;
156 }
157
158 for (ttype, trans) in self.transformers.iter_mut() {
159 if !maybe_msg.is_empty() {
160 for msg in &maybe_msg {
161 if msg.target == *ttype {
162 trans.notify(msg).await?;
163 }
164 }
165 }
166 if let Ok(msg) = self.receiver.try_recv() {
167 maybe_msg.push(msg);
168 }
169 match trans.process_bytes(&mut read_buf, finished, false).await? {
170 true => {}
171 false => finished = false,
172 };
173 }
174 match self
175 .sink
176 .process_bytes(&mut read_buf, finished, false)
177 .await?
178 {
179 true => {}
180 false => finished = false,
181 };
182
183 if next_file {
185 if let Some(rx) = &self.file_ctx_rx {
186 assert!(read_buf.is_empty());
188 for (_, trans) in self.transformers.iter_mut() {
189 trans.process_bytes(&mut read_buf, finished, true).await?;
190 }
191 self.sink
192 .process_bytes(&mut read_buf, finished, true)
193 .await?;
194 let (context, is_last) = rx.recv().await?;
195
196 maybe_msg.clear();
198
199 context_zero_file = context.is_dir || context.is_symlink;
201 self.current_file_context = Some((context.clone(), is_last));
202 self.announce_all(Message {
203 target: TransformerType::All,
204 data: crate::notifications::MessageData::NextFile(FileMessage {
205 context,
206 is_last,
207 }),
208 })
209 .await?;
210 for (_, trans) in self.transformers.iter_mut() {
211 trans.process_bytes(&mut read_buf, finished, false).await?;
212 }
213 self.sink
214 .process_bytes(&mut read_buf, finished, true)
215 .await?;
216 }
217 if !context_zero_file {
218 next_file = false;
219 }
220 }
221
222 if read_buf.is_empty() & finished {
223 break;
224 }
225 read_bytes = 0;
226 }
227 Ok(())
228 }
229 #[tracing::instrument(level = "trace", skip(self, message))]
230 async fn announce_all(&mut self, message: Message) -> Result<()> {
231 for (_, trans) in self.transformers.iter_mut() {
232 trans.notify(&message).await?;
233 }
234 Ok(())
235 }
236
237 #[tracing::instrument(level = "trace", skip(self, rx))]
238 async fn add_file_context_receiver(&mut self, rx: Receiver<(FileContext, bool)>) -> Result<()> {
239 if self.file_ctx_rx.is_none() {
240 self.file_ctx_rx = Some(rx);
241 Ok(())
242 } else {
243 error!("Overwriting existing receivers is not allowed!");
244 bail!("[READ_WRITER] Overwriting existing receivers is not allowed!")
245 }
246 }
247}