Skip to main content

ailake_store/
local.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::ops::Range;
3use std::path::{Path, PathBuf};
4
5use ailake_core::{AilakeError, AilakeResult};
6use async_trait::async_trait;
7use bytes::Bytes;
8use tokio::io::{AsyncReadExt, AsyncSeekExt};
9
10use crate::store::Store;
11
12pub struct LocalStore {
13    root: PathBuf,
14}
15
16impl LocalStore {
17    pub fn new(root: impl AsRef<Path>) -> Self {
18        let path = root.as_ref();
19        // Strip file:// scheme if the caller passes a file:// URI.
20        // Without this, PathBuf::from("file:///abs/path") is a RELATIVE path
21        // starting with the literal segment "file:" — not an absolute path.
22        let clean = path
23            .to_str()
24            .and_then(|s| s.strip_prefix("file://"))
25            .map(PathBuf::from)
26            .unwrap_or_else(|| path.to_path_buf());
27        Self { root: clean }
28    }
29
30    fn full_path(&self, path: &str) -> PathBuf {
31        // Strip file:// scheme so callers can pass absolute file:// URIs.
32        // PathBuf::join with an absolute path ignores self.root, so
33        // "file:///abs/path" → "/abs/path" resolves correctly.
34        let clean = path.strip_prefix("file://").unwrap_or(path);
35        self.root.join(clean)
36    }
37}
38
39#[async_trait]
40impl Store for LocalStore {
41    async fn get(&self, path: &str) -> AilakeResult<Bytes> {
42        let data = tokio::fs::read(self.full_path(path)).await?;
43        Ok(Bytes::from(data))
44    }
45
46    async fn get_range(&self, path: &str, range: Range<u64>) -> AilakeResult<Bytes> {
47        let mut file = tokio::fs::File::open(self.full_path(path)).await?;
48        file.seek(std::io::SeekFrom::Start(range.start)).await?;
49        let len = (range.end - range.start) as usize;
50        let mut buf = vec![0u8; len];
51        file.read_exact(&mut buf).await?;
52        Ok(Bytes::from(buf))
53    }
54
55    async fn put(&self, path: &str, data: Bytes) -> AilakeResult<()> {
56        let full = self.full_path(path);
57        if let Some(parent) = full.parent() {
58            tokio::fs::create_dir_all(parent).await?;
59        }
60        tokio::fs::write(full, data).await?;
61        Ok(())
62    }
63
64    async fn list(&self, prefix: &str) -> AilakeResult<Vec<String>> {
65        let dir = self.full_path(prefix);
66        if !dir.exists() {
67            return Ok(vec![]);
68        }
69        let mut entries = Vec::new();
70        let mut read_dir = tokio::fs::read_dir(&dir).await?;
71        while let Some(entry) = read_dir.next_entry().await? {
72            let path = entry.path();
73            if path.is_file() {
74                let rel = path
75                    .strip_prefix(&self.root)
76                    .map_err(|e| AilakeError::Store(e.to_string()))?
77                    .to_string_lossy()
78                    .to_string();
79                entries.push(rel);
80            }
81        }
82        entries.sort();
83        Ok(entries)
84    }
85
86    async fn file_size(&self, path: &str) -> AilakeResult<u64> {
87        let meta = tokio::fs::metadata(self.full_path(path)).await?;
88        Ok(meta.len())
89    }
90
91    async fn exists(&self, path: &str) -> AilakeResult<bool> {
92        Ok(self.full_path(path).exists())
93    }
94
95    async fn delete(&self, path: &str) -> AilakeResult<()> {
96        tokio::fs::remove_file(self.full_path(path)).await?;
97        Ok(())
98    }
99}
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104    use tempfile::TempDir;
105
106    #[tokio::test]
107    async fn put_get_roundtrip() {
108        let dir = TempDir::new().unwrap();
109        let store = LocalStore::new(dir.path());
110        let data = Bytes::from("hello ailake");
111        store.put("test.bin", data.clone()).await.unwrap();
112        let got = store.get("test.bin").await.unwrap();
113        assert_eq!(got, data);
114    }
115
116    #[tokio::test]
117    async fn get_range_reads_partial() {
118        let dir = TempDir::new().unwrap();
119        let store = LocalStore::new(dir.path());
120        let data = Bytes::from(b"abcdefghijklmnop".as_ref());
121        store.put("test.bin", data).await.unwrap();
122        let partial = store.get_range("test.bin", 4..8).await.unwrap();
123        assert_eq!(partial.as_ref(), b"efgh");
124    }
125
126    #[tokio::test]
127    async fn list_returns_files() {
128        let dir = TempDir::new().unwrap();
129        let store = LocalStore::new(dir.path());
130        store.put("data/a.parquet", Bytes::from("a")).await.unwrap();
131        store.put("data/b.parquet", Bytes::from("b")).await.unwrap();
132        let files = store.list("data").await.unwrap();
133        assert_eq!(files.len(), 2);
134    }
135
136    #[tokio::test]
137    async fn file_size_correct() {
138        let dir = TempDir::new().unwrap();
139        let store = LocalStore::new(dir.path());
140        store
141            .put("x.bin", Bytes::from(vec![0u8; 42]))
142            .await
143            .unwrap();
144        assert_eq!(store.file_size("x.bin").await.unwrap(), 42);
145    }
146}