timeseries_table_core/storage/
table_location.rs1use std::path::{Path, PathBuf};
2
3use snafu::{IntoError, ResultExt};
4use tokio::fs;
5
6use crate::storage::layout;
7use crate::storage::{
8 AlreadyExistsNoSourceSnafu, BackendError, NotFoundSnafu, OtherIoSnafu, StorageLocation,
9 StorageResult,
10};
11
12#[derive(Debug, Clone)]
18pub struct TableLocation(StorageLocation);
19
20impl From<TableLocation> for StorageLocation {
21 fn from(t: TableLocation) -> Self {
22 t.0
23 }
24}
25
26impl AsRef<StorageLocation> for TableLocation {
27 fn as_ref(&self) -> &StorageLocation {
28 &self.0
29 }
30}
31
32impl TableLocation {
33 pub fn local(root: impl Into<PathBuf>) -> Self {
35 TableLocation(StorageLocation::Local(root.into()))
36 }
37
38 pub fn parse(spec: &str) -> StorageResult<Self> {
41 StorageLocation::parse(spec).map(TableLocation)
42 }
43
44 pub fn storage(&self) -> &StorageLocation {
46 &self.0
47 }
48
49 pub async fn ensure_parquet_under_root(&self, parquet_path: &Path) -> StorageResult<PathBuf> {
52 match self.as_ref() {
53 StorageLocation::Local(table_root) => {
54 let root = fs::canonicalize(table_root)
55 .await
56 .map_err(BackendError::Local)
57 .context(NotFoundSnafu {
58 path: table_root.display().to_string(),
59 })?;
60
61 let src = fs::canonicalize(parquet_path)
62 .await
63 .map_err(BackendError::Local)
64 .context(NotFoundSnafu {
65 path: parquet_path.display().to_string(),
66 })?;
67
68 if let Ok(rel) = src.strip_prefix(&root) {
69 return Ok(rel.to_path_buf());
70 }
71
72 let file_name = src
73 .file_name()
74 .ok_or_else(|| {
75 OtherIoSnafu {
76 path: src.display().to_string(),
77 }
78 .into_error(BackendError::Local(
79 std::io::Error::other("parquet path has no filename"),
80 ))
81 })?
82 .to_owned();
83
84 let data_dir = root.join(layout::DATA_DIR_NAME);
85 fs::create_dir_all(&data_dir)
86 .await
87 .map_err(BackendError::Local)
88 .context(OtherIoSnafu {
89 path: data_dir.display().to_string(),
90 })?;
91
92 let dst = data_dir.join(file_name);
93
94 match fs::metadata(&dst).await {
95 Ok(_) => {
96 return AlreadyExistsNoSourceSnafu {
97 path: dst.display().to_string(),
98 }
99 .fail();
100 }
101 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
102 }
104
105 Err(e) => {
106 return Err(BackendError::Local(e)).context(OtherIoSnafu {
107 path: dst.display().to_string(),
108 });
109 }
110 }
111
112 fs::copy(&src, &dst)
113 .await
114 .map_err(BackendError::Local)
115 .context(OtherIoSnafu {
116 path: dst.display().to_string(),
117 })?;
118
119 let dst = fs::canonicalize(&dst)
120 .await
121 .map_err(BackendError::Local)
122 .context(OtherIoSnafu {
123 path: dst.display().to_string(),
124 })?;
125
126 let rel = dst.strip_prefix(&root).map_err(|_| {
127 OtherIoSnafu {
128 path: dst.display().to_string(),
129 }
130 .into_error(BackendError::Local(std::io::Error::other(
131 "copied parquet is not under table root",
132 )))
133 })?;
134
135 Ok(rel.to_path_buf())
136 }
137 }
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::StorageError;
    use tempfile::TempDir;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// A file that already lives under the table root is reported by its
    /// root-relative path.
    #[tokio::test]
    async fn ensure_parquet_under_root_returns_relative_path() -> TestResult {
        let scratch = TempDir::new()?;
        let table = TableLocation::local(scratch.path());

        let expected = Path::new("data/seg.parquet");
        let inside = scratch.path().join(expected);
        let parent = inside.parent().unwrap();
        tokio::fs::create_dir_all(parent).await?;
        tokio::fs::write(&inside, b"parquet").await?;

        let actual = table.ensure_parquet_under_root(&inside).await?;
        assert_eq!(actual, expected);
        Ok(())
    }

    /// A file outside the table root is copied into the data directory and the
    /// relative path of the copy is returned.
    #[tokio::test]
    async fn ensure_parquet_under_root_copies_outside_file() -> TestResult {
        let scratch = TempDir::new()?;
        let root_dir = scratch.path().join("table");
        tokio::fs::create_dir_all(&root_dir).await?;
        let table = TableLocation::local(&root_dir);

        let outside = scratch.path().join("outside.parquet");
        tokio::fs::write(&outside, b"parquet").await?;

        let actual = table.ensure_parquet_under_root(&outside).await?;
        let expected = PathBuf::from("data/outside.parquet");
        assert_eq!(actual, expected);

        // The copy must exist under the root and carry the source bytes.
        let copied = root_dir.join(&expected);
        assert!(copied.exists());
        let bytes = tokio::fs::read(&copied).await?;
        assert_eq!(bytes, b"parquet");
        Ok(())
    }

    /// A same-named file already present in the data directory is not
    /// overwritten; the call fails with `AlreadyExistsNoSource`.
    #[tokio::test]
    async fn ensure_parquet_under_root_refuses_overwrite() -> TestResult {
        let scratch = TempDir::new()?;
        let root_dir = scratch.path().join("table");
        tokio::fs::create_dir_all(&root_dir).await?;
        let table = TableLocation::local(&root_dir);

        // Pre-populate the destination the copy would target.
        let data_dir = root_dir.join("data");
        tokio::fs::create_dir_all(&data_dir).await?;
        let occupied = data_dir.join("seg.parquet");
        tokio::fs::write(&occupied, b"existing").await?;

        let incoming = scratch.path().join("seg.parquet");
        tokio::fs::write(&incoming, b"new").await?;

        let outcome = table.ensure_parquet_under_root(&incoming).await;
        let err = outcome.expect_err("expected AlreadyExistsNoSource");

        assert!(matches!(err, StorageError::AlreadyExistsNoSource { .. }));
        Ok(())
    }
}