Skip to main content

floe_core/io/storage/
local.rs

1use std::path::{Path, PathBuf};
2
3use glob::glob;
4
5use crate::errors::{RunError, StorageError};
6use crate::{config, ConfigError, FloeResult};
7
8use super::{planner, ObjectRef, StorageClient};
9
/// Storage client that operates on the local filesystem.
///
/// The type is a stateless unit struct, so every instance is interchangeable.
/// `Debug`, `Clone` and `Default` are derived; `Default::default()` and
/// [`LocalClient::new`] produce the same value.
#[derive(Debug, Clone, Default)]
pub struct LocalClient;

impl LocalClient {
    /// Creates a new local filesystem client.
    pub fn new() -> Self {
        Self
    }
}
23
24impl StorageClient for LocalClient {
25    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
26        let path = Path::new(prefix);
27        if path.is_file() {
28            let uri = self.resolve_uri(prefix)?;
29            return Ok(vec![ObjectRef {
30                uri,
31                key: prefix.to_string(),
32                last_modified: None,
33                size: None,
34            }]);
35        }
36        if !path.exists() {
37            return Ok(Vec::new());
38        }
39        let mut refs = Vec::new();
40        for entry in std::fs::read_dir(path)? {
41            let entry = entry?;
42            let path = entry.path();
43            if path.is_file() {
44                let key = path.display().to_string();
45                let uri = self.resolve_uri(&key)?;
46                refs.push(ObjectRef {
47                    uri,
48                    key,
49                    last_modified: None,
50                    size: None,
51                });
52            }
53        }
54        refs = planner::stable_sort_refs(refs);
55        Ok(refs)
56    }
57
58    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
59        let src = PathBuf::from(uri.trim_start_matches("local://"));
60        let dest = temp_dir.join(
61            src.file_name()
62                .and_then(|name| name.to_str())
63                .unwrap_or("object"),
64        );
65        planner::ensure_parent_dir(&dest)?;
66        std::fs::copy(&src, &dest).map_err(|err| {
67            Box::new(StorageError(format!(
68                "local download failed from {}: {err}",
69                src.display()
70            ))) as Box<dyn std::error::Error + Send + Sync>
71        })?;
72        Ok(dest)
73    }
74
75    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
76        let dest = PathBuf::from(uri.trim_start_matches("local://"));
77        planner::ensure_parent_dir(&dest)?;
78        std::fs::copy(local_path, &dest).map_err(|err| {
79            Box::new(StorageError(format!(
80                "local upload failed to {}: {err}",
81                dest.display()
82            ))) as Box<dyn std::error::Error + Send + Sync>
83        })?;
84        Ok(())
85    }
86
87    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
88        let path = Path::new(path);
89        if path.is_absolute() {
90            Ok(format!("local://{}", path.display()))
91        } else {
92            let abs = std::env::current_dir()?.join(path);
93            Ok(format!("local://{}", abs.display()))
94        }
95    }
96
97    fn delete(&self, uri: &str) -> FloeResult<()> {
98        let path = Path::new(uri.trim_start_matches("local://"));
99        if path.exists() {
100            std::fs::remove_file(path).map_err(|err| {
101                Box::new(StorageError(format!(
102                    "local delete failed for {}: {err}",
103                    path.display()
104                ))) as Box<dyn std::error::Error + Send + Sync>
105            })?;
106        }
107        Ok(())
108    }
109}
110
/// How a local source path was interpreted when resolving input files.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LocalInputMode {
    /// The source path pointed at exactly one file.
    File,
    /// The inputs were obtained by expanding glob patterns.
    Directory,
}
116
117#[derive(Debug, Clone)]
118pub struct ResolvedLocalInputs {
119    pub files: Vec<PathBuf>,
120    pub mode: LocalInputMode,
121}
122
123pub fn resolve_local_inputs(
124    config_dir: &Path,
125    entity_name: &str,
126    source: &config::SourceConfig,
127    storage: &str,
128    default_globs: &[String],
129) -> FloeResult<ResolvedLocalInputs> {
130    let default_options = config::SourceOptions::default();
131    let options = source.options.as_ref().unwrap_or(&default_options);
132    let recursive = options.recursive.unwrap_or(false);
133    let glob_override = options.glob.as_deref();
134    let raw_path = source.path.as_str();
135
136    if is_glob_pattern(raw_path) {
137        let pattern_path = resolve_glob_pattern(config_dir, raw_path);
138        let pattern = pattern_path.to_string_lossy().to_string();
139        let files = collect_glob_files(&pattern)?;
140        if files.is_empty() {
141            let (base_path, glob_used) = split_glob_details(&pattern_path, raw_path);
142            return Err(Box::new(RunError(no_match_message(
143                entity_name,
144                storage,
145                &base_path,
146                &glob_used,
147                recursive,
148            ))));
149        }
150        return Ok(ResolvedLocalInputs {
151            files,
152            mode: LocalInputMode::Directory,
153        });
154    }
155
156    let base_path = config::resolve_local_path(config_dir, raw_path);
157    if base_path.is_file() {
158        return Ok(ResolvedLocalInputs {
159            files: vec![base_path],
160            mode: LocalInputMode::File,
161        });
162    }
163
164    let glob_used = if let Some(glob_override) = glob_override {
165        vec![glob_override.to_string()]
166    } else {
167        default_globs.to_vec()
168    };
169    if !base_path.is_dir() {
170        return Err(Box::new(RunError(no_match_message(
171            entity_name,
172            storage,
173            &base_path.display().to_string(),
174            &glob_used.join(","),
175            recursive,
176        ))));
177    }
178
179    let pattern_paths = if recursive {
180        glob_used
181            .iter()
182            .map(|glob| base_path.join("**").join(glob))
183            .collect::<Vec<_>>()
184    } else {
185        glob_used
186            .iter()
187            .map(|glob| base_path.join(glob))
188            .collect::<Vec<_>>()
189    };
190    let files = collect_glob_files_multi(&pattern_paths)?;
191    if files.is_empty() {
192        return Err(Box::new(RunError(no_match_message(
193            entity_name,
194            storage,
195            &base_path.display().to_string(),
196            &glob_used.join(","),
197            recursive,
198        ))));
199    }
200
201    Ok(ResolvedLocalInputs {
202        files,
203        mode: LocalInputMode::Directory,
204    })
205}
206
/// Reports whether `value` contains any of the glob metacharacters `*`, `?`
/// or `[`.
fn is_glob_pattern(value: &str) -> bool {
    value.chars().any(|ch| matches!(ch, '*' | '?' | '['))
}
210
/// Anchors a glob pattern at `config_dir` unless the pattern is already an
/// absolute path, in which case it is returned unchanged.
fn resolve_glob_pattern(config_dir: &Path, raw_path: &str) -> PathBuf {
    let candidate = Path::new(raw_path);
    if !candidate.is_absolute() {
        return config_dir.join(raw_path);
    }
    candidate.to_path_buf()
}
219
/// Splits a resolved glob pattern into `(base directory, glob part)` strings
/// for use in diagnostics.
///
/// The base is the pattern's parent path, or the pattern itself when it has
/// no parent. The glob part is the pattern's final component, or
/// `raw_pattern` when there is no final component.
fn split_glob_details(pattern_path: &Path, raw_pattern: &str) -> (String, String) {
    let parent_dir = match pattern_path.parent() {
        Some(dir) => dir,
        None => pattern_path,
    };
    let glob_part = match pattern_path.file_name() {
        Some(name) => name.to_string_lossy().to_string(),
        None => raw_pattern.to_string(),
    };
    (parent_dir.display().to_string(), glob_part)
}
232
233fn collect_glob_files(pattern: &str) -> FloeResult<Vec<PathBuf>> {
234    let mut files = Vec::new();
235    let entries = glob(pattern).map_err(|err| {
236        Box::new(ConfigError(format!(
237            "invalid glob pattern {pattern:?}: {err}"
238        ))) as Box<dyn std::error::Error + Send + Sync>
239    })?;
240    for entry in entries {
241        let path = entry.map_err(|err| {
242            Box::new(ConfigError(format!(
243                "glob match failed for {pattern:?}: {err}"
244            ))) as Box<dyn std::error::Error + Send + Sync>
245        })?;
246        if path.is_file() {
247            files.push(path);
248        }
249    }
250    files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
251    Ok(files)
252}
253
254fn collect_glob_files_multi(patterns: &[PathBuf]) -> FloeResult<Vec<PathBuf>> {
255    let mut files = Vec::new();
256    for pattern_path in patterns {
257        let pattern = pattern_path.to_string_lossy().to_string();
258        files.extend(collect_glob_files(&pattern)?);
259    }
260    files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
261    files.dedup_by(|a, b| a.to_string_lossy() == b.to_string_lossy());
262    Ok(files)
263}
264
/// Formats the diagnostic reported when a source yields no input files.
///
/// The message names the entity, the storage, the base path searched, the
/// glob(s) applied and whether the search was recursive.
fn no_match_message(
    entity_name: &str,
    storage: &str,
    base_path: &str,
    glob_used: &str,
    recursive: bool,
) -> String {
    format!(
        "entity.name={entity_name} source.storage={storage} no input files matched (base_path={base_path}, glob={glob_used}, recursive={recursive})"
    )
}
276}