1use std::ops::Range;
3use std::path::{Path, PathBuf};
4
5use ailake_core::{AilakeError, AilakeResult};
6use async_trait::async_trait;
7use bytes::Bytes;
8use tokio::io::{AsyncReadExt, AsyncSeekExt};
9
10use crate::store::Store;
11
12pub struct LocalStore {
13 root: PathBuf,
14}
15
16impl LocalStore {
17 pub fn new(root: impl AsRef<Path>) -> Self {
18 let path = root.as_ref();
19 let clean = path
23 .to_str()
24 .and_then(|s| s.strip_prefix("file://"))
25 .map(PathBuf::from)
26 .unwrap_or_else(|| path.to_path_buf());
27 Self { root: clean }
28 }
29
30 fn full_path(&self, path: &str) -> PathBuf {
31 let clean = path.strip_prefix("file://").unwrap_or(path);
35 self.root.join(clean)
36 }
37}
38
39#[async_trait]
40impl Store for LocalStore {
41 async fn get(&self, path: &str) -> AilakeResult<Bytes> {
42 let data = tokio::fs::read(self.full_path(path)).await?;
43 Ok(Bytes::from(data))
44 }
45
46 async fn get_range(&self, path: &str, range: Range<u64>) -> AilakeResult<Bytes> {
47 let mut file = tokio::fs::File::open(self.full_path(path)).await?;
48 file.seek(std::io::SeekFrom::Start(range.start)).await?;
49 let len = (range.end - range.start) as usize;
50 let mut buf = vec![0u8; len];
51 file.read_exact(&mut buf).await?;
52 Ok(Bytes::from(buf))
53 }
54
55 async fn put(&self, path: &str, data: Bytes) -> AilakeResult<()> {
56 let full = self.full_path(path);
57 if let Some(parent) = full.parent() {
58 tokio::fs::create_dir_all(parent).await?;
59 }
60 tokio::fs::write(full, data).await?;
61 Ok(())
62 }
63
64 async fn list(&self, prefix: &str) -> AilakeResult<Vec<String>> {
65 let dir = self.full_path(prefix);
66 if !dir.exists() {
67 return Ok(vec![]);
68 }
69 let mut entries = Vec::new();
70 let mut read_dir = tokio::fs::read_dir(&dir).await?;
71 while let Some(entry) = read_dir.next_entry().await? {
72 let path = entry.path();
73 if path.is_file() {
74 let rel = path
75 .strip_prefix(&self.root)
76 .map_err(|e| AilakeError::Store(e.to_string()))?
77 .to_string_lossy()
78 .to_string();
79 entries.push(rel);
80 }
81 }
82 entries.sort();
83 Ok(entries)
84 }
85
86 async fn file_size(&self, path: &str) -> AilakeResult<u64> {
87 let meta = tokio::fs::metadata(self.full_path(path)).await?;
88 Ok(meta.len())
89 }
90
91 async fn exists(&self, path: &str) -> AilakeResult<bool> {
92 Ok(self.full_path(path).exists())
93 }
94
95 async fn delete(&self, path: &str) -> AilakeResult<()> {
96 tokio::fs::remove_file(self.full_path(path)).await?;
97 Ok(())
98 }
99}
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104 use tempfile::TempDir;
105
106 #[tokio::test]
107 async fn put_get_roundtrip() {
108 let dir = TempDir::new().unwrap();
109 let store = LocalStore::new(dir.path());
110 let data = Bytes::from("hello ailake");
111 store.put("test.bin", data.clone()).await.unwrap();
112 let got = store.get("test.bin").await.unwrap();
113 assert_eq!(got, data);
114 }
115
116 #[tokio::test]
117 async fn get_range_reads_partial() {
118 let dir = TempDir::new().unwrap();
119 let store = LocalStore::new(dir.path());
120 let data = Bytes::from(b"abcdefghijklmnop".as_ref());
121 store.put("test.bin", data).await.unwrap();
122 let partial = store.get_range("test.bin", 4..8).await.unwrap();
123 assert_eq!(partial.as_ref(), b"efgh");
124 }
125
126 #[tokio::test]
127 async fn list_returns_files() {
128 let dir = TempDir::new().unwrap();
129 let store = LocalStore::new(dir.path());
130 store.put("data/a.parquet", Bytes::from("a")).await.unwrap();
131 store.put("data/b.parquet", Bytes::from("b")).await.unwrap();
132 let files = store.list("data").await.unwrap();
133 assert_eq!(files.len(), 2);
134 }
135
136 #[tokio::test]
137 async fn file_size_correct() {
138 let dir = TempDir::new().unwrap();
139 let store = LocalStore::new(dir.path());
140 store
141 .put("x.bin", Bytes::from(vec![0u8; 42]))
142 .await
143 .unwrap();
144 assert_eq!(store.file_size("x.bin").await.unwrap(), 42);
145 }
146}