1use crate::randfile::RandFile;
2
3use std::io::{Read, Result, Seek, Write};
4use std::num::NonZeroI64;
5
6#[repr(packed)]
7#[derive(Default, Clone, Copy)]
8pub(crate) struct FrameHeader {
9 pub(crate) linked_frame: Option<NonZeroI64>,
10 pub(crate) linked_frame_size: u64,
11}
12
13impl FrameHeader {
14 fn new(relative_offset: i64, frame_size: u64) -> Self {
15 Self {
16 linked_frame: NonZeroI64::new(relative_offset.to_le()),
17 linked_frame_size: frame_size.to_le(),
18 }
19 }
20 fn from_bytes(data: &[u8]) -> FrameHeader {
21 assert!(data.len() >= std::mem::size_of::<Self>());
22 let data = *unsafe { std::mem::transmute::<_, &FrameHeader>(data.as_ptr()) };
23 let offset = data.linked_frame.map_or(0, |x| x.get().to_le());
24 let size = data.linked_frame_size;
25 Self::new(offset, size)
26 }
27
28 fn as_bytes(&self) -> &[u8] {
29 unsafe {
30 std::slice::from_raw_parts(
31 self as *const Self as *const u8,
32 std::mem::size_of::<Self>(),
33 )
34 }
35 }
36}
37
38#[derive(Default)]
42struct Frame {
43 header: FrameHeader,
44 offset: Option<u64>,
47 parent_frame: Option<(u64, usize)>,
49 current_frame_size: usize,
51 dirty: bool,
53 payload_offset: usize,
55 data: Vec<u8>,
57}
58
59impl Frame {
60 fn update_frame_link<W: Seek + Write>(
61 &mut self,
62 file: &mut RandFile<W>,
63 offset: u64,
64 size: usize,
65 ) -> Result<()> {
66 if let Some((parent_frame, _parent_size)) = self.parent_frame {
67 let new_header = FrameHeader::new(offset as i64 - parent_frame as i64, size as u64);
68 file.update_block(parent_frame, new_header.as_bytes())
69 } else {
70 Ok(())
71 }
72 }
73
74 fn sync_current_frame<W: Seek + Write>(&mut self, file: &mut RandFile<W>) -> Result<()> {
75 if !self.dirty {
76 return Ok(());
77 }
78
79 if let Some(offset) = self.offset {
80 file.update_block(offset, &self.data)?;
81 } else {
82 let offset = file.append_block(&self.data)?;
83 self.offset = Some(offset);
84 self.update_frame_link(file, offset, self.current_frame_size)?;
85 };
86 Ok(())
87 }
88
89 fn reserve_frame<W: Write + Seek>(
90 &mut self,
91 file: &mut RandFile<W>,
92 size: usize,
93 ) -> Result<()> {
94 if self.offset.is_some() {
95 return Err(std::io::Error::new(
96 std::io::ErrorKind::Other,
97 "Invalid reservation",
98 ));
99 }
100
101 let offset = file.reserve_block(size)?;
102 self.offset = Some(offset);
103 self.current_frame_size = size;
104 self.update_frame_link(file, offset, size)?;
105 Ok(())
106 }
107
108 fn zero_frame(&mut self) {
109 self.data[self.payload_offset..]
110 .iter_mut()
111 .for_each(|m| *m = 0);
112 self.data.resize(self.current_frame_size, 0);
113 }
114
115 fn alloc_new_frame<W: Seek + Write>(
116 this: Option<Self>,
117 file: &mut RandFile<W>,
118 reserve: usize,
119 ) -> Result<Self> {
120 let (mut ret, parent) = if let Some(mut current) = this {
121 current.sync_current_frame(file)?;
122 let parent = current.offset.map(|ofs| (ofs, current.current_frame_size));
123 (current, parent)
124 } else {
125 (Self::default(), None)
126 };
127 ret.header.linked_frame = None;
128 ret.header.linked_frame_size = 0;
129 ret.parent_frame = parent;
130 ret.offset = None;
131 ret.current_frame_size = std::mem::size_of::<FrameHeader>();
132 ret.dirty = true;
133 ret.payload_offset = std::mem::size_of::<FrameHeader>();
134 ret.data.resize(std::mem::size_of::<FrameHeader>(), 0);
135 if reserve > 0 {
136 ret.reserve_frame(file, reserve)?;
137 }
138 Ok(ret)
139 }
140
141 fn load_from_file<R: Seek + Read>(
142 file: &mut RandFile<R>,
143 offset: u64,
144 size: usize,
145 read_payload: bool,
146 buf: Option<Self>,
147 backward: bool,
148 ) -> Result<Self> {
149 let bytes_to_read = if !read_payload {
150 std::mem::size_of::<FrameHeader>()
151 } else {
152 size
153 };
154
155 let mut ret = if let Some(buf) = buf {
156 buf
157 } else {
158 Self::default()
159 };
160
161 ret.data.resize(bytes_to_read, 0);
162 if ret.data.len() != file.read_block(offset, &mut ret.data[..])? {
163 return Err(std::io::Error::new(
164 std::io::ErrorKind::Other,
165 "Invalid frame size",
166 ));
167 }
168
169 ret.header = FrameHeader::from_bytes(&ret.data);
170
171 ret.dirty = false;
172 ret.payload_offset = std::mem::size_of::<FrameHeader>();
173 ret.current_frame_size = size;
174 if !backward {
175 ret.parent_frame = ret.offset.map(|offset| (offset, ret.current_frame_size));
176 } else {
177 ret.parent_frame = None;
178 }
179 ret.offset = Some(offset);
180
181 Ok(ret)
182 }
183
184 fn load_next_frame<R: Seek + Read>(
185 self,
186 file: &mut RandFile<R>,
187 read_payload: bool,
188 ) -> Result<Option<Self>> {
189 if let Some(offset) = self.offset {
190 if let Some(rel_addr) = self.header.linked_frame.map(i64::from) {
191 let size = self.header.linked_frame_size as usize;
192 let addr = (offset as i64 + rel_addr) as u64;
193 return Self::load_from_file(file, addr, size, read_payload, Some(self), false)
194 .map(Some);
195 }
196 }
197 Ok(None)
198 }
199
200 fn load_previous_frame<R: Seek + Read>(
201 self,
202 file: &mut RandFile<R>,
203 read_payload: bool,
204 ) -> Result<Option<Self>> {
205 if let Some((parent_ofs, parent_size)) = self.parent_frame {
206 return Self::load_from_file(
207 file,
208 parent_ofs,
209 parent_size,
210 read_payload,
211 Some(self),
212 true,
213 )
214 .map(Some);
215 }
216 Ok(None)
217 }
218}
219
220pub struct Stream<T> {
221 file: RandFile<T>,
222 current_frame: Option<Frame>,
223 cursor: usize,
224 frame_size: usize,
225 pre_alloc: bool,
226 on_drop: Box<dyn FnOnce(&mut Self) + Send + Sync>,
227}
228impl<T> Stream<T> {
229 pub fn set_frame_size(&mut self, size: usize) {
230 self.frame_size = size;
231 }
232 pub fn double_frame_size(&mut self, limit: usize) {
233 if self.frame_size * 2 > limit {
234 self.frame_size = limit;
235 return;
236 }
237 self.frame_size *= 2;
238 }
239 pub(crate) fn clone_underlying_file<'b>(&'b self) -> RandFile<T> {
240 self.file.clone()
241 }
242
243 pub(crate) fn get_frame_offset(&self) -> Option<u64> {
244 self.current_frame.as_ref().and_then(|frame| frame.offset)
245 }
246
247 pub(crate) fn get_frame_size(&self) -> Option<usize> {
248 self.current_frame
249 .as_ref()
250 .map(|frame| frame.current_frame_size)
251 }
252
253 pub fn get_frame_capacity(&self) -> usize {
254 self.frame_size - std::mem::size_of::<FrameHeader>()
255 }
256}
257impl<T: Read + Write + Seek> Stream<T> {
258 pub fn flush(&mut self) -> Result<()> {
259 let current_frame = std::mem::replace(&mut self.current_frame, None);
260 self.current_frame = Some(Frame::alloc_new_frame(current_frame, &mut self.file, 0)?);
261 self.cursor = 0;
262 Ok(())
263 }
264
265 pub fn write(&mut self, buffer: &[u8]) -> Result<usize> {
266 self.write_with_alloc_callback(buffer, |_| ())
267 }
268
269 pub fn disable_pre_alloc(&mut self) {
270 self.pre_alloc = false;
271 }
272
273 pub fn write_frame(&mut self, buffer: &[u8]) -> Result<()> {
274 self.flush()?;
275 if let Some(frame) = self.current_frame.as_mut() {
276 frame.data.extend_from_slice(buffer);
277 frame.current_frame_size = frame.data.len();
278 }
279 Ok(())
280 }
281
282 pub fn write_with_alloc_callback<R: FnMut(&mut Self)>(
285 &mut self,
286 buffer: &[u8],
287 mut callback: R,
288 ) -> Result<usize> {
289 let mut ret = 0;
290 let mut ptr = buffer;
291 while !ptr.is_empty() {
292 let bytes_can_write = if self
294 .current_frame
295 .as_ref()
296 .map_or(false, |s| s.offset.is_some())
297 {
298 self.current_frame
301 .as_ref()
302 .map_or(0, |f| f.current_frame_size - f.payload_offset - self.cursor)
303 .min(ptr.len())
304 } else {
305 if self.frame_size > 0 {
307 self.current_frame
308 .as_ref()
309 .map_or(0, |f| self.frame_size - f.payload_offset - self.cursor)
310 .min(ptr.len())
311 } else {
312 ptr.len()
313 }
314 };
315
316 if bytes_can_write == 0 {
317 if let Some(Some(_)) = self.current_frame.as_ref().map(|f| f.header.linked_frame) {
318 let mut current_frame = self.current_frame.take().unwrap();
319 current_frame.sync_current_frame(&mut self.file)?;
320 self.current_frame = current_frame.load_next_frame(&mut self.file, true)?;
321 } else {
322 callback(self);
323 let current_frame = self.current_frame.take();
324 self.current_frame =
325 Some(Frame::alloc_new_frame(current_frame, &mut self.file, 0)?);
326 if self.frame_size > 0 && self.pre_alloc {
327 let frame = self.current_frame.as_mut().unwrap();
328 frame.reserve_frame(&mut self.file, self.frame_size)?;
329 frame.zero_frame();
330 }
331 }
332 self.cursor = 0;
333 continue;
334 }
335
336 let cursor = self.cursor;
337 if let Some(ref mut frame) = self.current_frame {
338 let start = frame.payload_offset + cursor;
339 let end = start + bytes_can_write;
340 if frame.data.len() < end {
341 frame.data.resize(end, 0);
342 }
343 frame.data[start..end].copy_from_slice(&ptr[..bytes_can_write]);
344 frame.current_frame_size = frame.current_frame_size.max(end);
345 frame.dirty = true;
346 }
347 ptr = &ptr[bytes_can_write..];
348 self.cursor += bytes_can_write;
349 ret += bytes_can_write;
350 }
351 Ok(ret)
352 }
353}
354impl<T: Read + Seek> Read for Stream<T> {
355 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
356 Stream::read(self, buf)
357 }
358}
359impl<T: Read + Write + Seek> Write for Stream<T> {
360 fn write(&mut self, buf: &[u8]) -> Result<usize> {
361 Stream::write(self, buf)
362 }
363 fn flush(&mut self) -> Result<()> {
364 self.flush()
365 }
366}
367impl<T: Read + Seek> AsRef<[u8]> for Stream<T> {
368 fn as_ref(&self) -> &[u8] {
369 if let Some(r) = self.read_current_frame() {
370 r
371 } else {
372 &[]
373 }
374 }
375}
376impl<T: Read + Seek> Stream<T> {
377 pub fn load_next_frame(&mut self) -> Result<()> {
378 if let Some(this_frame) = self.current_frame.take() {
379 self.cursor = 0;
380 self.current_frame = this_frame.load_next_frame(&mut self.file, true)?;
381 }
382 Ok(())
383 }
384 pub fn read_current_frame(&self) -> Option<&[u8]> {
385 if let Some(this_frame) = self.current_frame.as_ref() {
386 return Some(&this_frame.data[this_frame.payload_offset..]);
387 }
388 None
389 }
390 pub fn copy_current_frame_data(&self, buf: &mut Vec<u8>) {
391 buf.clear();
392 buf.extend_from_slice(self.read_current_frame().unwrap_or(&[]));
393 }
394 pub fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
395 let mut ret = 0;
396 let mut ptr = buffer;
397 while self.current_frame.is_some() && !ptr.is_empty() {
398 let bytes_read = {
399 let can_read = self
400 .current_frame
401 .as_ref()
402 .map_or(0, |f| f.data.len() - f.payload_offset)
403 .max(self.cursor)
404 - self.cursor;
405 if can_read == 0 {
406 let this_frame = std::mem::replace(&mut self.current_frame, None);
407 self.current_frame =
408 this_frame.unwrap().load_next_frame(&mut self.file, true)?;
409 self.cursor = 0;
410 continue;
411 }
412 can_read
413 }
414 .min(ptr.len());
415 ptr[..bytes_read].copy_from_slice(
416 self.current_frame
417 .as_ref()
418 .map(|f| {
419 let start = f.payload_offset + self.cursor;
420 let end = start + bytes_read;
421 &f.data[start..end]
422 })
423 .unwrap(),
424 );
425 ret += bytes_read;
426 ptr = &mut ptr[bytes_read..];
427 self.cursor += bytes_read;
428 }
429 Ok(ret)
430 }
431 pub(crate) fn open_for_read(file: RandFile<T>, primary_frame: (u64, usize)) -> Result<Self> {
432 Self::open_with_ondrop(file, primary_frame, |_| {})
433 }
434 pub(crate) fn open_for_update(file: RandFile<T>, primary_frame: (u64, usize)) -> Result<Self>
435 where
436 T: Write,
437 {
438 Self::open_with_ondrop(file, primary_frame, |s| s.flush().unwrap())
439 }
440 pub(crate) fn open_with_ondrop<D: FnOnce(&mut Self) + Send + Sync + 'static>(
441 mut file: RandFile<T>,
442 primary_frame: (u64, usize),
443 on_drop: D,
444 ) -> Result<Self> {
445 let primary_frame = Frame::load_from_file(
446 &mut file,
447 primary_frame.0,
448 primary_frame.1,
449 true,
450 None,
451 false,
452 )?;
453 let frame_size = primary_frame.current_frame_size;
454 let current_frame = Some(primary_frame);
455 Ok(Self {
456 file,
457 current_frame,
458 cursor: 0,
459 frame_size,
460 on_drop: Box::new(on_drop),
461 pre_alloc: true,
462 })
463 }
464}
465
466impl<'a, T: Read + Write + Seek> Stream<T> {
467 pub fn update_current_byte(&mut self, byte: u8) -> Result<usize> {
468 if let Some(current_frame) = &self.current_frame {
469 if self.cursor > 0 {
470 self.cursor -= 1;
471 } else {
472 if current_frame.parent_frame.is_some() {
473 let this_frame = self.current_frame.take().unwrap();
474 this_frame.load_previous_frame(&mut self.file, true)?;
475 }
476 }
478 self.write(&[byte])?;
479 return Ok(1);
480 }
481 Ok(0)
482 }
483 pub(crate) fn create(mut file: RandFile<T>, frame_size: usize) -> Result<Self> {
484 let current_frame = Some(Frame::alloc_new_frame(None, &mut file, frame_size)?);
485 Ok(Self {
486 file,
487 current_frame,
488 cursor: 0,
489 frame_size,
490 on_drop: Box::new(|this| {
491 this.flush().unwrap();
492 }),
493 pre_alloc: true,
494 })
495 }
496}
497
498impl<T> Drop for Stream<T> {
499 fn drop(&mut self) {
500 let drop_callback = std::mem::replace(&mut self.on_drop, Box::new(|_| {}));
501 drop_callback(self);
502 }
503}
504
505#[cfg(test)]
506mod test {
507 use super::Stream;
508 use crate::randfile::RandFile;
509 use std::io::Cursor;
510 type TestResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
511 #[test]
512 fn test_stream_send() {
513 fn check_send<T: Send>() {}
514 check_send::<Stream<std::fs::File>>();
515 }
516 #[test]
517 fn test_compose_stream() -> TestResult<()> {
518 let mut buffer = vec![];
519 {
520 let fp = Cursor::new(&mut buffer);
521 let file = RandFile::new(fp);
522
523 let mut stream = Stream::create(file.clone(), 0)?;
524 let mut stream2 = Stream::create(file, 0)?;
525
526 stream.write(b"This is a test frame")?;
527 stream2.write(b"This is another stream")?;
528
529 stream.flush()?;
530
531 stream.write(b"This is the second block")?;
532 stream2.write(b"This is another stream - 2")?;
533 stream2.flush()?;
534 stream.flush()?;
535 }
536
537 {
538 let fp = Cursor::new(&mut buffer);
539 let file = RandFile::new(fp);
540 let mut stream = Stream::open_for_read(file, (0, 30))?;
541 let mut data = [0; 100];
542 assert_eq!(38, stream.read(&mut data)?);
543 }
544
545 Ok(())
546 }
547 #[test]
548 fn test_traverse_file() -> TestResult<()> {
549 let test_blob: Vec<_> = vec![
550 19, 0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0xdd, 0xdd, 0xdd, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
554 ];
555 let reader = Cursor::new(test_blob);
556 let file = RandFile::new(reader);
557
558 let mut stream = Stream::open_for_read(file, (0, 19))?;
559
560 let mut buffer = vec![0; 100];
561
562 assert_eq!(7, stream.read(&mut buffer)?);
563
564 Ok(())
565 }
566
567 #[test]
568 fn test_modify_stream() -> TestResult<()> {
569 let test_blob: Vec<_> = vec![
570 19, 0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0xdd, 0xdd, 0x00, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
576 ];
577 let reader = Cursor::new(test_blob);
578 let file = RandFile::new(reader);
579
580 let mut stream = Stream::open_for_read(file, (0, 19))?;
581
582 loop {
583 let mut buf = [0; 1];
584 stream.read(&mut buf[..]).unwrap();
585 if buf[0] == 0 {
586 break;
587 }
588 }
589 assert_eq!(stream.update_current_byte(0xdd).unwrap(), 1);
590 stream.write(&[0xdd]).unwrap();
591 stream.write(&[0xdd]).unwrap();
592 stream.flush().unwrap();
593
594 let test_blob = stream
595 .clone_underlying_file()
596 .clone_inner()
597 .unwrap()
598 .into_inner();
599
600 assert_eq!(test_blob[18], 0xdd);
601 assert_eq!(test_blob[35], 0xdd);
602 assert_eq!(test_blob[36], 0xdd);
603
604 Ok(())
605 }
606}