1use std::ops::Range;
3use std::path::{Path, PathBuf};
4
5use ailake_core::{AilakeError, AilakeResult};
6use async_trait::async_trait;
7use bytes::Bytes;
8use tokio::io::{AsyncReadExt, AsyncSeekExt};
9
10use crate::store::Store;
11
12pub struct LocalStore {
13 root: PathBuf,
14}
15
16impl LocalStore {
17 pub fn new(root: impl AsRef<Path>) -> Self {
18 Self {
19 root: root.as_ref().to_path_buf(),
20 }
21 }
22
23 fn full_path(&self, path: &str) -> PathBuf {
24 let clean = path.strip_prefix("file://").unwrap_or(path);
28 self.root.join(clean)
29 }
30}
31
32#[async_trait]
33impl Store for LocalStore {
34 async fn get(&self, path: &str) -> AilakeResult<Bytes> {
35 let data = tokio::fs::read(self.full_path(path)).await?;
36 Ok(Bytes::from(data))
37 }
38
39 async fn get_range(&self, path: &str, range: Range<u64>) -> AilakeResult<Bytes> {
40 let mut file = tokio::fs::File::open(self.full_path(path)).await?;
41 file.seek(std::io::SeekFrom::Start(range.start)).await?;
42 let len = (range.end - range.start) as usize;
43 let mut buf = vec![0u8; len];
44 file.read_exact(&mut buf).await?;
45 Ok(Bytes::from(buf))
46 }
47
48 async fn put(&self, path: &str, data: Bytes) -> AilakeResult<()> {
49 let full = self.full_path(path);
50 if let Some(parent) = full.parent() {
51 tokio::fs::create_dir_all(parent).await?;
52 }
53 tokio::fs::write(full, data).await?;
54 Ok(())
55 }
56
57 async fn list(&self, prefix: &str) -> AilakeResult<Vec<String>> {
58 let dir = self.full_path(prefix);
59 if !dir.exists() {
60 return Ok(vec![]);
61 }
62 let mut entries = Vec::new();
63 let mut read_dir = tokio::fs::read_dir(&dir).await?;
64 while let Some(entry) = read_dir.next_entry().await? {
65 let path = entry.path();
66 if path.is_file() {
67 let rel = path
68 .strip_prefix(&self.root)
69 .map_err(|e| AilakeError::Store(e.to_string()))?
70 .to_string_lossy()
71 .to_string();
72 entries.push(rel);
73 }
74 }
75 entries.sort();
76 Ok(entries)
77 }
78
79 async fn file_size(&self, path: &str) -> AilakeResult<u64> {
80 let meta = tokio::fs::metadata(self.full_path(path)).await?;
81 Ok(meta.len())
82 }
83
84 async fn exists(&self, path: &str) -> AilakeResult<bool> {
85 Ok(self.full_path(path).exists())
86 }
87
88 async fn delete(&self, path: &str) -> AilakeResult<()> {
89 tokio::fs::remove_file(self.full_path(path)).await?;
90 Ok(())
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use tempfile::TempDir;
98
99 #[tokio::test]
100 async fn put_get_roundtrip() {
101 let dir = TempDir::new().unwrap();
102 let store = LocalStore::new(dir.path());
103 let data = Bytes::from("hello ailake");
104 store.put("test.bin", data.clone()).await.unwrap();
105 let got = store.get("test.bin").await.unwrap();
106 assert_eq!(got, data);
107 }
108
109 #[tokio::test]
110 async fn get_range_reads_partial() {
111 let dir = TempDir::new().unwrap();
112 let store = LocalStore::new(dir.path());
113 let data = Bytes::from(b"abcdefghijklmnop".as_ref());
114 store.put("test.bin", data).await.unwrap();
115 let partial = store.get_range("test.bin", 4..8).await.unwrap();
116 assert_eq!(partial.as_ref(), b"efgh");
117 }
118
119 #[tokio::test]
120 async fn list_returns_files() {
121 let dir = TempDir::new().unwrap();
122 let store = LocalStore::new(dir.path());
123 store.put("data/a.parquet", Bytes::from("a")).await.unwrap();
124 store.put("data/b.parquet", Bytes::from("b")).await.unwrap();
125 let files = store.list("data").await.unwrap();
126 assert_eq!(files.len(), 2);
127 }
128
129 #[tokio::test]
130 async fn file_size_correct() {
131 let dir = TempDir::new().unwrap();
132 let store = LocalStore::new(dir.path());
133 store
134 .put("x.bin", Bytes::from(vec![0u8; 42]))
135 .await
136 .unwrap();
137 assert_eq!(store.file_size("x.bin").await.unwrap(), 42);
138 }
139}