Skip to main content

timeseries_table_core/storage/
table_location.rs

1use std::path::{Path, PathBuf};
2
3use snafu::{IntoError, ResultExt};
4use tokio::fs;
5
6use crate::storage::layout;
7use crate::storage::{
8    AlreadyExistsNoSourceSnafu, BackendError, NotFoundSnafu, OtherIoSnafu, StorageLocation,
9    StorageResult,
10};
11
12/// Table root location with table-scoped semantics.
13///
14/// This wraps `StorageLocation` and is used when callers need to treat the
15/// location as a table root (e.g. log layout, segment paths, and helpers like
16/// `ensure_parquet_under_root`).
17#[derive(Debug, Clone)]
18pub struct TableLocation(StorageLocation);
19
20impl From<TableLocation> for StorageLocation {
21    fn from(t: TableLocation) -> Self {
22        t.0
23    }
24}
25
26impl AsRef<StorageLocation> for TableLocation {
27    fn as_ref(&self) -> &StorageLocation {
28        &self.0
29    }
30}
31
32impl TableLocation {
33    /// Creates a new `TableLocation` for a local filesystem path.
34    pub fn local(root: impl Into<PathBuf>) -> Self {
35        TableLocation(StorageLocation::Local(root.into()))
36    }
37
38    /// Parse a user-facing table location string into a TableLocation.
39    /// v0.1: only local filesystem paths are supported.
40    pub fn parse(spec: &str) -> StorageResult<Self> {
41        StorageLocation::parse(spec).map(TableLocation)
42    }
43
44    /// Return the underlying StorageLocation
45    pub fn storage(&self) -> &StorageLocation {
46        &self.0
47    }
48
49    /// Ensure `parquet_path` is under this table root.
50    /// If not, copy it into `data/<filename>` and return the relative path.
51    pub async fn ensure_parquet_under_root(&self, parquet_path: &Path) -> StorageResult<PathBuf> {
52        match self.as_ref() {
53            StorageLocation::Local(table_root) => {
54                let root = fs::canonicalize(table_root)
55                    .await
56                    .map_err(BackendError::Local)
57                    .context(NotFoundSnafu {
58                        path: table_root.display().to_string(),
59                    })?;
60
61                let src = fs::canonicalize(parquet_path)
62                    .await
63                    .map_err(BackendError::Local)
64                    .context(NotFoundSnafu {
65                        path: parquet_path.display().to_string(),
66                    })?;
67
68                if let Ok(rel) = src.strip_prefix(&root) {
69                    return Ok(rel.to_path_buf());
70                }
71
72                let file_name = src
73                    .file_name()
74                    .ok_or_else(|| {
75                        OtherIoSnafu {
76                            path: src.display().to_string(),
77                        }
78                        .into_error(BackendError::Local(
79                            std::io::Error::other("parquet path has no filename"),
80                        ))
81                    })?
82                    .to_owned();
83
84                let data_dir = root.join(layout::DATA_DIR_NAME);
85                fs::create_dir_all(&data_dir)
86                    .await
87                    .map_err(BackendError::Local)
88                    .context(OtherIoSnafu {
89                        path: data_dir.display().to_string(),
90                    })?;
91
92                let dst = data_dir.join(file_name);
93
94                match fs::metadata(&dst).await {
95                    Ok(_) => {
96                        return AlreadyExistsNoSourceSnafu {
97                            path: dst.display().to_string(),
98                        }
99                        .fail();
100                    }
101                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
102                        // ok to proceed
103                    }
104
105                    Err(e) => {
106                        return Err(BackendError::Local(e)).context(OtherIoSnafu {
107                            path: dst.display().to_string(),
108                        });
109                    }
110                }
111
112                fs::copy(&src, &dst)
113                    .await
114                    .map_err(BackendError::Local)
115                    .context(OtherIoSnafu {
116                        path: dst.display().to_string(),
117                    })?;
118
119                let dst = fs::canonicalize(&dst)
120                    .await
121                    .map_err(BackendError::Local)
122                    .context(OtherIoSnafu {
123                        path: dst.display().to_string(),
124                    })?;
125
126                let rel = dst.strip_prefix(&root).map_err(|_| {
127                    OtherIoSnafu {
128                        path: dst.display().to_string(),
129                    }
130                    .into_error(BackendError::Local(std::io::Error::other(
131                        "copied parquet is not under table root",
132                    )))
133                })?;
134
135                Ok(rel.to_path_buf())
136            }
137        }
138    }
139}
140
141#[cfg(test)]
142mod tests {
143
144    use crate::storage::StorageError;
145
146    use super::*;
147    use tempfile::TempDir;
148
149    type TestResult = Result<(), Box<dyn std::error::Error>>;
150
151    #[tokio::test]
152    async fn ensure_parquet_under_root_returns_relative_path() -> TestResult {
153        let tmp = TempDir::new()?;
154        let location = TableLocation::local(tmp.path());
155
156        let rel_path = Path::new("data/seg.parquet");
157        let abs_path = tmp.path().join(rel_path);
158        tokio::fs::create_dir_all(abs_path.parent().unwrap()).await?;
159        tokio::fs::write(&abs_path, b"parquet").await?;
160
161        let rel = location.ensure_parquet_under_root(&abs_path).await?;
162        assert_eq!(rel, rel_path);
163        Ok(())
164    }
165
166    #[tokio::test]
167    async fn ensure_parquet_under_root_copies_outside_file() -> TestResult {
168        let tmp = TempDir::new()?;
169        let table_root = tmp.path().join("table");
170        tokio::fs::create_dir_all(&table_root).await?;
171        let location = TableLocation::local(&table_root);
172
173        let src_path = tmp.path().join("outside.parquet");
174        tokio::fs::write(&src_path, b"parquet").await?;
175
176        let rel = location.ensure_parquet_under_root(&src_path).await?;
177        let expected_rel = PathBuf::from("data/outside.parquet");
178        assert_eq!(rel, expected_rel);
179
180        let dst = table_root.join(&expected_rel);
181        assert!(dst.exists());
182        let contents = tokio::fs::read(&dst).await?;
183        assert_eq!(contents, b"parquet");
184        Ok(())
185    }
186
187    #[tokio::test]
188    async fn ensure_parquet_under_root_refuses_overwrite() -> TestResult {
189        let tmp = TempDir::new()?;
190        let table_root = tmp.path().join("table");
191        tokio::fs::create_dir_all(&table_root).await?;
192        let location = TableLocation::local(&table_root);
193
194        let data_dir = table_root.join("data");
195        tokio::fs::create_dir_all(&data_dir).await?;
196        let existing_dst = data_dir.join("seg.parquet");
197        tokio::fs::write(&existing_dst, b"existing").await?;
198
199        let src_path = tmp.path().join("seg.parquet");
200        tokio::fs::write(&src_path, b"new").await?;
201
202        let err = location
203            .ensure_parquet_under_root(&src_path)
204            .await
205            .expect_err("expected AlreadyExistsNoSource");
206
207        assert!(matches!(err, StorageError::AlreadyExistsNoSource { .. }));
208        Ok(())
209    }
210}