sos_filesystem/formats/
stream.rs

1//! File streams.
2use crate::{formats::FileItem, Result};
3use async_trait::async_trait;
4use binary_stream::futures::{stream_length, BinaryReader};
5use sos_core::encoding::encoding_options;
6use sos_vfs::File;
7use std::{io::SeekFrom, ops::Range};
8use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
9
10/// Trait for file format iterators.
11#[async_trait]
12pub trait FormatStreamIterator<T>
13where
14    T: FileItem + Send,
15{
16    /// Next entry in the iterator.
17    async fn next(&mut self) -> Result<Option<T>>;
18}
19
/// Generic iterator for file formats.
///
/// Supports files and in-memory buffers and can iterate lazily
/// in both directions.
///
/// Each row is read as a `u32` row length, the row record and a
/// trailing `u32` row length; the trailing copy is what allows
/// rows to be located when iterating in reverse.
pub struct FormatStream<T, R>
where
    T: FileItem + Send,
    R: AsyncRead + AsyncSeek + Unpin + Send,
{
    /// Offset from the beginning of the stream where
    /// iteration should start and reverse iteration
    /// should complete.
    ///
    /// This is often the length of the identity magic
    /// bytes but in some cases may be specified when
    /// creating the iterator, for example, vault files
    /// have information in the file header so we need
    /// to pass the offset where the content starts.
    header_offset: u64,

    /// Whether the decoded row record is followed by a u32
    /// that indicates the length of a data blob for the row;
    /// if so then the row `value` will point to that data.
    /// When unset the row `value` is the row range without
    /// its leading and trailing four bytes.
    ///
    /// This is used for lazy decoding such as in the case of
    /// event log files where we need to read the commit
    /// hash(es) and timestamp most of the time but sometimes
    /// need to read the row data too.
    data_length_prefix: bool,
    /// The read stream.
    read_stream: R,
    /// Byte offset of the next row to read when iterating
    /// forward; `None` until the first forward read.
    forward: Option<u64>,
    /// Byte offset just past the end of the next row to read
    /// when iterating backward; `None` until the first
    /// backward read.
    backward: Option<u64>,
    /// Whether to iterate in reverse.
    reverse: bool,
    /// Marker type.
    marker: std::marker::PhantomData<T>,
}
59
60impl<T: FileItem + Send> FormatStream<T, File> {
61    /// Create a new file iterator.
62    pub async fn new_file(
63        mut read_stream: File,
64        identity: &'static [u8],
65        data_length_prefix: bool,
66        header_offset: Option<u64>,
67        reverse: bool,
68    ) -> Result<Self> {
69        let header_offset = header_offset.unwrap_or(identity.len() as u64);
70        read_stream.seek(SeekFrom::Start(header_offset)).await?;
71
72        Ok(Self {
73            header_offset,
74            data_length_prefix,
75            read_stream,
76            forward: None,
77            backward: None,
78            reverse,
79            marker: std::marker::PhantomData,
80        })
81    }
82}
83
84#[async_trait]
85impl<T: FileItem + Send> FormatStreamIterator<T> for FormatStream<T, File> {
86    async fn next(&mut self) -> Result<Option<T>> {
87        if self.reverse {
88            self.next_back().await
89        } else {
90            self.next_forward().await
91        }
92    }
93}
94
95/*
96#[async_trait]
97impl<T: FileItem + Send> FormatStreamIterator<T>
98    for FormatStream<T, MemoryBuffer>
99{
100    async fn next(&mut self) -> Result<Option<T>> {
101        if self.reverse {
102            self.next_back().await
103        } else {
104            self.next_forward().await
105        }
106    }
107}
108*/
109
110impl<T, R> FormatStream<T, R>
111where
112    T: FileItem + Send,
113    R: AsyncRead + AsyncSeek + Unpin + Send,
114{
    /// Set the byte offset that constrains iteration.
    ///
    /// Forward iteration begins at this offset if no row has
    /// been read yet, and reverse iteration completes once it
    /// reaches this offset. Positions already recorded by
    /// earlier reads are not changed.
    ///
    /// Useful when creating streams of log events.
    pub fn set_offset(&mut self, offset: u64) {
        self.header_offset = offset;
    }
121
122    /// Helper to decode the row file record.
123    async fn read_row(
124        reader: &mut BinaryReader<&mut R>,
125        offset: Range<u64>,
126        is_prefix: bool,
127    ) -> Result<T> {
128        let mut row: T = Default::default();
129
130        row.decode(&mut *reader).await?;
131
132        if is_prefix {
133            // The byte range for the row value.
134            let value_len = reader.read_u32().await?;
135
136            let begin = reader.stream_position().await?;
137            let end = begin + value_len as u64;
138            row.set_value(begin..end);
139        } else {
140            row.set_value(offset.start + 4..offset.end - 4);
141        }
142
143        row.set_offset(offset);
144        Ok(row)
145    }
146
147    /// Attempt to read the next log row.
148    async fn read_row_next(&mut self) -> Result<T> {
149        let row_pos = self.forward.unwrap();
150
151        let mut reader =
152            BinaryReader::new(&mut self.read_stream, encoding_options());
153        reader.seek(SeekFrom::Start(row_pos)).await?;
154        let row_len = reader.read_u32().await?;
155
156        // Position of the end of the row
157        let row_end = row_pos + (row_len as u64 + 8);
158
159        let row = FormatStream::read_row(
160            &mut reader,
161            row_pos..row_end,
162            self.data_length_prefix,
163        )
164        .await?;
165
166        // Prepare position for next iteration
167        self.forward = Some(row_end);
168
169        Ok(row)
170    }
171
172    /// Attempt to read the next log row for backward iteration.
173    async fn read_row_next_back(&mut self) -> Result<T> {
174        let row_pos = self.backward.unwrap();
175
176        let mut reader =
177            BinaryReader::new(&mut self.read_stream, encoding_options());
178
179        // Read in the reverse iteration row length
180        reader.seek(SeekFrom::Start(row_pos - 4)).await?;
181        let row_len = reader.read_u32().await?;
182
183        // Position of the beginning of the row
184        // FIXME: handle panic on overflow when file length is too short
185        let row_start = row_pos - (row_len as u64 + 8);
186        let row_end = row_start + (row_len as u64 + 8);
187
188        // Seek to the beginning of the row after the initial
189        // row length so we can read in the row data
190        reader.seek(SeekFrom::Start(row_start + 4)).await?;
191        let row = FormatStream::read_row(
192            &mut reader,
193            row_start..row_end,
194            self.data_length_prefix,
195        )
196        .await?;
197
198        // Prepare position for next iteration.
199        self.backward = Some(row_start);
200
201        Ok(row)
202    }
203
204    async fn next_forward(&mut self) -> Result<Option<T>> {
205        let offset = self.header_offset;
206
207        if let (Some(lpos), Some(rpos)) = (self.forward, self.backward) {
208            if lpos == rpos {
209                return Ok(None);
210            }
211        }
212
213        let len = stream_length(&mut self.read_stream).await?;
214        if len > offset {
215            // Got to EOF
216            if let Some(lpos) = self.forward {
217                if lpos == len {
218                    return Ok(None);
219                }
220            }
221
222            if self.forward.is_none() {
223                self.forward = Some(offset);
224            }
225
226            Ok(Some(self.read_row_next().await?))
227        } else {
228            Ok(None)
229        }
230    }
231
232    async fn next_back(&mut self) -> Result<Option<T>> {
233        let offset: u64 = self.header_offset;
234
235        if let (Some(lpos), Some(rpos)) = (self.forward, self.backward) {
236            if lpos == rpos {
237                return Ok(None);
238            }
239        }
240
241        let len = stream_length(&mut self.read_stream).await?;
242        if len > offset {
243            // Got to EOF
244            if let Some(rpos) = self.backward {
245                if rpos == offset {
246                    return Ok(None);
247                }
248            }
249
250            if self.backward.is_none() {
251                self.backward = Some(len);
252            }
253            Ok(Some(self.read_row_next_back().await?))
254        } else {
255            Ok(None)
256        }
257    }
258}