surrealml_core/storage/
stream_adapter.rs

1//! Stream adapter for file system
2use std::fs::File;
3use std::io::Read;
4use bytes::Bytes;
5
6use futures_core::stream::Stream;
7use futures_core::task::{Context, Poll};
8use std::pin::Pin;
9use std::error::Error;
10use crate::{
11    safe_eject,
12    errors::error::{
13        SurrealError,
14        SurrealErrorStatus
15    }
16};
17
18
/// Adapter that exposes a file on the local file system as a stream of byte chunks.
///
/// Both fields are private; instances are built through `StreamAdapter::new`.
pub struct StreamAdapter {
    /// Size, in bytes, of the buffer allocated for each read from the file.
    chunk_size: usize,
    /// Open handle to the file being streamed.
    file_pointer: File,
}
28
29impl StreamAdapter {
30
31    /// Creates a new `StreamAdapter` struct.
32    /// 
33    /// # Arguments
34    /// * `chunk_size` - The size of the chunks to read from the file.
35    /// * `file_path` - The path to the file to be streamed
36    /// 
37    /// # Returns
38    /// A new `StreamAdapter` struct.
39    pub fn new(chunk_size: usize, file_path: String) -> Result<Self, SurrealError> {
40        let file_pointer = safe_eject!(File::open(file_path), SurrealErrorStatus::NotFound);
41        Ok(StreamAdapter {
42            chunk_size,
43            file_pointer
44        })
45    }
46
47}
48
49impl Stream for StreamAdapter {
50
51    type Item = Result<Bytes, Box<dyn Error + Send + Sync>>;
52
53    /// Polls the next chunk from the file.
54    /// 
55    /// # Arguments
56    /// * `self` - The `StreamAdapter` struct.
57    /// * `cx` - The context of the task to enable the task to be woken up and polled again using the waker.
58    /// 
59    /// # Returns
60    /// A poll containing the next chunk from the file.
61    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62        let mut buffer = vec![0u8; self.chunk_size];
63        let bytes_read = self.file_pointer.read(&mut buffer)?;
64
65        buffer.truncate(bytes_read);
66        if buffer.is_empty() {
67            return Poll::Ready(None);
68        }
69        Poll::Ready(Some(Ok(buffer.into())))
70    }
71}