timeseries_table_core/storage/
output.rs1use std::{
2 io::{self, Write},
3 path::{Path, PathBuf},
4};
5
6use snafu::{IntoError, ResultExt};
7use tokio::fs;
8
9use crate::storage::{
10 BackendError, OtherIoSnafu, StorageLocation, StorageResult, TempFileGuard, create_parent_dir,
11 join_local,
12};
13
14struct LocalSink {
16 tmp_path: PathBuf,
17 final_path: PathBuf,
18 writer: io::BufWriter<std::fs::File>,
19 guard: TempFileGuard,
20}
21
22impl LocalSink {
23 async fn open(location: &StorageLocation, rel_path: &Path) -> StorageResult<Self> {
24 let final_path = join_local(location, rel_path);
25 create_parent_dir(&final_path).await?;
26
27 let tmp_path = final_path.with_extension("tmp");
28
29 let file = std::fs::File::create(&tmp_path)
31 .map_err(BackendError::Local)
32 .context(OtherIoSnafu {
33 path: tmp_path.display().to_string(),
34 })?;
35
36 let writer = io::BufWriter::new(file);
37 let guard = TempFileGuard::new(tmp_path.clone());
38
39 Ok(Self {
40 tmp_path,
41 final_path,
42 writer,
43 guard,
44 })
45 }
46
47 fn writer(&mut self) -> &mut dyn Write {
48 &mut self.writer
49 }
50
51 async fn finish(&mut self) -> StorageResult<()> {
52 self.writer
53 .flush()
54 .map_err(BackendError::Local)
55 .context(OtherIoSnafu {
56 path: self.tmp_path.display().to_string(),
57 })?;
58
59 self.writer
60 .get_ref()
61 .sync_all()
62 .map_err(BackendError::Local)
63 .context(OtherIoSnafu {
64 path: self.tmp_path.display().to_string(),
65 })?;
66
67 fs::rename(&self.tmp_path, &self.final_path)
68 .await
69 .map_err(BackendError::Local)
70 .context(OtherIoSnafu {
71 path: self.final_path.display().to_string(),
72 })?;
73
74 self.guard.disarm();
75 Ok(())
76 }
77}
78
79enum OutputSinkInner {
80 Local(LocalSink),
81 }
83
84pub struct OutputSink {
91 inner: OutputSinkInner,
92}
93
94impl OutputSink {
95 pub fn writer(&mut self) -> &mut dyn Write {
97 match &mut self.inner {
98 OutputSinkInner::Local(s) => s.writer(),
99 }
100 }
101
102 pub async fn finish(self) -> StorageResult<()> {
104 match self.inner {
105 OutputSinkInner::Local(mut s) => s.finish().await,
106 }
107 }
108}
109
110pub async fn open_output_sink(
118 location: &StorageLocation,
119 rel_path: &Path,
120) -> StorageResult<OutputSink> {
121 match location {
122 StorageLocation::Local(_) => {
123 let sink = LocalSink::open(location, rel_path).await?;
124 Ok(OutputSink {
125 inner: OutputSinkInner::Local(sink),
126 })
127 }
128 }
129}
130
131#[derive(Debug, Clone)]
133pub struct OutputLocation {
134 pub storage: StorageLocation,
136 pub rel_path: PathBuf,
138}
139
140impl OutputLocation {
141 pub fn parse(spec: &str) -> StorageResult<OutputLocation> {
143 let trimmed = spec.trim();
144 if trimmed.is_empty() {
145 return Err(OtherIoSnafu {
146 path: "<empty output location>".to_string(),
147 }
148 .into_error(BackendError::Local(std::io::Error::new(
149 io::ErrorKind::InvalidInput,
150 "output location is empty",
151 ))));
152 }
153
154 let storage = StorageLocation::parse(trimmed)?;
155
156 match &storage {
157 StorageLocation::Local(_) => {
158 let path = PathBuf::from(trimmed);
159
160 let base = PathBuf::from(".");
161 let rel_path = path;
162
163 Ok(OutputLocation {
164 storage: StorageLocation::Local(base),
165 rel_path,
166 })
167 }
168 }
169 }
170}