aruna_file/
streamreadwrite.rs

1use crate::notifications::{FileMessage, Message};
2use crate::transformer::{FileContext, ReadWriter, Sink, Transformer, TransformerType};
3use crate::transformers::writer_sink::WriterSink;
4use anyhow::{bail, Result};
5use async_channel::{Receiver, Sender};
6use bytes::{BufMut, Bytes, BytesMut};
7use futures::{Stream, StreamExt};
8use std::mem;
9use tokio::io::{AsyncWrite, BufWriter};
10use tracing::{debug, error};
11
12pub struct ArunaStreamReadWriter<
13    'a,
14    R: Stream<Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync + 'static>>>
15        + Unpin
16        + Send
17        + Sync,
18> {
19    input_stream: R,
20    transformers: Vec<(TransformerType, Box<dyn Transformer + Send + Sync + 'a>)>,
21    sink: Box<dyn Sink + Send + Sync + 'a>,
22    receiver: Receiver<Message>,
23    sender: Sender<Message>,
24    size_counter: usize,
25    current_file_context: Option<(FileContext, bool)>,
26    file_ctx_rx: Option<Receiver<(FileContext, bool)>>,
27}
28
29impl<
30        'a,
31        R: Stream<Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync + 'static>>>
32            + Unpin
33            + Send
34            + Sync,
35    > ArunaStreamReadWriter<'a, R>
36{
37    #[tracing::instrument(level = "trace", skip(input_stream, transformer))]
38    pub fn new_with_sink<T: Transformer + Sink + Send + Sync + 'a>(
39        input_stream: R,
40        transformer: T,
41    ) -> Self {
42        let (sx, rx) = async_channel::unbounded();
43        ArunaStreamReadWriter {
44            input_stream,
45            sink: Box::new(transformer),
46            transformers: Vec::new(),
47            sender: sx,
48            receiver: rx,
49            size_counter: 0,
50            current_file_context: None,
51            file_ctx_rx: None,
52        }
53    }
54
55    #[tracing::instrument(level = "trace", skip(input_stream, writer))]
56    pub fn new_with_writer<W: AsyncWrite + Send + Sync + 'a>(input_stream: R, writer: W) -> Self {
57        let (sx, rx) = async_channel::unbounded();
58        ArunaStreamReadWriter {
59            input_stream,
60            sink: Box::new(WriterSink::new(BufWriter::new(Box::pin(writer)))),
61            transformers: Vec::new(),
62            sender: sx,
63            receiver: rx,
64            size_counter: 0,
65            current_file_context: None,
66            file_ctx_rx: None,
67        }
68    }
69
70    #[tracing::instrument(level = "trace", skip(self, transformer))]
71    pub fn add_transformer<T: Transformer + Send + Sync + 'a>(
72        mut self,
73        mut transformer: T,
74    ) -> ArunaStreamReadWriter<'a, R> {
75        transformer.add_sender(self.sender.clone());
76        self.transformers
77            .push((transformer.get_type(), Box::new(transformer)));
78        self
79    }
80}
81
82#[async_trait::async_trait]
83impl<
84        'a,
85        R: Stream<Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync + 'static>>>
86            + Unpin
87            + Send
88            + Sync,
89    > ReadWriter for ArunaStreamReadWriter<'a, R>
90{
91    #[tracing::instrument(err, level = "trace", skip(self))]
92    async fn process(&mut self) -> Result<()> {
93        // The buffer that accumulates the "actual" data
94        let mut read_buf = BytesMut::with_capacity(65_536 * 2);
95        let mut hold_buffer = BytesMut::with_capacity(65536);
96        let mut finished;
97        let mut maybe_msg: Vec<Message> = vec![];
98        let mut data;
99        let mut read_bytes: usize = 0;
100        let mut next_file = false;
101        let mut context_zero_file = false;
102
103        if let Some(rx) = &self.file_ctx_rx {
104            let (context, is_last) = rx.try_recv()?;
105            if context.is_dir || context.is_symlink {
106                context_zero_file = true;
107            }
108            debug!(?context, ?is_last, "received file context");
109            self.current_file_context = Some((context.clone(), is_last));
110            self.announce_all(Message {
111                target: TransformerType::All,
112                data: crate::notifications::MessageData::NextFile(FileMessage { context, is_last }),
113            })
114            .await?;
115        }
116
117        loop {
118            if !context_zero_file {
119                if hold_buffer.is_empty() {
120                    if read_buf.is_empty() {
121                        data = self
122                            .input_stream
123                            .next()
124                            .await
125                            .unwrap_or_else(|| Ok(Bytes::new()))
126                            .unwrap_or_default();
127                        read_bytes = data.len();
128                        read_buf.put(data);
129                    }
130                } else if read_buf.is_empty() {
131                    mem::swap(&mut hold_buffer, &mut read_buf);
132                }
133            }
134
135            if let Some((context, is_last)) = &self.current_file_context {
136                self.size_counter += read_bytes;
137                if self.size_counter > context.input_size as usize
138                    && !context.is_dir
139                    && !context.is_symlink
140                {
141                    let mut diff = read_bytes - (self.size_counter - context.input_size as usize);
142                    if diff >= context.input_size as usize {
143                        diff = context.input_size as usize
144                    }
145                    hold_buffer = read_buf.split_to(diff);
146                    mem::swap(&mut read_buf, &mut hold_buffer);
147                    self.size_counter -= context.input_size as usize;
148                    next_file = !is_last;
149                }
150                if context.is_dir || context.is_symlink {
151                    next_file = !is_last;
152                }
153                finished = read_buf.is_empty() && read_bytes == 0 && *is_last;
154            } else {
155                finished = read_buf.is_empty() && read_bytes == 0;
156            }
157
158            for (ttype, trans) in self.transformers.iter_mut() {
159                if !maybe_msg.is_empty() {
160                    for msg in &maybe_msg {
161                        if msg.target == *ttype {
162                            trans.notify(msg).await?;
163                        }
164                    }
165                }
166                if let Ok(msg) = self.receiver.try_recv() {
167                    maybe_msg.push(msg);
168                }
169                match trans.process_bytes(&mut read_buf, finished, false).await? {
170                    true => {}
171                    false => finished = false,
172                };
173            }
174            match self
175                .sink
176                .process_bytes(&mut read_buf, finished, false)
177                .await?
178            {
179                true => {}
180                false => finished = false,
181            };
182
183            // Anounce next file
184            if next_file {
185                if let Some(rx) = &self.file_ctx_rx {
186                    // Perform a flush through all transformers!
187                    assert!(read_buf.is_empty());
188                    for (_, trans) in self.transformers.iter_mut() {
189                        trans.process_bytes(&mut read_buf, finished, true).await?;
190                    }
191                    self.sink
192                        .process_bytes(&mut read_buf, finished, true)
193                        .await?;
194                    let (context, is_last) = rx.recv().await?;
195
196                    // Empty message queue
197                    maybe_msg.clear();
198
199                    // Fetch next file context
200                    context_zero_file = context.is_dir || context.is_symlink;
201                    self.current_file_context = Some((context.clone(), is_last));
202                    self.announce_all(Message {
203                        target: TransformerType::All,
204                        data: crate::notifications::MessageData::NextFile(FileMessage {
205                            context,
206                            is_last,
207                        }),
208                    })
209                    .await?;
210                    for (_, trans) in self.transformers.iter_mut() {
211                        trans.process_bytes(&mut read_buf, finished, false).await?;
212                    }
213                    self.sink
214                        .process_bytes(&mut read_buf, finished, true)
215                        .await?;
216                }
217                if !context_zero_file {
218                    next_file = false;
219                }
220            }
221
222            if read_buf.is_empty() & finished {
223                break;
224            }
225            read_bytes = 0;
226        }
227        Ok(())
228    }
229    #[tracing::instrument(level = "trace", skip(self, message))]
230    async fn announce_all(&mut self, message: Message) -> Result<()> {
231        for (_, trans) in self.transformers.iter_mut() {
232            trans.notify(&message).await?;
233        }
234        Ok(())
235    }
236
237    #[tracing::instrument(level = "trace", skip(self, rx))]
238    async fn add_file_context_receiver(&mut self, rx: Receiver<(FileContext, bool)>) -> Result<()> {
239        if self.file_ctx_rx.is_none() {
240            self.file_ctx_rx = Some(rx);
241            Ok(())
242        } else {
243            error!("Overwriting existing receivers is not allowed!");
244            bail!("[READ_WRITER] Overwriting existing receivers is not allowed!")
245        }
246    }
247}