floe_core/io/storage/providers/
local.rs1use std::path::{Path, PathBuf};
2
3use glob::glob;
4
5use crate::errors::{RunError, StorageError};
6use crate::{config, ConfigError, FloeResult};
7
8use crate::io::storage::{planner, ObjectRef, StorageClient};
9
/// Storage client that serves objects straight from the local filesystem.
///
/// The client holds no state, so it is a zero-sized unit struct that is free
/// to copy and can be built in `const` context.
#[derive(Debug, Clone, Copy)]
pub struct LocalClient;

impl LocalClient {
    /// Creates a new local filesystem client.
    pub const fn new() -> Self {
        Self
    }
}

impl Default for LocalClient {
    /// Equivalent to [`LocalClient::new`].
    fn default() -> Self {
        Self::new()
    }
}
23
24impl StorageClient for LocalClient {
25 fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
26 let path = Path::new(prefix);
27 if path.is_file() {
28 let uri = self.resolve_uri(prefix)?;
29 return Ok(vec![ObjectRef {
30 uri,
31 key: prefix.to_string(),
32 last_modified: None,
33 size: None,
34 }]);
35 }
36 if !path.exists() {
37 return Ok(Vec::new());
38 }
39 let mut refs = Vec::new();
40 for entry in std::fs::read_dir(path)? {
41 let entry = entry?;
42 let path = entry.path();
43 if path.is_file() {
44 let key = path.display().to_string();
45 let uri = self.resolve_uri(&key)?;
46 refs.push(ObjectRef {
47 uri,
48 key,
49 last_modified: None,
50 size: None,
51 });
52 }
53 }
54 refs = planner::stable_sort_refs(refs);
55 Ok(refs)
56 }
57
58 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
59 let src = PathBuf::from(uri.trim_start_matches("local://"));
60 let dest = temp_dir.join(
61 src.file_name()
62 .and_then(|name| name.to_str())
63 .unwrap_or("object"),
64 );
65 planner::ensure_parent_dir(&dest)?;
66 std::fs::copy(&src, &dest).map_err(|err| {
67 Box::new(StorageError(format!(
68 "local download failed from {}: {err}",
69 src.display()
70 ))) as Box<dyn std::error::Error + Send + Sync>
71 })?;
72 Ok(dest)
73 }
74
75 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
76 let dest = PathBuf::from(uri.trim_start_matches("local://"));
77 planner::ensure_parent_dir(&dest)?;
78 std::fs::copy(local_path, &dest).map_err(|err| {
79 Box::new(StorageError(format!(
80 "local upload failed to {}: {err}",
81 dest.display()
82 ))) as Box<dyn std::error::Error + Send + Sync>
83 })?;
84 Ok(())
85 }
86
87 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
88 let path = Path::new(path);
89 if path.is_absolute() {
90 Ok(format!("local://{}", path.display()))
91 } else {
92 let abs = std::env::current_dir()?.join(path);
93 Ok(format!("local://{}", abs.display()))
94 }
95 }
96
97 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
98 let src = Path::new(src_uri.trim_start_matches("local://"));
99 let dst = Path::new(dst_uri.trim_start_matches("local://"));
100 planner::ensure_parent_dir(dst)?;
101 std::fs::copy(src, dst).map_err(|err| {
102 Box::new(StorageError(format!(
103 "local copy failed from {} to {}: {err}",
104 src.display(),
105 dst.display()
106 ))) as Box<dyn std::error::Error + Send + Sync>
107 })?;
108 Ok(())
109 }
110
111 fn delete_object(&self, uri: &str) -> FloeResult<()> {
112 let path = Path::new(uri.trim_start_matches("local://"));
113 if path.exists() {
114 std::fs::remove_file(path).map_err(|err| {
115 Box::new(StorageError(format!(
116 "local delete failed for {}: {err}",
117 path.display()
118 ))) as Box<dyn std::error::Error + Send + Sync>
119 })?;
120 }
121 Ok(())
122 }
123
124 fn exists(&self, uri: &str) -> FloeResult<bool> {
125 let path = Path::new(uri.trim_start_matches("local://"));
126 Ok(path.exists())
127 }
128}
129
/// How a local source path was interpreted when its inputs were resolved.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LocalInputMode {
    /// The source path named a single existing file.
    File,
    /// The inputs were found by glob matching, either from an explicit
    /// pattern or by scanning a directory.
    Directory,
}
135
136#[derive(Debug, Clone)]
137pub struct ResolvedLocalInputs {
138 pub files: Vec<PathBuf>,
139 pub mode: LocalInputMode,
140}
141
142pub fn resolve_local_inputs(
143 config_dir: &Path,
144 entity_name: &str,
145 source: &config::SourceConfig,
146 storage: &str,
147 default_globs: &[String],
148) -> FloeResult<ResolvedLocalInputs> {
149 let default_options = config::SourceOptions::default();
150 let options = source.options.as_ref().unwrap_or(&default_options);
151 let recursive = options.recursive.unwrap_or(false);
152 let glob_override = options.glob.as_deref();
153 let raw_path = source.path.as_str();
154
155 if is_glob_pattern(raw_path) {
156 let pattern_path = resolve_glob_pattern(config_dir, raw_path);
157 let pattern = pattern_path.to_string_lossy().to_string();
158 let files = collect_glob_files(&pattern)?;
159 if files.is_empty() {
160 let (base_path, glob_used) = split_glob_details(&pattern_path, raw_path);
161 return Err(Box::new(RunError(no_match_message(
162 entity_name,
163 storage,
164 &base_path,
165 &glob_used,
166 recursive,
167 ))));
168 }
169 return Ok(ResolvedLocalInputs {
170 files,
171 mode: LocalInputMode::Directory,
172 });
173 }
174
175 let base_path = config::resolve_local_path(config_dir, raw_path);
176 if base_path.is_file() {
177 return Ok(ResolvedLocalInputs {
178 files: vec![base_path],
179 mode: LocalInputMode::File,
180 });
181 }
182
183 let glob_used = if let Some(glob_override) = glob_override {
184 vec![glob_override.to_string()]
185 } else {
186 default_globs.to_vec()
187 };
188 if !base_path.is_dir() {
189 return Err(Box::new(RunError(no_match_message(
190 entity_name,
191 storage,
192 &base_path.display().to_string(),
193 &glob_used.join(","),
194 recursive,
195 ))));
196 }
197
198 let pattern_paths = if recursive {
199 glob_used
200 .iter()
201 .map(|glob| base_path.join("**").join(glob))
202 .collect::<Vec<_>>()
203 } else {
204 glob_used
205 .iter()
206 .map(|glob| base_path.join(glob))
207 .collect::<Vec<_>>()
208 };
209 let files = collect_glob_files_multi(&pattern_paths)?;
210 if files.is_empty() {
211 return Err(Box::new(RunError(no_match_message(
212 entity_name,
213 storage,
214 &base_path.display().to_string(),
215 &glob_used.join(","),
216 recursive,
217 ))));
218 }
219
220 Ok(ResolvedLocalInputs {
221 files,
222 mode: LocalInputMode::Directory,
223 })
224}
225
/// Reports whether `value` contains a glob metacharacter (`*`, `?` or `[`).
fn is_glob_pattern(value: &str) -> bool {
    value.chars().any(|ch| matches!(ch, '*' | '?' | '['))
}
229
/// Anchors a glob pattern: an absolute `raw_path` is returned unchanged,
/// a relative one is joined onto `config_dir`.
fn resolve_glob_pattern(config_dir: &Path, raw_path: &str) -> PathBuf {
    let candidate = Path::new(raw_path);
    if !candidate.is_absolute() {
        return config_dir.join(candidate);
    }
    candidate.to_path_buf()
}
238
/// Splits a resolved glob pattern into `(directory, glob)` strings for use
/// in diagnostics.
///
/// The directory is the pattern's parent, or the whole pattern when it has
/// no parent. The glob is the pattern's final component, or `raw_pattern`
/// when the path has no final component.
fn split_glob_details(pattern_path: &Path, raw_pattern: &str) -> (String, String) {
    let directory = match pattern_path.parent() {
        Some(parent) => parent,
        None => pattern_path,
    };
    let glob_used = match pattern_path.file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => raw_pattern.to_owned(),
    };
    (directory.display().to_string(), glob_used)
}
251
252fn collect_glob_files(pattern: &str) -> FloeResult<Vec<PathBuf>> {
253 let mut files = Vec::new();
254 let entries = glob(pattern).map_err(|err| {
255 Box::new(ConfigError(format!(
256 "invalid glob pattern {pattern:?}: {err}"
257 ))) as Box<dyn std::error::Error + Send + Sync>
258 })?;
259 for entry in entries {
260 let path = entry.map_err(|err| {
261 Box::new(ConfigError(format!(
262 "glob match failed for {pattern:?}: {err}"
263 ))) as Box<dyn std::error::Error + Send + Sync>
264 })?;
265 if path.is_file() {
266 files.push(path);
267 }
268 }
269 files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
270 Ok(files)
271}
272
273fn collect_glob_files_multi(patterns: &[PathBuf]) -> FloeResult<Vec<PathBuf>> {
274 let mut files = Vec::new();
275 for pattern_path in patterns {
276 let pattern = pattern_path.to_string_lossy().to_string();
277 files.extend(collect_glob_files(&pattern)?);
278 }
279 files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
280 files.dedup_by(|a, b| a.to_string_lossy() == b.to_string_lossy());
281 Ok(files)
282}
283
/// Formats the diagnostic shown when a source resolves to no input files.
///
/// The message names the entity and storage, followed by the base path, the
/// glob(s) and the recursive flag that were in effect.
fn no_match_message(
    entity_name: &str,
    storage: &str,
    base_path: &str,
    glob_used: &str,
    recursive: bool,
) -> String {
    format!(
        "entity.name={entity_name} source.storage={storage} no input files matched (base_path={base_path}, glob={glob_used}, recursive={recursive})"
    )
}