Skip to main content

agentics_storage/
local.rs

1use std::fs;
2use std::path::{Component, Path, PathBuf};
3use std::time::SystemTime;
4
5use async_trait::async_trait;
6use tokio::io::{AsyncWriteExt as _, copy};
7
8use crate::fs_utils::{
9    cleanup_temp_file_on_error, create_private_file, finalize_local_temp_without_overwrite,
10    write_private_file,
11};
12use crate::{Result, Storage, StorageError, StorageKey, StorageWriteIntent};
13
/// Filesystem-backed storage rooted at a configured directory.
///
/// Every storage key is resolved by joining its path onto `root`. The handle
/// only holds the root path, so cloning it is just a `PathBuf` clone.
#[derive(Debug, Clone)]
pub struct LocalStorage {
    /// Directory that storage keys are joined onto to form object paths.
    root: PathBuf,
}
19
/// Local filesystem durable storage settings.
///
/// Consumed by [`LocalStorage::from_options`].
#[derive(Debug, Clone)]
pub struct LocalStorageOptions {
    /// Directory that becomes the root of the resulting [`LocalStorage`].
    pub root: PathBuf,
}
25
26impl LocalStorage {
27    /// Create local storage rooted at `root`.
28    pub fn new(root: impl AsRef<Path>) -> Self {
29        Self {
30            root: root.as_ref().to_path_buf(),
31        }
32    }
33
34    /// Create local storage from explicit options.
35    pub fn from_options(options: LocalStorageOptions) -> Self {
36        Self::new(options.root)
37    }
38
39    fn resolve(&self, key: &StorageKey) -> (PathBuf, PathBuf) {
40        let key_path = key.as_path().to_path_buf();
41        (self.root.join(&key_path), key_path)
42    }
43
44    fn reject_symlink_prefixes(&self, key: &Path) -> Result<()> {
45        let Some(parent) = key.parent() else {
46            return Ok(());
47        };
48        let mut current = self.root.clone();
49        for component in parent.components() {
50            let Component::Normal(part) = component else {
51                return Err(invalid_storage_key());
52            };
53            current.push(part);
54            if let Ok(metadata) = fs::symlink_metadata(&current)
55                && metadata.file_type().is_symlink()
56            {
57                return Err(StorageError::SymlinkRejected(format!(
58                    "storage key resolves through a symlink: {}",
59                    current.display()
60                )));
61            }
62        }
63        Ok(())
64    }
65
66    fn reject_symlink_object(&self, full: &Path) -> Result<()> {
67        if let Ok(metadata) = fs::symlink_metadata(full)
68            && metadata.file_type().is_symlink()
69        {
70            return Err(StorageError::SymlinkRejected(format!(
71                "storage key resolves to a symlink: {}",
72                full.display()
73            )));
74        }
75        Ok(())
76    }
77}
78
79#[async_trait]
80impl Storage for LocalStorage {
81    async fn put(
82        &self,
83        key: &StorageKey,
84        content: &[u8],
85        intent: StorageWriteIntent,
86    ) -> Result<StorageKey> {
87        let len = u64::try_from(content.len()).map_err(|_| StorageError::ObjectTooLarge {
88            label: intent.label(),
89            actual: u64::MAX,
90            limit: intent.max_bytes(),
91        })?;
92        intent.ensure_len(len)?;
93        let (full, key_path) = self.resolve(key);
94        put_local_bytes(self, key, &full, &key_path, content).await?;
95        Ok(key.clone())
96    }
97
98    async fn put_file(
99        &self,
100        key: &StorageKey,
101        source: &Path,
102        intent: StorageWriteIntent,
103    ) -> Result<StorageKey> {
104        let len = tokio::fs::metadata(source).await?.len();
105        intent.ensure_len(len)?;
106        let (full, key_path) = self.resolve(key);
107        self.reject_symlink_prefixes(&key_path)?;
108        let parent = full
109            .parent()
110            .ok_or_else(|| StorageError::Internal("storage key has no parent".to_string()))?;
111        tokio::fs::create_dir_all(parent).await?;
112        self.reject_symlink_prefixes(&key_path)?;
113        self.reject_symlink_object(&full)?;
114        if tokio::fs::try_exists(&full).await? {
115            return Err(StorageError::ObjectConflict(key.to_string()));
116        }
117        let temporary_full = parent.join(format!(".agentics-write-{}", uuid::Uuid::new_v4()));
118        let copy_result = async {
119            let copied = tokio::fs::copy(source, &temporary_full).await?;
120            if copied != len {
121                return Err(StorageError::Internal(format!(
122                    "local source file length changed while storing {key}: expected {len}, copied {copied}"
123                )));
124            }
125            self.reject_symlink_prefixes(&key_path)?;
126            self.reject_symlink_object(&full)?;
127            finalize_local_temp_without_overwrite(&temporary_full, &full, key.as_str()).await?;
128            Ok::<(), StorageError>(())
129        }
130        .await;
131        cleanup_temp_file_on_error(copy_result, &temporary_full).await?;
132        Ok(key.clone())
133    }
134
    /// Move the object stored at `temporary_key` to `durable_key` without
    /// overwriting an existing durable object.
    ///
    /// The move is a hard link from the temporary path to the durable path
    /// followed by removal of the temporary path. If that removal fails, the
    /// freshly created durable link is removed again before the error is
    /// returned.
    ///
    /// # Errors
    ///
    /// * `StorageError::ObjectNotFound` if nothing exists at `temporary_key`.
    /// * `StorageError::ObjectConflict` if something already exists at
    ///   `durable_key`, whether seen by the up-front check or reported by the
    ///   link call as `AlreadyExists`.
    /// * `StorageError::SymlinkRejected` / `StorageError::InvalidKey` from the
    ///   symlink checks on either key.
    /// * Any other I/O error from directory creation, linking or removal.
    async fn promote(
        &self,
        temporary_key: &StorageKey,
        durable_key: &StorageKey,
    ) -> Result<StorageKey> {
        let (temporary_full, temporary_key_path) = self.resolve(temporary_key);
        let (durable_full, durable_key_path) = self.resolve(durable_key);
        self.reject_symlink_prefixes(&temporary_key_path)?;
        self.reject_symlink_object(&temporary_full)?;
        self.reject_symlink_prefixes(&durable_key_path)?;
        if let Some(parent) = durable_full.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        // Checked again after the directories were created, and the durable
        // object path itself is checked as well.
        self.reject_symlink_prefixes(&durable_key_path)?;
        self.reject_symlink_object(&durable_full)?;

        if !tokio::fs::try_exists(&temporary_full).await? {
            return Err(StorageError::ObjectNotFound(temporary_key.to_string()));
        }
        if tokio::fs::try_exists(&durable_full).await? {
            return Err(StorageError::ObjectConflict(durable_key.to_string()));
        }
        // An `AlreadyExists` failure from the link call is reported as the
        // same conflict as the explicit existence check above.
        match tokio::fs::hard_link(&temporary_full, &durable_full).await {
            Ok(()) => {}
            Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
                return Err(StorageError::ObjectConflict(durable_key.to_string()));
            }
            Err(error) => return Err(error.into()),
        }
        if let Err(error) = tokio::fs::remove_file(&temporary_full).await {
            // Roll back: drop the durable link so the object is not left
            // reachable under both keys. A rollback failure other than
            // NotFound takes precedence over the original removal error.
            let cleanup_result = tokio::fs::remove_file(&durable_full).await;
            if let Err(cleanup_error) = cleanup_result
                && cleanup_error.kind() != std::io::ErrorKind::NotFound
            {
                return Err(cleanup_error.into());
            }
            return Err(error.into());
        }
        Ok(durable_key.clone())
    }
175
176    async fn get(&self, key: &StorageKey, intent: StorageWriteIntent) -> Result<Vec<u8>> {
177        let (full, key_path) = self.resolve(key);
178        self.reject_symlink_prefixes(&key_path)?;
179        self.reject_symlink_object(&full)?;
180        let metadata = match tokio::fs::metadata(&full).await {
181            Ok(metadata) => metadata,
182            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
183                return Err(StorageError::ObjectNotFound(key.to_string()));
184            }
185            Err(error) => return Err(error.into()),
186        };
187        intent.ensure_len(metadata.len())?;
188        let bytes = tokio::fs::read(&full).await?;
189        let actual = u64::try_from(bytes.len()).map_err(|_| StorageError::ObjectTooLarge {
190            label: intent.label(),
191            actual: u64::MAX,
192            limit: intent.max_bytes(),
193        })?;
194        intent.ensure_len(actual)?;
195        if actual != metadata.len() {
196            return Err(StorageError::Internal(format!(
197                "local object length changed while reading {key}: expected {}, read {actual}",
198                metadata.len()
199            )));
200        }
201        Ok(bytes)
202    }
203
204    async fn get_to_file(
205        &self,
206        key: &StorageKey,
207        destination: &Path,
208        intent: StorageWriteIntent,
209    ) -> Result<()> {
210        let (full, key_path) = self.resolve(key);
211        self.reject_symlink_prefixes(&key_path)?;
212        self.reject_symlink_object(&full)?;
213        let metadata = match tokio::fs::metadata(&full).await {
214            Ok(metadata) => metadata,
215            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
216                return Err(StorageError::ObjectNotFound(key.to_string()));
217            }
218            Err(error) => return Err(error.into()),
219        };
220        intent.ensure_len(metadata.len())?;
221        if let Some(parent) = destination.parent() {
222            tokio::fs::create_dir_all(parent).await?;
223        }
224        let temporary =
225            destination.with_extension(format!("agentics-download-{}", uuid::Uuid::new_v4()));
226        let write_result = async {
227            if tokio::fs::try_exists(destination).await? {
228                return Err(StorageError::ObjectConflict(
229                    destination.display().to_string(),
230                ));
231            }
232            let mut source = tokio::fs::File::open(&full).await?;
233            let mut file = create_private_file(&temporary).await?;
234            let copied = copy(&mut source, &mut file).await?;
235            if copied != metadata.len() {
236                return Err(StorageError::Internal(format!(
237                    "local object length changed while downloading {key}: expected {}, copied {copied}",
238                    metadata.len()
239                )));
240            }
241            file.flush().await?;
242            drop(file);
243            finalize_local_temp_without_overwrite(
244                &temporary,
245                destination,
246                &destination.display().to_string(),
247            )
248            .await?;
249            Ok::<(), StorageError>(())
250        }
251        .await;
252        cleanup_temp_file_on_error(write_result, &temporary).await
253    }
254
255    async fn exists(&self, key: &StorageKey) -> Result<bool> {
256        let (full, key_path) = self.resolve(key);
257        self.reject_symlink_prefixes(&key_path)?;
258        if let Ok(metadata) = tokio::fs::symlink_metadata(&full).await {
259            return Ok(!metadata.file_type().is_symlink());
260        }
261        Ok(false)
262    }
263
264    async fn delete(&self, key: &StorageKey) -> Result<()> {
265        let (full, key_path) = self.resolve(key);
266        self.reject_symlink_prefixes(&key_path)?;
267        self.reject_symlink_object(&full)?;
268        match tokio::fs::remove_file(&full).await {
269            Ok(()) => {}
270            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
271            Err(e) => return Err(e.into()),
272        }
273        Ok(())
274    }
275
276    async fn list_prefix(&self, prefix: &StorageKey) -> Result<Vec<StorageKey>> {
277        let root = self.root.clone();
278        let prefix = prefix.as_str().to_string();
279        tokio::task::spawn_blocking(move || list_local_prefix(&root, &prefix))
280            .await
281            .map_err(|e| StorageError::Internal(e.to_string()))?
282    }
283
284    async fn delete_prefix_older_than(
285        &self,
286        prefix: &StorageKey,
287        older_than: SystemTime,
288    ) -> Result<u64> {
289        let root = self.root.clone();
290        let prefix = prefix.as_str().to_string();
291        tokio::task::spawn_blocking(move || {
292            delete_local_prefix_older_than(&root, &prefix, older_than)
293        })
294        .await
295        .map_err(|e| StorageError::Internal(e.to_string()))?
296    }
297}
298
299async fn put_local_bytes(
300    storage: &LocalStorage,
301    key: &StorageKey,
302    full: &Path,
303    key_path: &Path,
304    content: &[u8],
305) -> Result<()> {
306    storage.reject_symlink_prefixes(key_path)?;
307    let parent = full
308        .parent()
309        .ok_or_else(|| StorageError::Internal("storage key has no parent".to_string()))?;
310    tokio::fs::create_dir_all(parent).await?;
311    storage.reject_symlink_prefixes(key_path)?;
312    storage.reject_symlink_object(full)?;
313    if tokio::fs::try_exists(full).await? {
314        return Err(StorageError::ObjectConflict(key.to_string()));
315    }
316    let temporary_full = parent.join(format!(".agentics-write-{}", uuid::Uuid::new_v4()));
317    let write_result = async {
318        write_private_file(&temporary_full, content).await?;
319        storage.reject_symlink_prefixes(key_path)?;
320        storage.reject_symlink_object(full)?;
321        finalize_local_temp_without_overwrite(&temporary_full, full, key.as_str()).await?;
322        Ok::<(), StorageError>(())
323    }
324    .await;
325    cleanup_temp_file_on_error(write_result, &temporary_full).await
326}
327
328fn list_local_prefix(root: &Path, prefix: &str) -> Result<Vec<StorageKey>> {
329    let start = root.join(prefix);
330    if !start.exists() {
331        return Ok(Vec::new());
332    }
333    let mut keys = Vec::new();
334    let mut stack = vec![start];
335    while let Some(path) = stack.pop() {
336        let metadata = fs::symlink_metadata(&path)?;
337        if metadata.file_type().is_symlink() {
338            continue;
339        }
340        if metadata.is_dir() {
341            for entry in fs::read_dir(&path)? {
342                stack.push(entry?.path());
343            }
344        } else if metadata.is_file() {
345            let relative = path
346                .strip_prefix(root)
347                .map_err(|e| StorageError::Internal(e.to_string()))?;
348            let key = relative
349                .to_str()
350                .ok_or_else(|| StorageError::InvalidKey("storage key is not UTF-8".to_string()))?
351                .replace('\\', "/");
352            keys.push(
353                StorageKey::try_new(&key).map_err(|e| StorageError::InvalidKey(e.to_string()))?,
354            );
355        }
356    }
357    keys.sort();
358    Ok(keys)
359}
360
361fn delete_local_prefix_older_than(
362    root: &Path,
363    prefix: &str,
364    older_than: SystemTime,
365) -> Result<u64> {
366    let keys = list_local_prefix(root, prefix)?;
367    let mut deleted = 0u64;
368    for key in keys {
369        let path = root.join(key.as_path());
370        let metadata = match fs::symlink_metadata(&path) {
371            Ok(metadata) => metadata,
372            Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue,
373            Err(error) => return Err(error.into()),
374        };
375        if metadata.file_type().is_symlink() || !metadata.is_file() {
376            continue;
377        }
378        let modified = metadata.modified()?;
379        if modified < older_than {
380            fs::remove_file(&path)?;
381            deleted = deleted.checked_add(1).ok_or_else(|| {
382                StorageError::Internal("deleted object count overflow".to_string())
383            })?;
384        }
385    }
386    Ok(deleted)
387}
388
389fn invalid_storage_key() -> StorageError {
390    StorageError::InvalidKey(
391        "storage key must be a non-empty relative path with safe ASCII components and no `.` or `..` components".to_string(),
392    )
393}