Skip to main content

timeseries_table_core/storage/
output.rs

1use std::{
2    io::{self, Write},
3    path::{Path, PathBuf},
4};
5
6use snafu::{IntoError, ResultExt};
7use tokio::fs;
8
9use crate::storage::{
10    BackendError, OtherIoSnafu, StorageLocation, StorageResult, TempFileGuard, create_parent_dir,
11    join_local,
12};
13
14/// Local filesystem sink that writes to a temp file and renames on finish.
15struct LocalSink {
16    tmp_path: PathBuf,
17    final_path: PathBuf,
18    writer: io::BufWriter<std::fs::File>,
19    guard: TempFileGuard,
20}
21
22impl LocalSink {
23    async fn open(location: &StorageLocation, rel_path: &Path) -> StorageResult<Self> {
24        let final_path = join_local(location, rel_path);
25        create_parent_dir(&final_path).await?;
26
27        let tmp_path = final_path.with_extension("tmp");
28
29        // Use std::fs::File because Arrow writers require std::io::Write.
30        let file = std::fs::File::create(&tmp_path)
31            .map_err(BackendError::Local)
32            .context(OtherIoSnafu {
33                path: tmp_path.display().to_string(),
34            })?;
35
36        let writer = io::BufWriter::new(file);
37        let guard = TempFileGuard::new(tmp_path.clone());
38
39        Ok(Self {
40            tmp_path,
41            final_path,
42            writer,
43            guard,
44        })
45    }
46
47    fn writer(&mut self) -> &mut dyn Write {
48        &mut self.writer
49    }
50
51    async fn finish(&mut self) -> StorageResult<()> {
52        self.writer
53            .flush()
54            .map_err(BackendError::Local)
55            .context(OtherIoSnafu {
56                path: self.tmp_path.display().to_string(),
57            })?;
58
59        self.writer
60            .get_ref()
61            .sync_all()
62            .map_err(BackendError::Local)
63            .context(OtherIoSnafu {
64                path: self.tmp_path.display().to_string(),
65            })?;
66
67        fs::rename(&self.tmp_path, &self.final_path)
68            .await
69            .map_err(BackendError::Local)
70            .context(OtherIoSnafu {
71                path: self.final_path.display().to_string(),
72            })?;
73
74        self.guard.disarm();
75        Ok(())
76    }
77}
78
79enum OutputSinkInner {
80    Local(LocalSink),
81    // S3(S3Sink),
82}
83
84/// A streaming output sink for writing bytes to a storage backend.
85///
86/// This type abstracts over backend-specific sink implementations. Callers
87/// obtain a sink via `open_output_sink` and then stream bytes through the
88/// `writer()` handle. Finalization is explicit via `finish()` to allow
89/// backend-specific commit semantics (e.g., atomic rename or multipart upload).
90pub struct OutputSink {
91    inner: OutputSinkInner,
92}
93
94impl OutputSink {
95    /// Return a mutable Write handle for streaming bytes.
96    pub fn writer(&mut self) -> &mut dyn Write {
97        match &mut self.inner {
98            OutputSinkInner::Local(s) => s.writer(),
99        }
100    }
101
102    /// Flush, fsync, and commit to final location.
103    pub async fn finish(self) -> StorageResult<()> {
104        match self.inner {
105            OutputSinkInner::Local(mut s) => s.finish().await,
106        }
107    }
108}
109
110/// Open a streaming output sink at `location` + `rel_path`.
111///
112/// The `location` identifies the backend root, while `rel_path` identifies
113/// the object/key within that backend. For local filesystems this performs a
114/// temp-file write and atomic rename on `finish()`.
115///
116/// v0.1: only StorageLocation::Local is supported.
117pub async fn open_output_sink(
118    location: &StorageLocation,
119    rel_path: &Path,
120) -> StorageResult<OutputSink> {
121    match location {
122        StorageLocation::Local(_) => {
123            let sink = LocalSink::open(location, rel_path).await?;
124            Ok(OutputSink {
125                inner: OutputSinkInner::Local(sink),
126            })
127        }
128    }
129}
130
131/// Fully-qualified output target: backend + relative path/key.
132#[derive(Debug, Clone)]
133pub struct OutputLocation {
134    /// Backend where the output will be written.
135    pub storage: StorageLocation,
136    /// Path within the backend for the output object.
137    pub rel_path: PathBuf,
138}
139
140impl OutputLocation {
141    /// Parse a string specification into an `OutputLocation`, validating it is non-empty and supported.
142    pub fn parse(spec: &str) -> StorageResult<OutputLocation> {
143        let trimmed = spec.trim();
144        if trimmed.is_empty() {
145            return Err(OtherIoSnafu {
146                path: "<empty output location>".to_string(),
147            }
148            .into_error(BackendError::Local(std::io::Error::new(
149                io::ErrorKind::InvalidInput,
150                "output location is empty",
151            ))));
152        }
153
154        let storage = StorageLocation::parse(trimmed)?;
155
156        match &storage {
157            StorageLocation::Local(_) => {
158                let path = PathBuf::from(trimmed);
159
160                let base = PathBuf::from(".");
161                let rel_path = path;
162
163                Ok(OutputLocation {
164                    storage: StorageLocation::Local(base),
165                    rel_path,
166                })
167            }
168        }
169    }
170}