1use std::fs::File;
2#[cfg(target_family = "unix")]
3use std::os::unix::fs::FileExt;
4#[cfg(target_family = "windows")]
5use std::os::windows::fs::FileExt;
6use std::path::{Path, PathBuf};
7
8use async_trait::async_trait;
9use gibblox_core::{
10 BlockByteReader, BlockReader, BlockReaderConfigIdentity, ByteReader, GibbloxError,
11 GibbloxErrorKind, GibbloxResult, ReadContext,
12};
13use tracing::{debug, trace};
14
/// Settings for a file-backed reader: the file location, the block size used
/// to address it, and the string reported as the reader's identity.
#[derive(Clone, Debug)]
pub struct FileReaderConfig {
    /// Filesystem path to open. `None` when the config was built from an
    /// identity string alone (see `with_identity_path`).
    pub path: Option<PathBuf>,
    /// Block size in bytes. The constructors in this file reject values that
    /// are zero or not a power of two.
    pub block_size: u32,
    /// Text emitted after the `file:` prefix by `write_identity`.
    pub identity_path: String,
}
21
22impl FileReaderConfig {
23 pub fn new(path: impl AsRef<Path>, block_size: u32) -> GibbloxResult<Self> {
24 validate_block_size(block_size)?;
25 let path = path.as_ref();
26 let canonical = std::fs::canonicalize(path).unwrap_or_else(|_| PathBuf::from(path));
27 Ok(Self {
28 path: Some(path.to_path_buf()),
29 block_size,
30 identity_path: canonical.to_string_lossy().into_owned(),
31 })
32 }
33
34 pub fn with_identity_path(identity_path: String, block_size: u32) -> GibbloxResult<Self> {
35 validate_block_size(block_size)?;
36 Ok(Self {
37 path: None,
38 block_size,
39 identity_path,
40 })
41 }
42}
43
44impl BlockReaderConfigIdentity for FileReaderConfig {
45 fn write_identity(&self, out: &mut dyn std::fmt::Write) -> std::fmt::Result {
46 write!(out, "file:{}", self.identity_path)
47 }
48}
49
/// Reader backed by an already-opened file.
pub struct FileReader {
    /// Open handle used for positional reads.
    file: File,
    /// File length in bytes, taken from the file's metadata at construction.
    size_bytes: u64,
    /// Configuration that supplies the block size and identity string.
    config: FileReaderConfig,
}
56
57impl FileReader {
58 pub fn open(path: impl AsRef<Path>, block_size: u32) -> GibbloxResult<Self> {
59 Self::open_with_config(FileReaderConfig::new(path, block_size)?)
60 }
61
62 pub fn open_with_config(config: FileReaderConfig) -> GibbloxResult<Self> {
63 validate_block_size(config.block_size)?;
64 let path = config.path.as_ref().ok_or_else(|| {
65 GibbloxError::with_message(
66 GibbloxErrorKind::InvalidInput,
67 "file reader config missing path",
68 )
69 })?;
70 debug!(path = %path.display(), block_size = config.block_size, "opening file-backed source");
71 let file = File::open(path).map_err(map_io_err("open file"))?;
72 Self::from_file_with_config(file, config)
73 }
74
75 pub fn from_file(file: File, block_size: u32) -> GibbloxResult<Self> {
76 Self::from_file_with_config(
77 file,
78 FileReaderConfig::with_identity_path(String::from("<unknown>"), block_size)?,
79 )
80 }
81
82 pub fn from_file_with_identity(
83 file: File,
84 block_size: u32,
85 identity_path: String,
86 ) -> GibbloxResult<Self> {
87 Self::from_file_with_config(
88 file,
89 FileReaderConfig::with_identity_path(identity_path, block_size)?,
90 )
91 }
92
93 pub fn from_file_with_config(file: File, config: FileReaderConfig) -> GibbloxResult<Self> {
94 validate_block_size(config.block_size)?;
95 let size_bytes = file.metadata().map_err(map_io_err("stat file"))?.len();
96 debug!(
97 size_bytes,
98 block_size = config.block_size,
99 "initialized file-backed source"
100 );
101 Ok(Self {
102 file,
103 size_bytes,
104 config,
105 })
106 }
107
108 pub fn config(&self) -> &FileReaderConfig {
109 &self.config
110 }
111
112 pub fn size_bytes(&self) -> u64 {
113 self.size_bytes
114 }
115}
116
117#[async_trait]
118impl ByteReader for FileReader {
119 async fn size_bytes(&self) -> GibbloxResult<u64> {
120 Ok(self.size_bytes)
121 }
122
123 fn write_identity(&self, out: &mut dyn std::fmt::Write) -> std::fmt::Result {
124 self.config.write_identity(out)
125 }
126
127 async fn read_at(
128 &self,
129 offset: u64,
130 buf: &mut [u8],
131 _ctx: ReadContext,
132 ) -> GibbloxResult<usize> {
133 if buf.is_empty() {
134 return Ok(0);
135 }
136 if offset >= self.size_bytes {
137 return Ok(0);
138 }
139
140 let read_len = (buf.len() as u64).min(self.size_bytes - offset) as usize;
141 let read = read_file_at_full(&self.file, &mut buf[..read_len], offset)
142 .map_err(map_io_err("read file"))?;
143
144 trace!(
145 offset,
146 requested = read_len,
147 read,
148 "performed file byte read"
149 );
150 Ok(read)
151 }
152}
153
154#[async_trait]
155impl BlockReader for FileReader {
156 fn block_size(&self) -> u32 {
157 self.config.block_size
158 }
159
160 async fn total_blocks(&self) -> GibbloxResult<u64> {
161 Ok(self.size_bytes.div_ceil(self.config.block_size as u64))
162 }
163
164 fn write_identity(&self, out: &mut dyn std::fmt::Write) -> std::fmt::Result {
165 self.config.write_identity(out)
166 }
167
168 async fn read_blocks(
169 &self,
170 lba: u64,
171 buf: &mut [u8],
172 ctx: ReadContext,
173 ) -> GibbloxResult<usize> {
174 let adapter = BlockByteReader::new(self, self.config.block_size)?;
175 let read = adapter.read_blocks(lba, buf, ctx).await?;
176 trace!(
177 lba,
178 requested = buf.len(),
179 read,
180 "performed file block read"
181 );
182 Ok(read)
183 }
184}
185
186fn validate_block_size(block_size: u32) -> GibbloxResult<()> {
187 if block_size == 0 || !block_size.is_power_of_two() {
188 return Err(GibbloxError::with_message(
189 GibbloxErrorKind::InvalidInput,
190 "block size must be non-zero power of two",
191 ));
192 }
193 Ok(())
194}
195
196fn read_file_at_full(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
197 read_at_full(buf, offset, |chunk, chunk_offset| {
198 read_file_at(file, chunk, chunk_offset)
199 })
200}
201
/// Fills `buf` completely by calling `read_at` repeatedly until every byte
/// has been supplied.
///
/// `read_at(chunk, chunk_offset)` is asked to read into the unfilled tail of
/// `buf` at `offset` plus the number of bytes already filled, and returns how
/// many bytes it wrote. Short reads are followed by another call for the
/// remainder. Calls that fail with `ErrorKind::Interrupted` transferred no
/// data and are retried, matching the standard library's exact-read helpers.
///
/// Returns the number of bytes filled, which on success equals `buf.len()`.
///
/// # Errors
/// - `InvalidInput` if the running offset or the filled count overflows.
/// - `InvalidData` if `read_at` claims more bytes than the space it was given.
/// - `UnexpectedEof` if `read_at` returns `Ok(0)` before `buf` is full.
/// - Any other error from `read_at` (except `Interrupted`) is passed through.
fn read_at_full<F>(buf: &mut [u8], offset: u64, mut read_at: F) -> std::io::Result<usize>
where
    F: FnMut(&mut [u8], u64) -> std::io::Result<usize>,
{
    if buf.is_empty() {
        return Ok(0);
    }

    let mut filled = 0usize;
    while filled < buf.len() {
        let chunk_offset = offset.checked_add(filled as u64).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "file read offset overflow",
            )
        })?;
        let remaining = buf.len() - filled;
        let read = match read_at(&mut buf[filled..], chunk_offset) {
            Ok(read) => read,
            // An interrupted call made no progress; issue the same read again
            // rather than failing the whole operation.
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        // Guard against a misbehaving source before trusting its count.
        if read > remaining {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("file read returned too many bytes: {read} > {remaining}"),
            ));
        }
        // Zero bytes with space still left means the source ran dry.
        if read == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                format!(
                    "short file read: expected {} bytes, got {filled}",
                    buf.len()
                ),
            ));
        }
        filled = filled.checked_add(read).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "file read length overflow",
            )
        })?;
    }
    Ok(filled)
}
244
/// Performs one positional read on Unix-family targets; may return fewer
/// bytes than `buf` can hold.
#[cfg(target_family = "unix")]
fn read_file_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
    FileExt::read_at(file, buf, offset)
}

/// Performs one positional read on Windows-family targets; may return fewer
/// bytes than `buf` can hold.
#[cfg(target_family = "windows")]
fn read_file_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
    FileExt::seek_read(file, buf, offset)
}

/// Fallback for targets that are neither Unix nor Windows: always fails.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
fn read_file_at(_file: &File, _buf: &mut [u8], _offset: u64) -> std::io::Result<usize> {
    let unsupported = std::io::Error::other("FileReader is unsupported on this target");
    Err(unsupported)
}
261
262fn map_io_err(op: &'static str) -> impl FnOnce(std::io::Error) -> GibbloxError {
263 move |err| GibbloxError::with_message(GibbloxErrorKind::Io, format!("{op}: {err}"))
264}
265
#[cfg(test)]
mod tests {
    use super::read_at_full;

    /// A source that supplies at most three bytes per call must still end up
    /// filling the whole buffer.
    #[test]
    fn read_at_full_retries_short_reads() {
        const MAX_CHUNK: usize = 3;
        let alphabet = b"abcdefghijklmnopqrstuvwxyz";
        let mut filled = [0u8; 12];

        let total = read_at_full(&mut filled, 5, |dst, at| {
            let from = at as usize;
            let len = dst.len().min(MAX_CHUNK);
            dst[..len].copy_from_slice(&alphabet[from..from + len]);
            Ok(len)
        })
        .expect("short reads should be retried until complete");

        assert_eq!(total, filled.len());
        assert_eq!(&filled, b"fghijklmnopq");
    }

    /// A zero-byte read after partial progress is reported as an unexpected
    /// end of file.
    #[test]
    fn read_at_full_errors_when_progress_stalls() {
        let mut scratch = [0u8; 8];
        let mut first_call = true;

        let failure = read_at_full(&mut scratch, 0, |_dst, _at| {
            // Three bytes on the first call, nothing afterwards.
            let bytes = if first_call { 3 } else { 0 };
            first_call = false;
            Ok(bytes)
        })
        .expect_err("zero-length follow-up read should fail");

        assert_eq!(failure.kind(), std::io::ErrorKind::UnexpectedEof);
    }
}