Skip to main content

timeseries_table_core/storage/
io.rs

1use snafu::{Backtrace, prelude::*};
2use std::{
3    io::{self, SeekFrom},
4    path::{Path, PathBuf},
5};
6use tokio::{
7    fs::{self, OpenOptions},
8    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
9};
10
11use crate::storage::{
12    BackendError, NotFoundSnafu, OtherIoSnafu, StorageError, StorageLocation, StorageResult,
13};
14
15/// Guard that removes a temporary file on drop unless disarmed.
16/// Used to ensure cleanup on error paths during atomic writes.
17pub(super) struct TempFileGuard {
18    path: PathBuf,
19    armed: bool,
20}
21
22impl TempFileGuard {
23    pub(super) fn new(path: PathBuf) -> Self {
24        Self { path, armed: true }
25    }
26
27    /// Disarm the guard so the file is NOT removed on drop.
28    /// Call this after a successful rename.
29    pub(super) fn disarm(&mut self) {
30        self.armed = false;
31    }
32}
33
34impl Drop for TempFileGuard {
35    fn drop(&mut self) {
36        if self.armed {
37            // Best-effort cleanup; ignore errors since we're likely already handling another error.
38            let _ = std::fs::remove_file(&self.path);
39        }
40    }
41}
42
43/// Join a table location with a relative path into an absolute local path.
44///
45/// v0.1: only Local is supported.
46pub(super) fn join_local(location: &StorageLocation, rel: &Path) -> PathBuf {
47    match location {
48        StorageLocation::Local(root) => root.join(rel),
49    }
50}
51
52pub(super) async fn create_parent_dir(abs: &Path) -> StorageResult<()> {
53    if let Some(parent) = abs.parent() {
54        fs::create_dir_all(parent)
55            .await
56            .map_err(BackendError::Local)
57            .context(OtherIoSnafu {
58                path: parent.display().to_string(),
59            })?;
60    }
61    Ok(())
62}
63
64/// Write `contents` to `rel_path` inside `location` using an atomic write.
65///
66/// This performs a write-then-rename sequence on the local filesystem:
67/// it writes the payload to a temporary file next to the target path,
68/// syncs the file, and then renames it into place to provide an atomic
69/// replacement. Currently only `StorageLocation::Local` is supported.
70///
71/// # Parameters
72///
73/// - `location`: the table root location to resolve the relative path.
74/// - `rel_path`: the relative path (under `location`) to write the file to.
75/// - `contents`: the bytes to write.
76///
77/// # Errors
78///
79/// Returns `StorageError::LocalIo` when filesystem I/O fails; other internal
80/// helpers may add context to the returned error.
81pub async fn write_atomic(
82    location: &StorageLocation,
83    rel_path: &Path,
84    contents: &[u8],
85) -> StorageResult<()> {
86    match location {
87        StorageLocation::Local(_) => {
88            let abs = join_local(location, rel_path);
89
90            create_parent_dir(&abs).await?;
91
92            let tmp_path = abs.with_extension("tmp");
93            let mut guard = TempFileGuard::new(tmp_path.clone());
94
95            {
96                let mut file = fs::File::create(&tmp_path)
97                    .await
98                    .map_err(BackendError::Local)
99                    .context(OtherIoSnafu {
100                        path: tmp_path.display().to_string(),
101                    })?;
102
103                file.write_all(contents)
104                    .await
105                    .map_err(BackendError::Local)
106                    .context(OtherIoSnafu {
107                        path: tmp_path.display().to_string(),
108                    })?;
109
110                file.sync_all()
111                    .await
112                    .map_err(BackendError::Local)
113                    .context(OtherIoSnafu {
114                        path: tmp_path.display().to_string(),
115                    })?;
116            }
117
118            fs::rename(&tmp_path, &abs)
119                .await
120                .map_err(BackendError::Local)
121                .context(OtherIoSnafu {
122                    path: abs.display().to_string(),
123                })?;
124
125            // Success - don't remove the temp file (it's been renamed)
126            guard.disarm();
127
128            Ok(())
129        }
130    }
131}
132
133/// Read the file at `rel_path` within the given `location` and return its
134/// contents as a `String`.
135///
136/// Currently only `StorageLocation::Local` is supported. On success this returns
137/// the file contents; if the file cannot be found a `StorageError::NotFound` is
138/// returned, while other filesystem problems produce `StorageError::LocalIo`.
139pub async fn read_to_string(location: &StorageLocation, rel_path: &Path) -> StorageResult<String> {
140    match location {
141        StorageLocation::Local(_) => {
142            let abs = join_local(location, rel_path);
143
144            match fs::read_to_string(&abs).await {
145                Ok(s) => Ok(s),
146                Err(e) if e.kind() == io::ErrorKind::NotFound => Err(BackendError::Local(e))
147                    .context(NotFoundSnafu {
148                        path: abs.display().to_string(),
149                    }),
150                Err(e) => Err(BackendError::Local(e)).context(OtherIoSnafu {
151                    path: abs.display().to_string(),
152                }),
153            }
154        }
155    }
156}
157
158/// Create a *new* file at `rel_path` and write `contents`, failing if the file
159/// already exists.
160///
161/// This is used for commit files where we want per-version uniqueness.
162pub async fn write_new(
163    location: &StorageLocation,
164    rel_path: &Path,
165    contents: &[u8],
166) -> StorageResult<()> {
167    match location {
168        StorageLocation::Local(_) => {
169            let abs = join_local(location, rel_path);
170            create_parent_dir(&abs).await?;
171
172            let path_str = abs.display().to_string();
173
174            // Atomic "create only if not exists" on the target path.
175            let open_result = OpenOptions::new()
176                .write(true)
177                .create_new(true)
178                .open(&abs)
179                .await;
180
181            let mut file = match open_result {
182                Ok(f) => f,
183                Err(e) => {
184                    let backend = BackendError::Local(e);
185                    // Classify AlreadyExists vs "other I/O"
186                    let storage_err = match &backend {
187                        BackendError::Local(inner)
188                            if inner.kind() == io::ErrorKind::AlreadyExists =>
189                        {
190                            StorageError::AlreadyExists {
191                                path: path_str,
192                                source: backend,
193                                backtrace: Backtrace::capture(),
194                            }
195                        }
196                        _ => StorageError::OtherIo {
197                            path: path_str,
198                            source: backend,
199                            backtrace: Backtrace::capture(),
200                        },
201                    };
202                    return Err(storage_err);
203                }
204            };
205
206            file.write_all(contents)
207                .await
208                .map_err(BackendError::Local)
209                .context(OtherIoSnafu {
210                    path: abs.display().to_string(),
211                })?;
212
213            file.sync_all()
214                .await
215                .map_err(BackendError::Local)
216                .context(OtherIoSnafu {
217                    path: abs.display().to_string(),
218                })?;
219
220            Ok(())
221        }
222    }
223}
224
/// Small probe structure used by higher-level code (e.g. segment validators)
/// to inspect a file's length and its first/last 4 bytes.
///
/// Produced by `read_head_tail_4`, which only fills `head` for files of at
/// least 4 bytes and only fills `tail` for files of at least 8 bytes.
pub struct FileHeadTail4 {
    /// Length of the file in bytes.
    pub len: u64,
    /// First 4 bytes of the file (zero-filled if the file is shorter than
    /// 4 bytes).
    pub head: [u8; 4],
    /// Last 4 bytes of the file (zero-filled if the file is shorter than
    /// 8 bytes).
    pub tail: [u8; 4],
}
235
236/// Read the length, first 4 bytes, and last 4 bytes of a file at `rel_path`
237/// within the given `location`.
238///
239/// Semantics:
240/// - On missing file: `StorageError::NotFound`.
241/// - On other I/O problems: `StorageError::LocalIo`.
242/// - Only `StorageLocation::Local` is supported in v0.1.
243///
244/// For files shorter than 4 bytes, both `head` and `tail` remain zero-filled.
245/// For files between 4 and 7 bytes, `head` contains the first 4 bytes but
246/// `tail` remains zero-filled since reading both without overlap is not
247/// possible. Callers that need distinct head/tail (e.g., Parquet magic
248/// validation) should check `len >= 8` before inspecting `tail`.
249pub async fn read_head_tail_4(
250    location: &StorageLocation,
251    rel_path: &Path,
252) -> StorageResult<FileHeadTail4> {
253    match location {
254        StorageLocation::Local(_) => {
255            let abs = join_local(location, rel_path);
256            let path_str = abs.display().to_string();
257
258            // Metadata: we special-case NotFound like read_to_string does.
259            let meta = match fs::metadata(&abs).await {
260                Ok(m) => m,
261                Err(e) if e.kind() == io::ErrorKind::NotFound => {
262                    return Err(BackendError::Local(e)).context(NotFoundSnafu { path: path_str });
263                }
264                Err(e) => {
265                    return Err(BackendError::Local(e)).context(OtherIoSnafu { path: path_str });
266                }
267            };
268
269            // 2) Non-regular file: treat as semantic "NotFound" (no real OS error).
270            if !meta.is_file() {
271                let synthetic = io::Error::other("not a regular file");
272                let backend = BackendError::Local(synthetic);
273                return Err(StorageError::NotFound {
274                    path: path_str,
275                    source: backend,
276                    backtrace: Backtrace::capture(),
277                });
278            }
279
280            let len = meta.len();
281
282            let mut file = fs::File::open(&abs)
283                .await
284                .map_err(BackendError::Local)
285                .context(OtherIoSnafu {
286                    path: path_str.clone(),
287                })?;
288
289            let mut head = [0u8; 4];
290            let mut tail = [0u8; 4];
291
292            // Only attempt to read the header if file is at least 4 bytes.
293            if len >= 4 {
294                file.read_exact(&mut head)
295                    .await
296                    .map_err(BackendError::Local)
297                    .context(OtherIoSnafu {
298                        path: path_str.clone(),
299                    })?;
300            }
301
302            // Only attempt to read the footer if file is at least 8 bytes.
303            if len >= 8 {
304                file.seek(SeekFrom::End(-4))
305                    .await
306                    .map_err(BackendError::Local)
307                    .context(OtherIoSnafu {
308                        path: path_str.clone(),
309                    })?;
310                file.read_exact(&mut tail)
311                    .await
312                    .map_err(BackendError::Local)
313                    .context(OtherIoSnafu {
314                        path: path_str.clone(),
315                    })?;
316            }
317            Ok(FileHeadTail4 { len, head, tail })
318        }
319    }
320}
321
322/// Read the full contents of a file at `rel_path` within `location` and return
323/// them as a Vec<u8>.
324///
325/// Only `StorageLocation::Local` is supported in this crate version.
326///
327/// Errors:
328/// - If the file does not exist this returns `StorageError::NotFound`.
329/// - On any other I/O error this returns `StorageError::OtherIo`.
330pub async fn read_all_bytes(location: &StorageLocation, rel_path: &Path) -> StorageResult<Vec<u8>> {
331    match location {
332        StorageLocation::Local(_) => {
333            let abs = join_local(location, rel_path);
334            let path_str = abs.display().to_string();
335
336            match fs::read(&abs).await {
337                Ok(bytes) => Ok(bytes),
338                Err(e) if e.kind() == io::ErrorKind::NotFound => {
339                    Err(BackendError::Local(e)).context(NotFoundSnafu { path: path_str })
340                }
341                Err(e) => Err(BackendError::Local(e)).context(OtherIoSnafu { path: path_str }),
342            }
343        }
344    }
345}
346
347/// Get the length (in bytes) of a file at `rel_path` within `location`.
348///
349/// v0.1: only StorageLocation::Local is supported.
350pub async fn file_size(location: &StorageLocation, rel_path: &Path) -> StorageResult<u64> {
351    match location {
352        StorageLocation::Local(_) => {
353            let abs = join_local(location, rel_path);
354            let path_str = abs.display().to_string();
355
356            let meta = fs::metadata(&abs).await;
357            match meta {
358                Ok(m) => Ok(m.len()),
359                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
360                    Err(BackendError::Local(e)).context(NotFoundSnafu { path: path_str })
361                }
362                Err(e) => Err(BackendError::Local(e)).context(OtherIoSnafu { path: path_str }),
363            }
364        }
365    }
366}
367
#[cfg(test)]
mod tests {
    // Each test works inside its own `TempDir`, so tests do not share files
    // with one another.

    use super::*;
    use tempfile::TempDir;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    // ---- write_atomic -----------------------------------------------------

    #[tokio::test]
    async fn write_atomic_creates_file_with_contents() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        let rel_path = Path::new("test.txt");
        let contents = b"hello world";

        write_atomic(&location, rel_path, contents).await?;

        // Verify file exists and has correct contents.
        let abs = tmp.path().join(rel_path);
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "hello world");
        Ok(())
    }

    #[tokio::test]
    async fn write_atomic_creates_parent_directories() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        let rel_path = Path::new("nested/deep/dir/file.txt");
        let contents = b"nested content";

        write_atomic(&location, rel_path, contents).await?;

        let abs = tmp.path().join(rel_path);
        assert!(abs.exists());
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "nested content");
        Ok(())
    }

    #[tokio::test]
    async fn write_atomic_overwrites_existing_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("overwrite.txt");

        // Write initial content.
        write_atomic(&location, rel_path, b"original").await?;

        // Overwrite with new content.
        write_atomic(&location, rel_path, b"updated").await?;

        let abs = tmp.path().join(rel_path);
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "updated");
        Ok(())
    }

    #[tokio::test]
    async fn write_atomic_no_leftover_tmp_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("clean.txt");

        write_atomic(&location, rel_path, b"data").await?;

        // Assert on the directory listing instead of one hard-coded temp
        // name, so the check holds however the scratch file is named.
        let mut names = Vec::new();
        let mut entries = tokio::fs::read_dir(tmp.path()).await?;
        while let Some(entry) = entries.next_entry().await? {
            names.push(entry.file_name());
        }
        assert_eq!(names, vec![std::ffi::OsString::from("clean.txt")]);
        Ok(())
    }

    // ---- read_to_string ---------------------------------------------------

    #[tokio::test]
    async fn read_to_string_returns_file_contents() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("readable.txt");

        // Create a file directly.
        let abs = tmp.path().join(rel_path);
        tokio::fs::write(&abs, "file contents").await?;

        let result = read_to_string(&location, rel_path).await?;
        assert_eq!(result, "file contents");
        Ok(())
    }

    #[tokio::test]
    async fn read_to_string_returns_not_found_for_missing_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("does_not_exist.txt");

        let result = read_to_string(&location, rel_path).await;

        // `expect_err` already fails the test on `Ok`.
        let err = result.expect_err("expected NotFound error");
        assert!(matches!(err, StorageError::NotFound { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn write_then_read_roundtrip() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("roundtrip.txt");

        let original = "roundtrip content 🎉";
        write_atomic(&location, rel_path, original.as_bytes()).await?;

        let read_back = read_to_string(&location, rel_path).await?;
        assert_eq!(read_back, original);
        Ok(())
    }

    // ---- write_new --------------------------------------------------------

    #[tokio::test]
    async fn write_new_creates_file_with_contents() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("new_file.txt");

        write_new(&location, rel_path, b"new content").await?;

        let abs = tmp.path().join(rel_path);
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "new content");
        Ok(())
    }

    #[tokio::test]
    async fn write_new_fails_if_file_exists() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("existing.txt");

        // Create the file first.
        write_new(&location, rel_path, b"first").await?;

        // Second write should fail with AlreadyExists.
        let result = write_new(&location, rel_path, b"second").await;

        let err = result.expect_err("expected AlreadyExists error");
        assert!(matches!(err, StorageError::AlreadyExists { .. }));

        // Original content should be unchanged.
        let read_back = read_to_string(&location, rel_path).await?;
        assert_eq!(read_back, "first");
        Ok(())
    }

    #[tokio::test]
    async fn write_new_creates_parent_directories() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("nested/path/new_file.txt");

        write_new(&location, rel_path, b"nested new").await?;

        let abs = tmp.path().join(rel_path);
        assert!(abs.exists());
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "nested new");
        Ok(())
    }

    // ---- read_all_bytes / file_size ---------------------------------------

    #[tokio::test]
    async fn read_all_bytes_returns_contents_or_not_found() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        tokio::fs::write(tmp.path().join("blob.bin"), [0u8, 1, 2, 255]).await?;

        let bytes = read_all_bytes(&location, Path::new("blob.bin")).await?;
        assert_eq!(bytes, vec![0u8, 1, 2, 255]);

        let missing = read_all_bytes(&location, Path::new("missing.bin")).await;
        assert!(matches!(missing, Err(StorageError::NotFound { .. })));
        Ok(())
    }

    #[tokio::test]
    async fn file_size_reports_length_or_not_found() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        tokio::fs::write(tmp.path().join("sized.bin"), b"12345").await?;

        let size = file_size(&location, Path::new("sized.bin")).await?;
        assert_eq!(size, 5);

        let missing = file_size(&location, Path::new("missing.bin")).await;
        assert!(matches!(missing, Err(StorageError::NotFound { .. })));
        Ok(())
    }

    // ---- read_head_tail_4 -------------------------------------------------
    // `FileHeadTail4` has no `Debug` impl, so these tests use `matches!`
    // rather than `expect_err` / `assert_eq!` on the whole result.

    #[tokio::test]
    async fn read_head_tail_4_reads_both_ends_of_long_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        // 10 bytes: long enough for non-overlapping head and tail.
        tokio::fs::write(tmp.path().join("seg.bin"), b"HEADxxTAIL").await?;

        let probe = read_head_tail_4(&location, Path::new("seg.bin")).await?;
        assert_eq!(probe.len, 10);
        assert_eq!(&probe.head, b"HEAD");
        assert_eq!(&probe.tail, b"TAIL");
        Ok(())
    }

    #[tokio::test]
    async fn read_head_tail_4_leaves_tail_zeroed_for_short_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        // 5 bytes: enough for the head, too short for a separate tail.
        tokio::fs::write(tmp.path().join("short.bin"), b"HEADx").await?;

        let probe = read_head_tail_4(&location, Path::new("short.bin")).await?;
        assert_eq!(probe.len, 5);
        assert_eq!(&probe.head, b"HEAD");
        assert_eq!(probe.tail, [0u8; 4]);
        Ok(())
    }

    #[tokio::test]
    async fn read_head_tail_4_not_found_for_missing_or_directory() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        let missing = read_head_tail_4(&location, Path::new("missing.bin")).await;
        assert!(matches!(missing, Err(StorageError::NotFound { .. })));

        // A directory exists but is not a regular file.
        tokio::fs::create_dir(tmp.path().join("subdir")).await?;
        let dir = read_head_tail_4(&location, Path::new("subdir")).await;
        assert!(matches!(dir, Err(StorageError::NotFound { .. })));
        Ok(())
    }
}