timeseries_table_core/storage/
io.rs1use snafu::{Backtrace, prelude::*};
2use std::{
3 io::{self, SeekFrom},
4 path::{Path, PathBuf},
5};
6use tokio::{
7 fs::{self, OpenOptions},
8 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
9};
10
11use crate::storage::{
12 BackendError, NotFoundSnafu, OtherIoSnafu, StorageError, StorageLocation, StorageResult,
13};
14
15pub(super) struct TempFileGuard {
18 path: PathBuf,
19 armed: bool,
20}
21
22impl TempFileGuard {
23 pub(super) fn new(path: PathBuf) -> Self {
24 Self { path, armed: true }
25 }
26
27 pub(super) fn disarm(&mut self) {
30 self.armed = false;
31 }
32}
33
34impl Drop for TempFileGuard {
35 fn drop(&mut self) {
36 if self.armed {
37 let _ = std::fs::remove_file(&self.path);
39 }
40 }
41}
42
43pub(super) fn join_local(location: &StorageLocation, rel: &Path) -> PathBuf {
47 match location {
48 StorageLocation::Local(root) => root.join(rel),
49 }
50}
51
52pub(super) async fn create_parent_dir(abs: &Path) -> StorageResult<()> {
53 if let Some(parent) = abs.parent() {
54 fs::create_dir_all(parent)
55 .await
56 .map_err(BackendError::Local)
57 .context(OtherIoSnafu {
58 path: parent.display().to_string(),
59 })?;
60 }
61 Ok(())
62}
63
64pub async fn write_atomic(
82 location: &StorageLocation,
83 rel_path: &Path,
84 contents: &[u8],
85) -> StorageResult<()> {
86 match location {
87 StorageLocation::Local(_) => {
88 let abs = join_local(location, rel_path);
89
90 create_parent_dir(&abs).await?;
91
92 let tmp_path = abs.with_extension("tmp");
93 let mut guard = TempFileGuard::new(tmp_path.clone());
94
95 {
96 let mut file = fs::File::create(&tmp_path)
97 .await
98 .map_err(BackendError::Local)
99 .context(OtherIoSnafu {
100 path: tmp_path.display().to_string(),
101 })?;
102
103 file.write_all(contents)
104 .await
105 .map_err(BackendError::Local)
106 .context(OtherIoSnafu {
107 path: tmp_path.display().to_string(),
108 })?;
109
110 file.sync_all()
111 .await
112 .map_err(BackendError::Local)
113 .context(OtherIoSnafu {
114 path: tmp_path.display().to_string(),
115 })?;
116 }
117
118 fs::rename(&tmp_path, &abs)
119 .await
120 .map_err(BackendError::Local)
121 .context(OtherIoSnafu {
122 path: abs.display().to_string(),
123 })?;
124
125 guard.disarm();
127
128 Ok(())
129 }
130 }
131}
132
133pub async fn read_to_string(location: &StorageLocation, rel_path: &Path) -> StorageResult<String> {
140 match location {
141 StorageLocation::Local(_) => {
142 let abs = join_local(location, rel_path);
143
144 match fs::read_to_string(&abs).await {
145 Ok(s) => Ok(s),
146 Err(e) if e.kind() == io::ErrorKind::NotFound => Err(BackendError::Local(e))
147 .context(NotFoundSnafu {
148 path: abs.display().to_string(),
149 }),
150 Err(e) => Err(BackendError::Local(e)).context(OtherIoSnafu {
151 path: abs.display().to_string(),
152 }),
153 }
154 }
155 }
156}
157
158pub async fn write_new(
163 location: &StorageLocation,
164 rel_path: &Path,
165 contents: &[u8],
166) -> StorageResult<()> {
167 match location {
168 StorageLocation::Local(_) => {
169 let abs = join_local(location, rel_path);
170 create_parent_dir(&abs).await?;
171
172 let path_str = abs.display().to_string();
173
174 let open_result = OpenOptions::new()
176 .write(true)
177 .create_new(true)
178 .open(&abs)
179 .await;
180
181 let mut file = match open_result {
182 Ok(f) => f,
183 Err(e) => {
184 let backend = BackendError::Local(e);
185 let storage_err = match &backend {
187 BackendError::Local(inner)
188 if inner.kind() == io::ErrorKind::AlreadyExists =>
189 {
190 StorageError::AlreadyExists {
191 path: path_str,
192 source: backend,
193 backtrace: Backtrace::capture(),
194 }
195 }
196 _ => StorageError::OtherIo {
197 path: path_str,
198 source: backend,
199 backtrace: Backtrace::capture(),
200 },
201 };
202 return Err(storage_err);
203 }
204 };
205
206 file.write_all(contents)
207 .await
208 .map_err(BackendError::Local)
209 .context(OtherIoSnafu {
210 path: abs.display().to_string(),
211 })?;
212
213 file.sync_all()
214 .await
215 .map_err(BackendError::Local)
216 .context(OtherIoSnafu {
217 path: abs.display().to_string(),
218 })?;
219
220 Ok(())
221 }
222 }
223}
224
/// Length of a file together with its first and last four bytes.
///
/// Produced by `read_head_tail_4`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileHeadTail4 {
    /// File length in bytes.
    pub len: u64,
    /// First four bytes of the file; all zeros when the file is shorter than
    /// four bytes.
    pub head: [u8; 4],
    /// Last four bytes of the file; all zeros when the file is shorter than
    /// eight bytes.
    pub tail: [u8; 4],
}
235
236pub async fn read_head_tail_4(
250 location: &StorageLocation,
251 rel_path: &Path,
252) -> StorageResult<FileHeadTail4> {
253 match location {
254 StorageLocation::Local(_) => {
255 let abs = join_local(location, rel_path);
256 let path_str = abs.display().to_string();
257
258 let meta = match fs::metadata(&abs).await {
260 Ok(m) => m,
261 Err(e) if e.kind() == io::ErrorKind::NotFound => {
262 return Err(BackendError::Local(e)).context(NotFoundSnafu { path: path_str });
263 }
264 Err(e) => {
265 return Err(BackendError::Local(e)).context(OtherIoSnafu { path: path_str });
266 }
267 };
268
269 if !meta.is_file() {
271 let synthetic = io::Error::other("not a regular file");
272 let backend = BackendError::Local(synthetic);
273 return Err(StorageError::NotFound {
274 path: path_str,
275 source: backend,
276 backtrace: Backtrace::capture(),
277 });
278 }
279
280 let len = meta.len();
281
282 let mut file = fs::File::open(&abs)
283 .await
284 .map_err(BackendError::Local)
285 .context(OtherIoSnafu {
286 path: path_str.clone(),
287 })?;
288
289 let mut head = [0u8; 4];
290 let mut tail = [0u8; 4];
291
292 if len >= 4 {
294 file.read_exact(&mut head)
295 .await
296 .map_err(BackendError::Local)
297 .context(OtherIoSnafu {
298 path: path_str.clone(),
299 })?;
300 }
301
302 if len >= 8 {
304 file.seek(SeekFrom::End(-4))
305 .await
306 .map_err(BackendError::Local)
307 .context(OtherIoSnafu {
308 path: path_str.clone(),
309 })?;
310 file.read_exact(&mut tail)
311 .await
312 .map_err(BackendError::Local)
313 .context(OtherIoSnafu {
314 path: path_str.clone(),
315 })?;
316 }
317 Ok(FileHeadTail4 { len, head, tail })
318 }
319 }
320}
321
322pub async fn read_all_bytes(location: &StorageLocation, rel_path: &Path) -> StorageResult<Vec<u8>> {
331 match location {
332 StorageLocation::Local(_) => {
333 let abs = join_local(location, rel_path);
334 let path_str = abs.display().to_string();
335
336 match fs::read(&abs).await {
337 Ok(bytes) => Ok(bytes),
338 Err(e) if e.kind() == io::ErrorKind::NotFound => {
339 Err(BackendError::Local(e)).context(NotFoundSnafu { path: path_str })
340 }
341 Err(e) => Err(BackendError::Local(e)).context(OtherIoSnafu { path: path_str }),
342 }
343 }
344 }
345}
346
347pub async fn file_size(location: &StorageLocation, rel_path: &Path) -> StorageResult<u64> {
351 match location {
352 StorageLocation::Local(_) => {
353 let abs = join_local(location, rel_path);
354 let path_str = abs.display().to_string();
355
356 let meta = fs::metadata(&abs).await;
357 match meta {
358 Ok(m) => Ok(m.len()),
359 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
360 Err(BackendError::Local(e)).context(NotFoundSnafu { path: path_str })
361 }
362 Err(e) => Err(BackendError::Local(e)).context(OtherIoSnafu { path: path_str }),
363 }
364 }
365 }
366}
367
#[cfg(test)]
mod tests {
    //! Tests for the local-filesystem storage helpers. Each test works inside
    //! its own temporary directory.

    use super::*;
    use tempfile::TempDir;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    #[tokio::test]
    async fn write_atomic_creates_file_with_contents() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        let rel_path = Path::new("test.txt");
        let contents = b"hello world";

        write_atomic(&location, rel_path, contents).await?;

        let abs = tmp.path().join(rel_path);
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "hello world");
        Ok(())
    }

    #[tokio::test]
    async fn write_atomic_creates_parent_directories() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        let rel_path = Path::new("nested/deep/dir/file.txt");
        let contents = b"nested content";

        write_atomic(&location, rel_path, contents).await?;

        let abs = tmp.path().join(rel_path);
        assert!(abs.exists());
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "nested content");
        Ok(())
    }

    #[tokio::test]
    async fn write_atomic_overwrites_existing_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("overwrite.txt");

        write_atomic(&location, rel_path, b"original").await?;
        write_atomic(&location, rel_path, b"updated").await?;

        let abs = tmp.path().join(rel_path);
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "updated");
        Ok(())
    }

    #[tokio::test]
    async fn write_atomic_no_leftover_tmp_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("clean.txt");

        write_atomic(&location, rel_path, b"data").await?;

        // Check the whole directory instead of guessing the temp file's name:
        // the destination must be the only entry left behind.
        let names = std::fs::read_dir(tmp.path())?
            .map(|entry| entry.map(|e| e.file_name()))
            .collect::<Result<Vec<_>, _>>()?;
        assert_eq!(names, [std::ffi::OsString::from("clean.txt")]);
        Ok(())
    }

    #[tokio::test]
    async fn read_to_string_returns_file_contents() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("readable.txt");

        let abs = tmp.path().join(rel_path);
        tokio::fs::write(&abs, "file contents").await?;

        let result = read_to_string(&location, rel_path).await?;
        assert_eq!(result, "file contents");
        Ok(())
    }

    #[tokio::test]
    async fn read_to_string_returns_not_found_for_missing_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("does_not_exist.txt");

        let err = read_to_string(&location, rel_path)
            .await
            .expect_err("expected NotFound error");
        assert!(matches!(err, StorageError::NotFound { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn write_then_read_roundtrip() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("roundtrip.txt");

        let original = "roundtrip content 🎉";
        write_atomic(&location, rel_path, original.as_bytes()).await?;

        let read_back = read_to_string(&location, rel_path).await?;
        assert_eq!(read_back, original);
        Ok(())
    }

    #[tokio::test]
    async fn write_new_creates_file_with_contents() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("new_file.txt");

        write_new(&location, rel_path, b"new content").await?;

        let abs = tmp.path().join(rel_path);
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "new content");
        Ok(())
    }

    #[tokio::test]
    async fn write_new_fails_if_file_exists() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("existing.txt");

        write_new(&location, rel_path, b"first").await?;

        let err = write_new(&location, rel_path, b"second")
            .await
            .expect_err("expected AlreadyExists error");
        assert!(matches!(err, StorageError::AlreadyExists { .. }));

        // The failed second write must not have touched the first file.
        let read_back = read_to_string(&location, rel_path).await?;
        assert_eq!(read_back, "first");
        Ok(())
    }

    #[tokio::test]
    async fn write_new_creates_parent_directories() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("nested/path/new_file.txt");

        write_new(&location, rel_path, b"nested new").await?;

        let abs = tmp.path().join(rel_path);
        assert!(abs.exists());
        let read_back = tokio::fs::read_to_string(&abs).await?;
        assert_eq!(read_back, "nested new");
        Ok(())
    }

    #[tokio::test]
    async fn read_head_tail_4_reads_both_ends() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("framed.bin");

        tokio::fs::write(tmp.path().join(rel_path), b"HEADbodyTAIL").await?;

        let ht = read_head_tail_4(&location, rel_path).await?;
        assert_eq!(ht.len, 12);
        assert_eq!(ht.head, *b"HEAD");
        assert_eq!(ht.tail, *b"TAIL");
        Ok(())
    }

    #[tokio::test]
    async fn read_head_tail_4_leaves_zeros_for_tiny_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("tiny.bin");

        tokio::fs::write(tmp.path().join(rel_path), b"ab").await?;

        let ht = read_head_tail_4(&location, rel_path).await?;
        assert_eq!(ht.len, 2);
        assert_eq!(ht.head, [0u8; 4]);
        assert_eq!(ht.tail, [0u8; 4]);
        Ok(())
    }

    #[tokio::test]
    async fn read_head_tail_4_returns_not_found_for_missing_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        let result = read_head_tail_4(&location, Path::new("missing.bin")).await;
        assert!(matches!(result, Err(StorageError::NotFound { .. })));
        Ok(())
    }

    #[tokio::test]
    async fn read_head_tail_4_returns_not_found_for_directory() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        tokio::fs::create_dir(tmp.path().join("a_dir")).await?;

        let result = read_head_tail_4(&location, Path::new("a_dir")).await;
        assert!(matches!(result, Err(StorageError::NotFound { .. })));
        Ok(())
    }

    #[tokio::test]
    async fn read_all_bytes_returns_raw_contents() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("raw.bin");

        // Deliberately not valid UTF-8.
        let payload = [0u8, 159, 146, 150];
        tokio::fs::write(tmp.path().join(rel_path), payload).await?;

        let bytes = read_all_bytes(&location, rel_path).await?;
        assert_eq!(bytes, payload);
        Ok(())
    }

    #[tokio::test]
    async fn read_all_bytes_returns_not_found_for_missing_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        let result = read_all_bytes(&location, Path::new("missing.bin")).await;
        assert!(matches!(result, Err(StorageError::NotFound { .. })));
        Ok(())
    }

    #[tokio::test]
    async fn file_size_reports_length() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());
        let rel_path = Path::new("sized.bin");

        tokio::fs::write(tmp.path().join(rel_path), b"12345").await?;

        assert_eq!(file_size(&location, rel_path).await?, 5);
        Ok(())
    }

    #[tokio::test]
    async fn file_size_returns_not_found_for_missing_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = StorageLocation::local(tmp.path());

        let result = file_size(&location, Path::new("missing.bin")).await;
        assert!(matches!(result, Err(StorageError::NotFound { .. })));
        Ok(())
    }
}