sos_filesystem/formats/
stream.rs1use crate::{formats::FileItem, Result};
3use async_trait::async_trait;
4use binary_stream::futures::{stream_length, BinaryReader};
5use sos_core::encoding::encoding_options;
6use sos_vfs::File;
7use std::{io::SeekFrom, ops::Range};
8use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
9
10#[async_trait]
12pub trait FormatStreamIterator<T>
13where
14 T: FileItem + Send,
15{
16 async fn next(&mut self) -> Result<Option<T>>;
18}
19
20pub struct FormatStream<T, R>
25where
26 T: FileItem + Send,
27 R: AsyncRead + AsyncSeek + Unpin + Send,
28{
29 header_offset: u64,
39
40 data_length_prefix: bool,
48 read_stream: R,
50 forward: Option<u64>,
52 backward: Option<u64>,
54 reverse: bool,
56 marker: std::marker::PhantomData<T>,
58}
59
60impl<T: FileItem + Send> FormatStream<T, File> {
61 pub async fn new_file(
63 mut read_stream: File,
64 identity: &'static [u8],
65 data_length_prefix: bool,
66 header_offset: Option<u64>,
67 reverse: bool,
68 ) -> Result<Self> {
69 let header_offset = header_offset.unwrap_or(identity.len() as u64);
70 read_stream.seek(SeekFrom::Start(header_offset)).await?;
71
72 Ok(Self {
73 header_offset,
74 data_length_prefix,
75 read_stream,
76 forward: None,
77 backward: None,
78 reverse,
79 marker: std::marker::PhantomData,
80 })
81 }
82}
83
84#[async_trait]
85impl<T: FileItem + Send> FormatStreamIterator<T> for FormatStream<T, File> {
86 async fn next(&mut self) -> Result<Option<T>> {
87 if self.reverse {
88 self.next_back().await
89 } else {
90 self.next_forward().await
91 }
92 }
93}
94
95impl<T, R> FormatStream<T, R>
111where
112 T: FileItem + Send,
113 R: AsyncRead + AsyncSeek + Unpin + Send,
114{
115 pub fn set_offset(&mut self, offset: u64) {
119 self.header_offset = offset;
120 }
121
122 async fn read_row(
124 reader: &mut BinaryReader<&mut R>,
125 offset: Range<u64>,
126 is_prefix: bool,
127 ) -> Result<T> {
128 let mut row: T = Default::default();
129
130 row.decode(&mut *reader).await?;
131
132 if is_prefix {
133 let value_len = reader.read_u32().await?;
135
136 let begin = reader.stream_position().await?;
137 let end = begin + value_len as u64;
138 row.set_value(begin..end);
139 } else {
140 row.set_value(offset.start + 4..offset.end - 4);
141 }
142
143 row.set_offset(offset);
144 Ok(row)
145 }
146
147 async fn read_row_next(&mut self) -> Result<T> {
149 let row_pos = self.forward.unwrap();
150
151 let mut reader =
152 BinaryReader::new(&mut self.read_stream, encoding_options());
153 reader.seek(SeekFrom::Start(row_pos)).await?;
154 let row_len = reader.read_u32().await?;
155
156 let row_end = row_pos + (row_len as u64 + 8);
158
159 let row = FormatStream::read_row(
160 &mut reader,
161 row_pos..row_end,
162 self.data_length_prefix,
163 )
164 .await?;
165
166 self.forward = Some(row_end);
168
169 Ok(row)
170 }
171
172 async fn read_row_next_back(&mut self) -> Result<T> {
174 let row_pos = self.backward.unwrap();
175
176 let mut reader =
177 BinaryReader::new(&mut self.read_stream, encoding_options());
178
179 reader.seek(SeekFrom::Start(row_pos - 4)).await?;
181 let row_len = reader.read_u32().await?;
182
183 let row_start = row_pos - (row_len as u64 + 8);
186 let row_end = row_start + (row_len as u64 + 8);
187
188 reader.seek(SeekFrom::Start(row_start + 4)).await?;
191 let row = FormatStream::read_row(
192 &mut reader,
193 row_start..row_end,
194 self.data_length_prefix,
195 )
196 .await?;
197
198 self.backward = Some(row_start);
200
201 Ok(row)
202 }
203
204 async fn next_forward(&mut self) -> Result<Option<T>> {
205 let offset = self.header_offset;
206
207 if let (Some(lpos), Some(rpos)) = (self.forward, self.backward) {
208 if lpos == rpos {
209 return Ok(None);
210 }
211 }
212
213 let len = stream_length(&mut self.read_stream).await?;
214 if len > offset {
215 if let Some(lpos) = self.forward {
217 if lpos == len {
218 return Ok(None);
219 }
220 }
221
222 if self.forward.is_none() {
223 self.forward = Some(offset);
224 }
225
226 Ok(Some(self.read_row_next().await?))
227 } else {
228 Ok(None)
229 }
230 }
231
232 async fn next_back(&mut self) -> Result<Option<T>> {
233 let offset: u64 = self.header_offset;
234
235 if let (Some(lpos), Some(rpos)) = (self.forward, self.backward) {
236 if lpos == rpos {
237 return Ok(None);
238 }
239 }
240
241 let len = stream_length(&mut self.read_stream).await?;
242 if len > offset {
243 if let Some(rpos) = self.backward {
245 if rpos == offset {
246 return Ok(None);
247 }
248 }
249
250 if self.backward.is_none() {
251 self.backward = Some(len);
252 }
253 Ok(Some(self.read_row_next_back().await?))
254 } else {
255 Ok(None)
256 }
257 }
258}