Skip to main content

ailake_store/
local.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::ops::Range;
3use std::path::{Path, PathBuf};
4
5use ailake_core::{AilakeError, AilakeResult};
6use async_trait::async_trait;
7use bytes::Bytes;
8use tokio::io::{AsyncReadExt, AsyncSeekExt};
9
10use crate::store::Store;
11
12pub struct LocalStore {
13    root: PathBuf,
14}
15
16impl LocalStore {
17    pub fn new(root: impl AsRef<Path>) -> Self {
18        Self {
19            root: root.as_ref().to_path_buf(),
20        }
21    }
22
23    fn full_path(&self, path: &str) -> PathBuf {
24        // Strip file:// scheme so callers can pass absolute file:// URIs.
25        // PathBuf::join with an absolute path ignores self.root, so
26        // "file:///abs/path" → "/abs/path" resolves correctly.
27        let clean = path.strip_prefix("file://").unwrap_or(path);
28        self.root.join(clean)
29    }
30}
31
32#[async_trait]
33impl Store for LocalStore {
34    async fn get(&self, path: &str) -> AilakeResult<Bytes> {
35        let data = tokio::fs::read(self.full_path(path)).await?;
36        Ok(Bytes::from(data))
37    }
38
39    async fn get_range(&self, path: &str, range: Range<u64>) -> AilakeResult<Bytes> {
40        let mut file = tokio::fs::File::open(self.full_path(path)).await?;
41        file.seek(std::io::SeekFrom::Start(range.start)).await?;
42        let len = (range.end - range.start) as usize;
43        let mut buf = vec![0u8; len];
44        file.read_exact(&mut buf).await?;
45        Ok(Bytes::from(buf))
46    }
47
48    async fn put(&self, path: &str, data: Bytes) -> AilakeResult<()> {
49        let full = self.full_path(path);
50        if let Some(parent) = full.parent() {
51            tokio::fs::create_dir_all(parent).await?;
52        }
53        tokio::fs::write(full, data).await?;
54        Ok(())
55    }
56
57    async fn list(&self, prefix: &str) -> AilakeResult<Vec<String>> {
58        let dir = self.full_path(prefix);
59        if !dir.exists() {
60            return Ok(vec![]);
61        }
62        let mut entries = Vec::new();
63        let mut read_dir = tokio::fs::read_dir(&dir).await?;
64        while let Some(entry) = read_dir.next_entry().await? {
65            let path = entry.path();
66            if path.is_file() {
67                let rel = path
68                    .strip_prefix(&self.root)
69                    .map_err(|e| AilakeError::Store(e.to_string()))?
70                    .to_string_lossy()
71                    .to_string();
72                entries.push(rel);
73            }
74        }
75        entries.sort();
76        Ok(entries)
77    }
78
79    async fn file_size(&self, path: &str) -> AilakeResult<u64> {
80        let meta = tokio::fs::metadata(self.full_path(path)).await?;
81        Ok(meta.len())
82    }
83
84    async fn exists(&self, path: &str) -> AilakeResult<bool> {
85        Ok(self.full_path(path).exists())
86    }
87
88    async fn delete(&self, path: &str) -> AilakeResult<()> {
89        tokio::fs::remove_file(self.full_path(path)).await?;
90        Ok(())
91    }
92}
93
94#[cfg(test)]
95mod tests {
96    use super::*;
97    use tempfile::TempDir;
98
99    #[tokio::test]
100    async fn put_get_roundtrip() {
101        let dir = TempDir::new().unwrap();
102        let store = LocalStore::new(dir.path());
103        let data = Bytes::from("hello ailake");
104        store.put("test.bin", data.clone()).await.unwrap();
105        let got = store.get("test.bin").await.unwrap();
106        assert_eq!(got, data);
107    }
108
109    #[tokio::test]
110    async fn get_range_reads_partial() {
111        let dir = TempDir::new().unwrap();
112        let store = LocalStore::new(dir.path());
113        let data = Bytes::from(b"abcdefghijklmnop".as_ref());
114        store.put("test.bin", data).await.unwrap();
115        let partial = store.get_range("test.bin", 4..8).await.unwrap();
116        assert_eq!(partial.as_ref(), b"efgh");
117    }
118
119    #[tokio::test]
120    async fn list_returns_files() {
121        let dir = TempDir::new().unwrap();
122        let store = LocalStore::new(dir.path());
123        store.put("data/a.parquet", Bytes::from("a")).await.unwrap();
124        store.put("data/b.parquet", Bytes::from("b")).await.unwrap();
125        let files = store.list("data").await.unwrap();
126        assert_eq!(files.len(), 2);
127    }
128
129    #[tokio::test]
130    async fn file_size_correct() {
131        let dir = TempDir::new().unwrap();
132        let store = LocalStore::new(dir.path());
133        store
134            .put("x.bin", Bytes::from(vec![0u8; 42]))
135            .await
136            .unwrap();
137        assert_eq!(store.file_size("x.bin").await.unwrap(), 42);
138    }
139}