surrealml_core/storage/
stream_adapter.rs1use std::fs::File;
3use std::io::Read;
4use bytes::Bytes;
5
6use futures_core::stream::Stream;
7use futures_core::task::{Context, Poll};
8use std::pin::Pin;
9use std::error::Error;
10use crate::{
11 safe_eject,
12 errors::error::{
13 SurrealError,
14 SurrealErrorStatus
15 }
16};
17
18
19pub struct StreamAdapter {
25 chunk_size: usize,
26 file_pointer: File
27}
28
29impl StreamAdapter {
30
31 pub fn new(chunk_size: usize, file_path: String) -> Result<Self, SurrealError> {
40 let file_pointer = safe_eject!(File::open(file_path), SurrealErrorStatus::NotFound);
41 Ok(StreamAdapter {
42 chunk_size,
43 file_pointer
44 })
45 }
46
47}
48
49impl Stream for StreamAdapter {
50
51 type Item = Result<Bytes, Box<dyn Error + Send + Sync>>;
52
53 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62 let mut buffer = vec![0u8; self.chunk_size];
63 let bytes_read = self.file_pointer.read(&mut buffer)?;
64
65 buffer.truncate(bytes_read);
66 if buffer.is_empty() {
67 return Poll::Ready(None);
68 }
69 Poll::Ready(Some(Ok(buffer.into())))
70 }
71}