1use std::fs;
2use std::path::{Component, Path, PathBuf};
3use std::time::SystemTime;
4
5use async_trait::async_trait;
6use tokio::io::{AsyncWriteExt as _, copy};
7
8use crate::fs_utils::{
9 cleanup_temp_file_on_error, create_private_file, finalize_local_temp_without_overwrite,
10 write_private_file,
11};
12use crate::{Result, Storage, StorageError, StorageKey, StorageWriteIntent};
13
/// Storage backend that keeps objects as files underneath a root directory.
#[derive(Debug, Clone)]
pub struct LocalStorage {
    /// Directory that every storage key is joined onto to form an on-disk path.
    root: PathBuf,
}
19
/// Construction options accepted by `LocalStorage::from_options`.
#[derive(Debug, Clone)]
pub struct LocalStorageOptions {
    /// Root directory the storage backend should operate under.
    pub root: PathBuf,
}
25
26impl LocalStorage {
27 pub fn new(root: impl AsRef<Path>) -> Self {
29 Self {
30 root: root.as_ref().to_path_buf(),
31 }
32 }
33
34 pub fn from_options(options: LocalStorageOptions) -> Self {
36 Self::new(options.root)
37 }
38
39 fn resolve(&self, key: &StorageKey) -> (PathBuf, PathBuf) {
40 let key_path = key.as_path().to_path_buf();
41 (self.root.join(&key_path), key_path)
42 }
43
44 fn reject_symlink_prefixes(&self, key: &Path) -> Result<()> {
45 let Some(parent) = key.parent() else {
46 return Ok(());
47 };
48 let mut current = self.root.clone();
49 for component in parent.components() {
50 let Component::Normal(part) = component else {
51 return Err(invalid_storage_key());
52 };
53 current.push(part);
54 if let Ok(metadata) = fs::symlink_metadata(¤t)
55 && metadata.file_type().is_symlink()
56 {
57 return Err(StorageError::SymlinkRejected(format!(
58 "storage key resolves through a symlink: {}",
59 current.display()
60 )));
61 }
62 }
63 Ok(())
64 }
65
66 fn reject_symlink_object(&self, full: &Path) -> Result<()> {
67 if let Ok(metadata) = fs::symlink_metadata(full)
68 && metadata.file_type().is_symlink()
69 {
70 return Err(StorageError::SymlinkRejected(format!(
71 "storage key resolves to a symlink: {}",
72 full.display()
73 )));
74 }
75 Ok(())
76 }
77}
78
79#[async_trait]
80impl Storage for LocalStorage {
81 async fn put(
82 &self,
83 key: &StorageKey,
84 content: &[u8],
85 intent: StorageWriteIntent,
86 ) -> Result<StorageKey> {
87 let len = u64::try_from(content.len()).map_err(|_| StorageError::ObjectTooLarge {
88 label: intent.label(),
89 actual: u64::MAX,
90 limit: intent.max_bytes(),
91 })?;
92 intent.ensure_len(len)?;
93 let (full, key_path) = self.resolve(key);
94 put_local_bytes(self, key, &full, &key_path, content).await?;
95 Ok(key.clone())
96 }
97
98 async fn put_file(
99 &self,
100 key: &StorageKey,
101 source: &Path,
102 intent: StorageWriteIntent,
103 ) -> Result<StorageKey> {
104 let len = tokio::fs::metadata(source).await?.len();
105 intent.ensure_len(len)?;
106 let (full, key_path) = self.resolve(key);
107 self.reject_symlink_prefixes(&key_path)?;
108 let parent = full
109 .parent()
110 .ok_or_else(|| StorageError::Internal("storage key has no parent".to_string()))?;
111 tokio::fs::create_dir_all(parent).await?;
112 self.reject_symlink_prefixes(&key_path)?;
113 self.reject_symlink_object(&full)?;
114 if tokio::fs::try_exists(&full).await? {
115 return Err(StorageError::ObjectConflict(key.to_string()));
116 }
117 let temporary_full = parent.join(format!(".agentics-write-{}", uuid::Uuid::new_v4()));
118 let copy_result = async {
119 let copied = tokio::fs::copy(source, &temporary_full).await?;
120 if copied != len {
121 return Err(StorageError::Internal(format!(
122 "local source file length changed while storing {key}: expected {len}, copied {copied}"
123 )));
124 }
125 self.reject_symlink_prefixes(&key_path)?;
126 self.reject_symlink_object(&full)?;
127 finalize_local_temp_without_overwrite(&temporary_full, &full, key.as_str()).await?;
128 Ok::<(), StorageError>(())
129 }
130 .await;
131 cleanup_temp_file_on_error(copy_result, &temporary_full).await?;
132 Ok(key.clone())
133 }
134
135 async fn promote(
136 &self,
137 temporary_key: &StorageKey,
138 durable_key: &StorageKey,
139 ) -> Result<StorageKey> {
140 let (temporary_full, temporary_key_path) = self.resolve(temporary_key);
141 let (durable_full, durable_key_path) = self.resolve(durable_key);
142 self.reject_symlink_prefixes(&temporary_key_path)?;
143 self.reject_symlink_object(&temporary_full)?;
144 self.reject_symlink_prefixes(&durable_key_path)?;
145 if let Some(parent) = durable_full.parent() {
146 tokio::fs::create_dir_all(parent).await?;
147 }
148 self.reject_symlink_prefixes(&durable_key_path)?;
149 self.reject_symlink_object(&durable_full)?;
150
151 if !tokio::fs::try_exists(&temporary_full).await? {
152 return Err(StorageError::ObjectNotFound(temporary_key.to_string()));
153 }
154 if tokio::fs::try_exists(&durable_full).await? {
155 return Err(StorageError::ObjectConflict(durable_key.to_string()));
156 }
157 match tokio::fs::hard_link(&temporary_full, &durable_full).await {
158 Ok(()) => {}
159 Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
160 return Err(StorageError::ObjectConflict(durable_key.to_string()));
161 }
162 Err(error) => return Err(error.into()),
163 }
164 if let Err(error) = tokio::fs::remove_file(&temporary_full).await {
165 let cleanup_result = tokio::fs::remove_file(&durable_full).await;
166 if let Err(cleanup_error) = cleanup_result
167 && cleanup_error.kind() != std::io::ErrorKind::NotFound
168 {
169 return Err(cleanup_error.into());
170 }
171 return Err(error.into());
172 }
173 Ok(durable_key.clone())
174 }
175
176 async fn get(&self, key: &StorageKey, intent: StorageWriteIntent) -> Result<Vec<u8>> {
177 let (full, key_path) = self.resolve(key);
178 self.reject_symlink_prefixes(&key_path)?;
179 self.reject_symlink_object(&full)?;
180 let metadata = match tokio::fs::metadata(&full).await {
181 Ok(metadata) => metadata,
182 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
183 return Err(StorageError::ObjectNotFound(key.to_string()));
184 }
185 Err(error) => return Err(error.into()),
186 };
187 intent.ensure_len(metadata.len())?;
188 let bytes = tokio::fs::read(&full).await?;
189 let actual = u64::try_from(bytes.len()).map_err(|_| StorageError::ObjectTooLarge {
190 label: intent.label(),
191 actual: u64::MAX,
192 limit: intent.max_bytes(),
193 })?;
194 intent.ensure_len(actual)?;
195 if actual != metadata.len() {
196 return Err(StorageError::Internal(format!(
197 "local object length changed while reading {key}: expected {}, read {actual}",
198 metadata.len()
199 )));
200 }
201 Ok(bytes)
202 }
203
204 async fn get_to_file(
205 &self,
206 key: &StorageKey,
207 destination: &Path,
208 intent: StorageWriteIntent,
209 ) -> Result<()> {
210 let (full, key_path) = self.resolve(key);
211 self.reject_symlink_prefixes(&key_path)?;
212 self.reject_symlink_object(&full)?;
213 let metadata = match tokio::fs::metadata(&full).await {
214 Ok(metadata) => metadata,
215 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
216 return Err(StorageError::ObjectNotFound(key.to_string()));
217 }
218 Err(error) => return Err(error.into()),
219 };
220 intent.ensure_len(metadata.len())?;
221 if let Some(parent) = destination.parent() {
222 tokio::fs::create_dir_all(parent).await?;
223 }
224 let temporary =
225 destination.with_extension(format!("agentics-download-{}", uuid::Uuid::new_v4()));
226 let write_result = async {
227 if tokio::fs::try_exists(destination).await? {
228 return Err(StorageError::ObjectConflict(
229 destination.display().to_string(),
230 ));
231 }
232 let mut source = tokio::fs::File::open(&full).await?;
233 let mut file = create_private_file(&temporary).await?;
234 let copied = copy(&mut source, &mut file).await?;
235 if copied != metadata.len() {
236 return Err(StorageError::Internal(format!(
237 "local object length changed while downloading {key}: expected {}, copied {copied}",
238 metadata.len()
239 )));
240 }
241 file.flush().await?;
242 drop(file);
243 finalize_local_temp_without_overwrite(
244 &temporary,
245 destination,
246 &destination.display().to_string(),
247 )
248 .await?;
249 Ok::<(), StorageError>(())
250 }
251 .await;
252 cleanup_temp_file_on_error(write_result, &temporary).await
253 }
254
255 async fn exists(&self, key: &StorageKey) -> Result<bool> {
256 let (full, key_path) = self.resolve(key);
257 self.reject_symlink_prefixes(&key_path)?;
258 if let Ok(metadata) = tokio::fs::symlink_metadata(&full).await {
259 return Ok(!metadata.file_type().is_symlink());
260 }
261 Ok(false)
262 }
263
264 async fn delete(&self, key: &StorageKey) -> Result<()> {
265 let (full, key_path) = self.resolve(key);
266 self.reject_symlink_prefixes(&key_path)?;
267 self.reject_symlink_object(&full)?;
268 match tokio::fs::remove_file(&full).await {
269 Ok(()) => {}
270 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
271 Err(e) => return Err(e.into()),
272 }
273 Ok(())
274 }
275
276 async fn list_prefix(&self, prefix: &StorageKey) -> Result<Vec<StorageKey>> {
277 let root = self.root.clone();
278 let prefix = prefix.as_str().to_string();
279 tokio::task::spawn_blocking(move || list_local_prefix(&root, &prefix))
280 .await
281 .map_err(|e| StorageError::Internal(e.to_string()))?
282 }
283
284 async fn delete_prefix_older_than(
285 &self,
286 prefix: &StorageKey,
287 older_than: SystemTime,
288 ) -> Result<u64> {
289 let root = self.root.clone();
290 let prefix = prefix.as_str().to_string();
291 tokio::task::spawn_blocking(move || {
292 delete_local_prefix_older_than(&root, &prefix, older_than)
293 })
294 .await
295 .map_err(|e| StorageError::Internal(e.to_string()))?
296 }
297}
298
299async fn put_local_bytes(
300 storage: &LocalStorage,
301 key: &StorageKey,
302 full: &Path,
303 key_path: &Path,
304 content: &[u8],
305) -> Result<()> {
306 storage.reject_symlink_prefixes(key_path)?;
307 let parent = full
308 .parent()
309 .ok_or_else(|| StorageError::Internal("storage key has no parent".to_string()))?;
310 tokio::fs::create_dir_all(parent).await?;
311 storage.reject_symlink_prefixes(key_path)?;
312 storage.reject_symlink_object(full)?;
313 if tokio::fs::try_exists(full).await? {
314 return Err(StorageError::ObjectConflict(key.to_string()));
315 }
316 let temporary_full = parent.join(format!(".agentics-write-{}", uuid::Uuid::new_v4()));
317 let write_result = async {
318 write_private_file(&temporary_full, content).await?;
319 storage.reject_symlink_prefixes(key_path)?;
320 storage.reject_symlink_object(full)?;
321 finalize_local_temp_without_overwrite(&temporary_full, full, key.as_str()).await?;
322 Ok::<(), StorageError>(())
323 }
324 .await;
325 cleanup_temp_file_on_error(write_result, &temporary_full).await
326}
327
328fn list_local_prefix(root: &Path, prefix: &str) -> Result<Vec<StorageKey>> {
329 let start = root.join(prefix);
330 if !start.exists() {
331 return Ok(Vec::new());
332 }
333 let mut keys = Vec::new();
334 let mut stack = vec![start];
335 while let Some(path) = stack.pop() {
336 let metadata = fs::symlink_metadata(&path)?;
337 if metadata.file_type().is_symlink() {
338 continue;
339 }
340 if metadata.is_dir() {
341 for entry in fs::read_dir(&path)? {
342 stack.push(entry?.path());
343 }
344 } else if metadata.is_file() {
345 let relative = path
346 .strip_prefix(root)
347 .map_err(|e| StorageError::Internal(e.to_string()))?;
348 let key = relative
349 .to_str()
350 .ok_or_else(|| StorageError::InvalidKey("storage key is not UTF-8".to_string()))?
351 .replace('\\', "/");
352 keys.push(
353 StorageKey::try_new(&key).map_err(|e| StorageError::InvalidKey(e.to_string()))?,
354 );
355 }
356 }
357 keys.sort();
358 Ok(keys)
359}
360
361fn delete_local_prefix_older_than(
362 root: &Path,
363 prefix: &str,
364 older_than: SystemTime,
365) -> Result<u64> {
366 let keys = list_local_prefix(root, prefix)?;
367 let mut deleted = 0u64;
368 for key in keys {
369 let path = root.join(key.as_path());
370 let metadata = match fs::symlink_metadata(&path) {
371 Ok(metadata) => metadata,
372 Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue,
373 Err(error) => return Err(error.into()),
374 };
375 if metadata.file_type().is_symlink() || !metadata.is_file() {
376 continue;
377 }
378 let modified = metadata.modified()?;
379 if modified < older_than {
380 fs::remove_file(&path)?;
381 deleted = deleted.checked_add(1).ok_or_else(|| {
382 StorageError::Internal("deleted object count overflow".to_string())
383 })?;
384 }
385 }
386 Ok(deleted)
387}
388
389fn invalid_storage_key() -> StorageError {
390 StorageError::InvalidKey(
391 "storage key must be a non-empty relative path with safe ASCII components and no `.` or `..` components".to_string(),
392 )
393}