Skip to main content

gibblox_file/
lib.rs

1use std::fs::File;
2#[cfg(target_family = "unix")]
3use std::os::unix::fs::FileExt;
4#[cfg(target_family = "windows")]
5use std::os::windows::fs::FileExt;
6use std::path::{Path, PathBuf};
7
8use async_trait::async_trait;
9use gibblox_core::{
10    BlockByteReader, BlockReader, BlockReaderConfigIdentity, ByteReader, GibbloxError,
11    GibbloxErrorKind, GibbloxResult, ReadContext,
12};
13use tracing::{debug, trace};
14
/// Configuration for a file-backed reader.
///
/// Derives equality and hashing in addition to `Clone`/`Debug` so configs can
/// be compared directly and used as keys in hashed collections.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FileReaderConfig {
    /// Filesystem path to open. `None` when the config was built from an
    /// identity string only (see `with_identity_path`).
    pub path: Option<PathBuf>,
    /// Block size used to divide the file into blocks. The constructors in
    /// this file reject values that are zero or not a power of two.
    pub block_size: u32,
    /// Text emitted after the `file:` prefix when the identity is written.
    pub identity_path: String,
}
21
22impl FileReaderConfig {
23    pub fn new(path: impl AsRef<Path>, block_size: u32) -> GibbloxResult<Self> {
24        validate_block_size(block_size)?;
25        let path = path.as_ref();
26        let canonical = std::fs::canonicalize(path).unwrap_or_else(|_| PathBuf::from(path));
27        Ok(Self {
28            path: Some(path.to_path_buf()),
29            block_size,
30            identity_path: canonical.to_string_lossy().into_owned(),
31        })
32    }
33
34    pub fn with_identity_path(identity_path: String, block_size: u32) -> GibbloxResult<Self> {
35        validate_block_size(block_size)?;
36        Ok(Self {
37            path: None,
38            block_size,
39            identity_path,
40        })
41    }
42}
43
44impl BlockReaderConfigIdentity for FileReaderConfig {
45    fn write_identity(&self, out: &mut dyn std::fmt::Write) -> std::fmt::Result {
46        write!(out, "file:{}", self.identity_path)
47    }
48}
49
50/// Simple block-aligned source wrapper over `std::fs::File`.
51pub struct FileReader {
52    file: File,
53    size_bytes: u64,
54    config: FileReaderConfig,
55}
56
57impl FileReader {
58    pub fn open(path: impl AsRef<Path>, block_size: u32) -> GibbloxResult<Self> {
59        Self::open_with_config(FileReaderConfig::new(path, block_size)?)
60    }
61
62    pub fn open_with_config(config: FileReaderConfig) -> GibbloxResult<Self> {
63        validate_block_size(config.block_size)?;
64        let path = config.path.as_ref().ok_or_else(|| {
65            GibbloxError::with_message(
66                GibbloxErrorKind::InvalidInput,
67                "file reader config missing path",
68            )
69        })?;
70        debug!(path = %path.display(), block_size = config.block_size, "opening file-backed source");
71        let file = File::open(path).map_err(map_io_err("open file"))?;
72        Self::from_file_with_config(file, config)
73    }
74
75    pub fn from_file(file: File, block_size: u32) -> GibbloxResult<Self> {
76        Self::from_file_with_config(
77            file,
78            FileReaderConfig::with_identity_path(String::from("<unknown>"), block_size)?,
79        )
80    }
81
82    pub fn from_file_with_identity(
83        file: File,
84        block_size: u32,
85        identity_path: String,
86    ) -> GibbloxResult<Self> {
87        Self::from_file_with_config(
88            file,
89            FileReaderConfig::with_identity_path(identity_path, block_size)?,
90        )
91    }
92
93    pub fn from_file_with_config(file: File, config: FileReaderConfig) -> GibbloxResult<Self> {
94        validate_block_size(config.block_size)?;
95        let size_bytes = file.metadata().map_err(map_io_err("stat file"))?.len();
96        debug!(
97            size_bytes,
98            block_size = config.block_size,
99            "initialized file-backed source"
100        );
101        Ok(Self {
102            file,
103            size_bytes,
104            config,
105        })
106    }
107
108    pub fn config(&self) -> &FileReaderConfig {
109        &self.config
110    }
111
112    pub fn size_bytes(&self) -> u64 {
113        self.size_bytes
114    }
115}
116
117#[async_trait]
118impl ByteReader for FileReader {
119    async fn size_bytes(&self) -> GibbloxResult<u64> {
120        Ok(self.size_bytes)
121    }
122
123    fn write_identity(&self, out: &mut dyn std::fmt::Write) -> std::fmt::Result {
124        self.config.write_identity(out)
125    }
126
127    async fn read_at(
128        &self,
129        offset: u64,
130        buf: &mut [u8],
131        _ctx: ReadContext,
132    ) -> GibbloxResult<usize> {
133        if buf.is_empty() {
134            return Ok(0);
135        }
136        if offset >= self.size_bytes {
137            return Ok(0);
138        }
139
140        let read_len = (buf.len() as u64).min(self.size_bytes - offset) as usize;
141        let read = read_file_at_full(&self.file, &mut buf[..read_len], offset)
142            .map_err(map_io_err("read file"))?;
143
144        trace!(
145            offset,
146            requested = read_len,
147            read,
148            "performed file byte read"
149        );
150        Ok(read)
151    }
152}
153
154#[async_trait]
155impl BlockReader for FileReader {
156    fn block_size(&self) -> u32 {
157        self.config.block_size
158    }
159
160    async fn total_blocks(&self) -> GibbloxResult<u64> {
161        Ok(self.size_bytes.div_ceil(self.config.block_size as u64))
162    }
163
164    fn write_identity(&self, out: &mut dyn std::fmt::Write) -> std::fmt::Result {
165        self.config.write_identity(out)
166    }
167
168    async fn read_blocks(
169        &self,
170        lba: u64,
171        buf: &mut [u8],
172        ctx: ReadContext,
173    ) -> GibbloxResult<usize> {
174        let adapter = BlockByteReader::new(self, self.config.block_size)?;
175        let read = adapter.read_blocks(lba, buf, ctx).await?;
176        trace!(
177            lba,
178            requested = buf.len(),
179            read,
180            "performed file block read"
181        );
182        Ok(read)
183    }
184}
185
186fn validate_block_size(block_size: u32) -> GibbloxResult<()> {
187    if block_size == 0 || !block_size.is_power_of_two() {
188        return Err(GibbloxError::with_message(
189            GibbloxErrorKind::InvalidInput,
190            "block size must be non-zero power of two",
191        ));
192    }
193    Ok(())
194}
195
196fn read_file_at_full(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
197    read_at_full(buf, offset, |chunk, chunk_offset| {
198        read_file_at(file, chunk, chunk_offset)
199    })
200}
201
/// Fills `buf` completely by calling `read_at` repeatedly until every byte has
/// been supplied.
///
/// `read_at` receives the still-unfilled tail of `buf` and the absolute offset
/// that tail corresponds to, and returns how many bytes it wrote. Short reads
/// are retried from where they stopped. Reads that fail with
/// `ErrorKind::Interrupted` are retried as well, since an interrupted call is
/// transient and made no progress.
///
/// Returns the number of bytes filled, which on success is always `buf.len()`
/// (`0` for an empty buffer, in which case `read_at` is never called).
///
/// # Errors
/// * `InvalidInput` if `offset` plus the bytes already filled overflows `u64`.
/// * `InvalidData` if `read_at` claims more bytes than the slice it was given.
/// * `UnexpectedEof` if `read_at` returns `Ok(0)` before the buffer is full.
/// * Any other error returned by `read_at`, unchanged.
fn read_at_full<F>(buf: &mut [u8], offset: u64, mut read_at: F) -> std::io::Result<usize>
where
    F: FnMut(&mut [u8], u64) -> std::io::Result<usize>,
{
    let total = buf.len();
    let mut filled = 0usize;
    while filled < total {
        let chunk_offset = offset.checked_add(filled as u64).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "file read offset overflow",
            )
        })?;
        let remaining = total - filled;
        let read = match read_at(&mut buf[filled..], chunk_offset) {
            Ok(read) => read,
            // An interrupted call transferred nothing; retry at the same position.
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        if read > remaining {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("file read returned too many bytes: {read} > {remaining}"),
            ));
        }
        if read == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                format!("short file read: expected {total} bytes, got {filled}"),
            ));
        }
        // Cannot overflow: `read <= remaining`, so `filled + read <= total`.
        filled += read;
    }
    Ok(filled)
}
244
/// Unix positional read: a single `FileExt::read_at` call at `offset`.
/// May return fewer bytes than `buf.len()`.
#[cfg(target_family = "unix")]
fn read_file_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
    FileExt::read_at(file, buf, offset)
}
249
/// Windows positional read: a single `FileExt::seek_read` call at `offset`.
/// May return fewer bytes than `buf.len()`.
#[cfg(target_family = "windows")]
fn read_file_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
    FileExt::seek_read(file, buf, offset)
}
254
/// Fallback for targets that are neither Unix nor Windows: always fails,
/// because no positional read is wired up for them here.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
fn read_file_at(_file: &File, _buf: &mut [u8], _offset: u64) -> std::io::Result<usize> {
    let unsupported = std::io::Error::other("FileReader is unsupported on this target");
    Err(unsupported)
}
261
262fn map_io_err(op: &'static str) -> impl FnOnce(std::io::Error) -> GibbloxError {
263    move |err| GibbloxError::with_message(GibbloxErrorKind::Io, format!("{op}: {err}"))
264}
265
#[cfg(test)]
mod tests {
    use super::read_at_full;

    /// A reader that hands out at most three bytes per call must still end up
    /// filling the whole destination buffer.
    #[test]
    fn read_at_full_retries_short_reads() {
        const MAX_CHUNK: usize = 3;
        let alphabet = b"abcdefghijklmnopqrstuvwxyz";
        let mut dest = [0u8; 12];

        let total = read_at_full(&mut dest, 5, |chunk_buf, chunk_offset| {
            let begin = chunk_offset as usize;
            let len = chunk_buf.len().min(MAX_CHUNK);
            chunk_buf[..len].copy_from_slice(&alphabet[begin..begin + len]);
            Ok(len)
        })
        .expect("short reads should be retried until complete");

        assert_eq!(total, dest.len());
        assert_eq!(&dest, b"fghijklmnopq");
    }

    /// After one partial read, a zero-length read means no further progress is
    /// possible and must surface as `UnexpectedEof`.
    #[test]
    fn read_at_full_errors_when_progress_stalls() {
        let mut dest = [0u8; 8];
        let mut invocations = 0usize;

        let failure = read_at_full(&mut dest, 0, |_buf, _offset| {
            invocations += 1;
            Ok(if invocations == 1 { 3 } else { 0 })
        })
        .expect_err("zero-length follow-up read should fail");

        assert_eq!(failure.kind(), std::io::ErrorKind::UnexpectedEof);
    }
}