Skip to main content

floe_core/io/storage/providers/
local.rs

1use std::path::{Path, PathBuf};
2
3use glob::glob;
4
5use crate::errors::{RunError, StorageError};
6use crate::{config, ConfigError, FloeResult};
7
8use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
9
/// Storage client that serves objects from the local filesystem.
///
/// The client carries no state, so it is free to copy and every instance is
/// interchangeable with any other.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalClient;

impl LocalClient {
    /// Creates a local filesystem client.
    pub fn new() -> Self {
        Self
    }
}
23
24impl StorageClient for LocalClient {
25    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
26        let path = Path::new(prefix);
27        if path.is_file() {
28            let uri = self.resolve_uri(prefix)?;
29            return Ok(vec![ObjectRef {
30                uri,
31                key: prefix.to_string(),
32                last_modified: None,
33                size: None,
34            }]);
35        }
36        if !path.exists() {
37            return Ok(Vec::new());
38        }
39        let mut refs = Vec::new();
40        for entry in std::fs::read_dir(path)? {
41            let entry = entry?;
42            let path = entry.path();
43            if path.is_file() {
44                let key = path.display().to_string();
45                let uri = self.resolve_uri(&key)?;
46                refs.push(ObjectRef {
47                    uri,
48                    key,
49                    last_modified: None,
50                    size: None,
51                });
52            }
53        }
54        refs = planner::stable_sort_refs(refs);
55        Ok(refs)
56    }
57
58    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
59        let src = PathBuf::from(uri.trim_start_matches("local://"));
60        let dest = temp_dir.join(
61            src.file_name()
62                .and_then(|name| name.to_str())
63                .unwrap_or("object"),
64        );
65        planner::ensure_parent_dir(&dest)?;
66        std::fs::copy(&src, &dest).map_err(|err| {
67            Box::new(StorageError(format!(
68                "local download failed from {}: {err}",
69                src.display()
70            ))) as Box<dyn std::error::Error + Send + Sync>
71        })?;
72        Ok(dest)
73    }
74
75    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
76        let dest = PathBuf::from(uri.trim_start_matches("local://"));
77        planner::ensure_parent_dir(&dest)?;
78        std::fs::copy(local_path, &dest).map_err(|err| {
79            Box::new(StorageError(format!(
80                "local upload failed to {}: {err}",
81                dest.display()
82            ))) as Box<dyn std::error::Error + Send + Sync>
83        })?;
84        Ok(())
85    }
86
87    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
88        let path = Path::new(path);
89        let normalized = if path.is_absolute() {
90            crate::io::storage::paths::normalize_local_path(path)
91        } else {
92            let abs = std::env::current_dir()?.join(path);
93            crate::io::storage::paths::normalize_local_path(&abs)
94        };
95        Ok(format!("local://{}", normalized.display()))
96    }
97
98    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
99        let src = Path::new(src_uri.trim_start_matches("local://"));
100        let dst = Path::new(dst_uri.trim_start_matches("local://"));
101        planner::ensure_parent_dir(dst)?;
102        std::fs::copy(src, dst).map_err(|err| {
103            Box::new(StorageError(format!(
104                "local copy failed from {} to {}: {err}",
105                src.display(),
106                dst.display()
107            ))) as Box<dyn std::error::Error + Send + Sync>
108        })?;
109        Ok(())
110    }
111
112    fn delete_object(&self, uri: &str) -> FloeResult<()> {
113        let path = Path::new(uri.trim_start_matches("local://"));
114        if path.exists() {
115            std::fs::remove_file(path).map_err(|err| {
116                Box::new(StorageError(format!(
117                    "local delete failed for {}: {err}",
118                    path.display()
119                ))) as Box<dyn std::error::Error + Send + Sync>
120            })?;
121        }
122        Ok(())
123    }
124
125    fn exists(&self, uri: &str) -> FloeResult<bool> {
126        let path = Path::new(uri.trim_start_matches("local://"));
127        Ok(path.exists())
128    }
129
130    fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
131        let path = Path::new(uri.trim_start_matches("local://"));
132        if !path.exists() {
133            return Ok(None);
134        }
135        let _lock = FileLock::acquire(path)?;
136        if !path.exists() {
137            return Ok(None);
138        }
139        Ok(Some(StoredObject {
140            body: std::fs::read(path)?,
141            version: local_version(path)?,
142        }))
143    }
144
145    fn write_object_conditional(
146        &self,
147        uri: &str,
148        expected_version: Option<&str>,
149        body: &[u8],
150    ) -> FloeResult<ConditionalWrite> {
151        let path = PathBuf::from(uri.trim_start_matches("local://"));
152        planner::ensure_parent_dir(&path)?;
153        let _lock = FileLock::acquire(&path)?;
154        let current = if path.exists() {
155            Some(local_version(&path)?)
156        } else {
157            None
158        };
159        if current.as_deref() != expected_version {
160            return Ok(ConditionalWrite::Conflict);
161        }
162        std::fs::write(&path, body)?;
163        Ok(ConditionalWrite::Written {
164            version: local_version(&path)?,
165        })
166    }
167
168    fn delete_object_conditional(
169        &self,
170        uri: &str,
171        expected_version: Option<&str>,
172    ) -> FloeResult<ConditionalWrite> {
173        let path = PathBuf::from(uri.trim_start_matches("local://"));
174        planner::ensure_parent_dir(&path)?;
175        let _lock = FileLock::acquire(&path)?;
176        let current = if path.exists() {
177            Some(local_version(&path)?)
178        } else {
179            None
180        };
181        if current.as_deref() != expected_version {
182            return Ok(ConditionalWrite::Conflict);
183        }
184        if path.exists() {
185            std::fs::remove_file(&path)?;
186        }
187        Ok(ConditionalWrite::Written {
188            version: "deleted".to_string(),
189        })
190    }
191}
192
193struct FileLock {
194    path: PathBuf,
195}
196
197impl FileLock {
198    fn acquire(base: &Path) -> FloeResult<Self> {
199        let lock_path = PathBuf::from(format!("{}.lock", base.display()));
200        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
201        loop {
202            match std::fs::OpenOptions::new()
203                .write(true)
204                .create_new(true)
205                .open(&lock_path)
206            {
207                Ok(_) => return Ok(Self { path: lock_path }),
208                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
209                    if std::time::Instant::now() >= deadline {
210                        return Err(format!(
211                            "timed out acquiring local state lock {}",
212                            lock_path.display()
213                        )
214                        .into());
215                    }
216                    std::thread::sleep(std::time::Duration::from_millis(10));
217                }
218                Err(e) => return Err(Box::new(e)),
219            }
220        }
221    }
222}
223
224impl Drop for FileLock {
225    fn drop(&mut self) {
226        let _ = std::fs::remove_file(&self.path);
227    }
228}
229
230fn local_version(path: &Path) -> FloeResult<String> {
231    let metadata = std::fs::metadata(path)?;
232    let modified = metadata
233        .modified()?
234        .duration_since(std::time::UNIX_EPOCH)
235        .unwrap_or_default()
236        .as_nanos();
237    Ok(format!("{}:{modified}", metadata.len()))
238}
239
/// How a set of local input files was obtained.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LocalInputMode {
    /// The source path named one existing file.
    File,
    /// The files were found by expanding a glob pattern or scanning a directory.
    Directory,
}
245
246#[derive(Debug, Clone)]
247pub struct ResolvedLocalInputs {
248    pub files: Vec<PathBuf>,
249    pub mode: LocalInputMode,
250}
251
252pub fn resolve_local_inputs(
253    config_dir: &Path,
254    entity_name: &str,
255    source: &config::SourceConfig,
256    storage: &str,
257    default_globs: &[String],
258) -> FloeResult<ResolvedLocalInputs> {
259    let default_options = config::SourceOptions::default();
260    let options = source.options.as_ref().unwrap_or(&default_options);
261    let recursive = options.recursive.unwrap_or(false);
262    let glob_override = options.glob.as_deref();
263    let raw_path = source.path.as_str();
264
265    if is_glob_pattern(raw_path) {
266        let pattern_path = resolve_glob_pattern(config_dir, raw_path);
267        let pattern = pattern_path.to_string_lossy().to_string();
268        let files = collect_glob_files(&pattern)?;
269        if files.is_empty() {
270            let (base_path, glob_used) = split_glob_details(&pattern_path, raw_path);
271            return Err(Box::new(RunError(no_match_message(
272                entity_name,
273                storage,
274                &base_path,
275                &glob_used,
276                recursive,
277            ))));
278        }
279        return Ok(ResolvedLocalInputs {
280            files,
281            mode: LocalInputMode::Directory,
282        });
283    }
284
285    let base_path = config::resolve_local_path(config_dir, raw_path);
286    if base_path.is_file() {
287        return Ok(ResolvedLocalInputs {
288            files: vec![base_path],
289            mode: LocalInputMode::File,
290        });
291    }
292
293    let glob_used = if let Some(glob_override) = glob_override {
294        vec![glob_override.to_string()]
295    } else {
296        default_globs.to_vec()
297    };
298    if !base_path.is_dir() {
299        return Err(Box::new(RunError(no_match_message(
300            entity_name,
301            storage,
302            &base_path.display().to_string(),
303            &glob_used.join(","),
304            recursive,
305        ))));
306    }
307
308    let pattern_paths = if recursive {
309        glob_used
310            .iter()
311            .map(|glob| base_path.join("**").join(glob))
312            .collect::<Vec<_>>()
313    } else {
314        glob_used
315            .iter()
316            .map(|glob| base_path.join(glob))
317            .collect::<Vec<_>>()
318    };
319    let files = collect_glob_files_multi(&pattern_paths)?;
320    if files.is_empty() {
321        return Err(Box::new(RunError(no_match_message(
322            entity_name,
323            storage,
324            &base_path.display().to_string(),
325            &glob_used.join(","),
326            recursive,
327        ))));
328    }
329
330    Ok(ResolvedLocalInputs {
331        files,
332        mode: LocalInputMode::Directory,
333    })
334}
335
/// Reports whether `value` contains a glob metacharacter (`*`, `?` or `[`).
fn is_glob_pattern(value: &str) -> bool {
    value.chars().any(|ch| matches!(ch, '*' | '?' | '['))
}
339
/// Anchors the glob pattern `raw_path` for expansion.
///
/// Absolute patterns are returned unchanged; relative patterns are joined
/// onto `config_dir`.
fn resolve_glob_pattern(config_dir: &Path, raw_path: &str) -> PathBuf {
    let candidate = Path::new(raw_path);
    if !candidate.is_absolute() {
        return config_dir.join(raw_path);
    }
    candidate.to_path_buf()
}
348
/// Splits a resolved glob pattern into `(base directory, glob)` strings.
///
/// The base is the pattern's parent, or the pattern itself when it has no
/// parent. The glob is the pattern's final component, or `raw_pattern` when
/// there is no final component.
fn split_glob_details(pattern_path: &Path, raw_pattern: &str) -> (String, String) {
    let parent_dir = match pattern_path.parent() {
        Some(parent) => parent,
        None => pattern_path,
    };
    let final_component = match pattern_path.file_name() {
        Some(name) => name.to_string_lossy().to_string(),
        None => raw_pattern.to_string(),
    };
    (parent_dir.display().to_string(), final_component)
}
361
362fn collect_glob_files(pattern: &str) -> FloeResult<Vec<PathBuf>> {
363    let mut files = Vec::new();
364    let entries = glob(pattern).map_err(|err| {
365        Box::new(ConfigError(format!(
366            "invalid glob pattern {pattern:?}: {err}"
367        ))) as Box<dyn std::error::Error + Send + Sync>
368    })?;
369    for entry in entries {
370        let path = entry.map_err(|err| {
371            Box::new(ConfigError(format!(
372                "glob match failed for {pattern:?}: {err}"
373            ))) as Box<dyn std::error::Error + Send + Sync>
374        })?;
375        if path.is_file() {
376            files.push(crate::io::storage::paths::normalize_local_path(&path));
377        }
378    }
379    files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
380    Ok(files)
381}
382
383fn collect_glob_files_multi(patterns: &[PathBuf]) -> FloeResult<Vec<PathBuf>> {
384    let mut files = Vec::new();
385    for pattern_path in patterns {
386        let pattern = pattern_path.to_string_lossy().to_string();
387        files.extend(collect_glob_files(&pattern)?);
388    }
389    files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
390    files.dedup_by(|a, b| a.to_string_lossy() == b.to_string_lossy());
391    Ok(files)
392}
393
/// Formats the error text reported when a source resolves to no input files.
///
/// The message names the entity and storage and records the base path, the
/// glob(s) and the recursive flag that were used for the search.
fn no_match_message(
    entity_name: &str,
    storage: &str,
    base_path: &str,
    glob_used: &str,
    recursive: bool,
) -> String {
    let message = format!(
        "entity.name={} source.storage={} no input files matched (base_path={}, glob={}, recursive={})",
        entity_name, storage, base_path, glob_used, recursive
    );
    message
}
401    format!(
402        "entity.name={} source.storage={} no input files matched (base_path={}, glob={}, recursive={})",
403        entity_name, storage, base_path, glob_used, recursive
404    )
405}