Skip to main content

floe_core/io/storage/
local.rs

1use std::path::{Path, PathBuf};
2
3use glob::glob;
4
5use crate::errors::{RunError, StorageError};
6use crate::{config, ConfigError, FloeResult};
7
8use super::{planner, ObjectRef, StorageClient};
9
/// [`StorageClient`] implementation backed by the local filesystem.
///
/// The client is stateless, so it is cheap to copy and every instance behaves identically.
#[derive(Debug, Clone, Copy)]
pub struct LocalClient;

impl LocalClient {
    /// Creates a new local filesystem client.
    pub const fn new() -> Self {
        Self
    }
}

impl Default for LocalClient {
    /// Equivalent to [`LocalClient::new`].
    fn default() -> Self {
        Self::new()
    }
}
23
24impl StorageClient for LocalClient {
25    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
26        let path = Path::new(prefix);
27        if path.is_file() {
28            let uri = self.resolve_uri(prefix)?;
29            return Ok(vec![ObjectRef {
30                uri,
31                key: prefix.to_string(),
32                last_modified: None,
33                size: None,
34            }]);
35        }
36        if !path.exists() {
37            return Ok(Vec::new());
38        }
39        let mut refs = Vec::new();
40        for entry in std::fs::read_dir(path)? {
41            let entry = entry?;
42            let path = entry.path();
43            if path.is_file() {
44                let key = path.display().to_string();
45                let uri = self.resolve_uri(&key)?;
46                refs.push(ObjectRef {
47                    uri,
48                    key,
49                    last_modified: None,
50                    size: None,
51                });
52            }
53        }
54        refs = planner::stable_sort_refs(refs);
55        Ok(refs)
56    }
57
58    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
59        let src = PathBuf::from(uri.trim_start_matches("local://"));
60        let dest = temp_dir.join(
61            src.file_name()
62                .and_then(|name| name.to_str())
63                .unwrap_or("object"),
64        );
65        planner::ensure_parent_dir(&dest)?;
66        std::fs::copy(&src, &dest).map_err(|err| {
67            Box::new(StorageError(format!(
68                "local download failed from {}: {err}",
69                src.display()
70            ))) as Box<dyn std::error::Error + Send + Sync>
71        })?;
72        Ok(dest)
73    }
74
75    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
76        let dest = PathBuf::from(uri.trim_start_matches("local://"));
77        planner::ensure_parent_dir(&dest)?;
78        std::fs::copy(local_path, &dest).map_err(|err| {
79            Box::new(StorageError(format!(
80                "local upload failed to {}: {err}",
81                dest.display()
82            ))) as Box<dyn std::error::Error + Send + Sync>
83        })?;
84        Ok(())
85    }
86
87    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
88        let path = Path::new(path);
89        if path.is_absolute() {
90            Ok(format!("local://{}", path.display()))
91        } else {
92            let abs = std::env::current_dir()?.join(path);
93            Ok(format!("local://{}", abs.display()))
94        }
95    }
96
97    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
98        let src = Path::new(src_uri.trim_start_matches("local://"));
99        let dst = Path::new(dst_uri.trim_start_matches("local://"));
100        planner::ensure_parent_dir(dst)?;
101        std::fs::copy(src, dst).map_err(|err| {
102            Box::new(StorageError(format!(
103                "local copy failed from {} to {}: {err}",
104                src.display(),
105                dst.display()
106            ))) as Box<dyn std::error::Error + Send + Sync>
107        })?;
108        Ok(())
109    }
110
111    fn delete_object(&self, uri: &str) -> FloeResult<()> {
112        let path = Path::new(uri.trim_start_matches("local://"));
113        if path.exists() {
114            std::fs::remove_file(path).map_err(|err| {
115                Box::new(StorageError(format!(
116                    "local delete failed for {}: {err}",
117                    path.display()
118                ))) as Box<dyn std::error::Error + Send + Sync>
119            })?;
120        }
121        Ok(())
122    }
123
124    fn exists(&self, uri: &str) -> FloeResult<bool> {
125        let path = Path::new(uri.trim_start_matches("local://"));
126        Ok(path.exists())
127    }
128}
129
/// How a local source path was interpreted when its input files were resolved.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LocalInputMode {
    /// The source path named a single regular file, which is the only input.
    File,
    /// The inputs came from a glob pattern or from searching a directory.
    Directory,
}

/// The concrete input files found for a local source, plus how they were found.
#[derive(Clone, Debug)]
pub struct ResolvedLocalInputs {
    /// Paths of the matched input files.
    pub files: Vec<PathBuf>,
    /// Whether the source resolved to a single file or to a set of matches.
    pub mode: LocalInputMode,
}
141
142pub fn resolve_local_inputs(
143    config_dir: &Path,
144    entity_name: &str,
145    source: &config::SourceConfig,
146    storage: &str,
147    default_globs: &[String],
148) -> FloeResult<ResolvedLocalInputs> {
149    let default_options = config::SourceOptions::default();
150    let options = source.options.as_ref().unwrap_or(&default_options);
151    let recursive = options.recursive.unwrap_or(false);
152    let glob_override = options.glob.as_deref();
153    let raw_path = source.path.as_str();
154
155    if is_glob_pattern(raw_path) {
156        let pattern_path = resolve_glob_pattern(config_dir, raw_path);
157        let pattern = pattern_path.to_string_lossy().to_string();
158        let files = collect_glob_files(&pattern)?;
159        if files.is_empty() {
160            let (base_path, glob_used) = split_glob_details(&pattern_path, raw_path);
161            return Err(Box::new(RunError(no_match_message(
162                entity_name,
163                storage,
164                &base_path,
165                &glob_used,
166                recursive,
167            ))));
168        }
169        return Ok(ResolvedLocalInputs {
170            files,
171            mode: LocalInputMode::Directory,
172        });
173    }
174
175    let base_path = config::resolve_local_path(config_dir, raw_path);
176    if base_path.is_file() {
177        return Ok(ResolvedLocalInputs {
178            files: vec![base_path],
179            mode: LocalInputMode::File,
180        });
181    }
182
183    let glob_used = if let Some(glob_override) = glob_override {
184        vec![glob_override.to_string()]
185    } else {
186        default_globs.to_vec()
187    };
188    if !base_path.is_dir() {
189        return Err(Box::new(RunError(no_match_message(
190            entity_name,
191            storage,
192            &base_path.display().to_string(),
193            &glob_used.join(","),
194            recursive,
195        ))));
196    }
197
198    let pattern_paths = if recursive {
199        glob_used
200            .iter()
201            .map(|glob| base_path.join("**").join(glob))
202            .collect::<Vec<_>>()
203    } else {
204        glob_used
205            .iter()
206            .map(|glob| base_path.join(glob))
207            .collect::<Vec<_>>()
208    };
209    let files = collect_glob_files_multi(&pattern_paths)?;
210    if files.is_empty() {
211        return Err(Box::new(RunError(no_match_message(
212            entity_name,
213            storage,
214            &base_path.display().to_string(),
215            &glob_used.join(","),
216            recursive,
217        ))));
218    }
219
220    Ok(ResolvedLocalInputs {
221        files,
222        mode: LocalInputMode::Directory,
223    })
224}
225
/// Returns `true` when `value` contains a character that starts glob syntax
/// (`*`, `?` or `[`).
fn is_glob_pattern(value: &str) -> bool {
    value.chars().any(|ch| matches!(ch, '*' | '?' | '['))
}
229
/// Anchors a glob pattern: absolute patterns are returned as-is, and relative patterns
/// are placed under `config_dir`.
fn resolve_glob_pattern(config_dir: &Path, raw_path: &str) -> PathBuf {
    let candidate = Path::new(raw_path);
    if !candidate.is_absolute() {
        return config_dir.join(candidate);
    }
    candidate.to_path_buf()
}
238
/// Splits a resolved glob path into `(base directory, final pattern component)` for
/// error reporting.
///
/// When the path has no parent, the whole path is used as the base. When it has no file
/// name component, `raw_pattern` is used as the pattern.
fn split_glob_details(pattern_path: &Path, raw_pattern: &str) -> (String, String) {
    let glob_part = pattern_path.file_name().map_or_else(
        || raw_pattern.to_owned(),
        |name| name.to_string_lossy().into_owned(),
    );
    let base_dir = pattern_path.parent().unwrap_or(pattern_path);
    (base_dir.display().to_string(), glob_part)
}
251
252fn collect_glob_files(pattern: &str) -> FloeResult<Vec<PathBuf>> {
253    let mut files = Vec::new();
254    let entries = glob(pattern).map_err(|err| {
255        Box::new(ConfigError(format!(
256            "invalid glob pattern {pattern:?}: {err}"
257        ))) as Box<dyn std::error::Error + Send + Sync>
258    })?;
259    for entry in entries {
260        let path = entry.map_err(|err| {
261            Box::new(ConfigError(format!(
262                "glob match failed for {pattern:?}: {err}"
263            ))) as Box<dyn std::error::Error + Send + Sync>
264        })?;
265        if path.is_file() {
266            files.push(path);
267        }
268    }
269    files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
270    Ok(files)
271}
272
273fn collect_glob_files_multi(patterns: &[PathBuf]) -> FloeResult<Vec<PathBuf>> {
274    let mut files = Vec::new();
275    for pattern_path in patterns {
276        let pattern = pattern_path.to_string_lossy().to_string();
277        files.extend(collect_glob_files(&pattern)?);
278    }
279    files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
280    files.dedup_by(|a, b| a.to_string_lossy() == b.to_string_lossy());
281    Ok(files)
282}
283
/// Formats the diagnostic reported when a local source resolves to no input files.
///
/// It includes the entity, the storage name and the search parameters.
fn no_match_message(
    entity_name: &str,
    storage: &str,
    base_path: &str,
    glob_used: &str,
    recursive: bool,
) -> String {
    format!(
        "entity.name={entity_name} source.storage={storage} no input files matched (base_path={base_path}, glob={glob_used}, recursive={recursive})"
    )
}