floe_core/io/storage/providers/
local.rs1use std::path::{Path, PathBuf};
2
3use glob::glob;
4
5use crate::errors::{RunError, StorageError};
6use crate::{config, ConfigError, FloeResult};
7
8use crate::io::storage::{planner, ConditionalWrite, ObjectRef, StorageClient, StoredObject};
9
10pub struct LocalClient;
11
12impl LocalClient {
13 pub fn new() -> Self {
14 Self
15 }
16}
17
18impl Default for LocalClient {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl StorageClient for LocalClient {
25 fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
26 let path = Path::new(prefix);
27 if path.is_file() {
28 let uri = self.resolve_uri(prefix)?;
29 return Ok(vec![ObjectRef {
30 uri,
31 key: prefix.to_string(),
32 last_modified: None,
33 size: None,
34 }]);
35 }
36 if !path.exists() {
37 return Ok(Vec::new());
38 }
39 let mut refs = Vec::new();
40 for entry in std::fs::read_dir(path)? {
41 let entry = entry?;
42 let path = entry.path();
43 if path.is_file() {
44 let key = path.display().to_string();
45 let uri = self.resolve_uri(&key)?;
46 refs.push(ObjectRef {
47 uri,
48 key,
49 last_modified: None,
50 size: None,
51 });
52 }
53 }
54 refs = planner::stable_sort_refs(refs);
55 Ok(refs)
56 }
57
58 fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
59 let src = PathBuf::from(uri.trim_start_matches("local://"));
60 let dest = temp_dir.join(
61 src.file_name()
62 .and_then(|name| name.to_str())
63 .unwrap_or("object"),
64 );
65 planner::ensure_parent_dir(&dest)?;
66 std::fs::copy(&src, &dest).map_err(|err| {
67 Box::new(StorageError(format!(
68 "local download failed from {}: {err}",
69 src.display()
70 ))) as Box<dyn std::error::Error + Send + Sync>
71 })?;
72 Ok(dest)
73 }
74
75 fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
76 let dest = PathBuf::from(uri.trim_start_matches("local://"));
77 planner::ensure_parent_dir(&dest)?;
78 std::fs::copy(local_path, &dest).map_err(|err| {
79 Box::new(StorageError(format!(
80 "local upload failed to {}: {err}",
81 dest.display()
82 ))) as Box<dyn std::error::Error + Send + Sync>
83 })?;
84 Ok(())
85 }
86
87 fn resolve_uri(&self, path: &str) -> FloeResult<String> {
88 let path = Path::new(path);
89 let normalized = if path.is_absolute() {
90 crate::io::storage::paths::normalize_local_path(path)
91 } else {
92 let abs = std::env::current_dir()?.join(path);
93 crate::io::storage::paths::normalize_local_path(&abs)
94 };
95 Ok(format!("local://{}", normalized.display()))
96 }
97
98 fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
99 let src = Path::new(src_uri.trim_start_matches("local://"));
100 let dst = Path::new(dst_uri.trim_start_matches("local://"));
101 planner::ensure_parent_dir(dst)?;
102 std::fs::copy(src, dst).map_err(|err| {
103 Box::new(StorageError(format!(
104 "local copy failed from {} to {}: {err}",
105 src.display(),
106 dst.display()
107 ))) as Box<dyn std::error::Error + Send + Sync>
108 })?;
109 Ok(())
110 }
111
112 fn delete_object(&self, uri: &str) -> FloeResult<()> {
113 let path = Path::new(uri.trim_start_matches("local://"));
114 if path.exists() {
115 std::fs::remove_file(path).map_err(|err| {
116 Box::new(StorageError(format!(
117 "local delete failed for {}: {err}",
118 path.display()
119 ))) as Box<dyn std::error::Error + Send + Sync>
120 })?;
121 }
122 Ok(())
123 }
124
125 fn exists(&self, uri: &str) -> FloeResult<bool> {
126 let path = Path::new(uri.trim_start_matches("local://"));
127 Ok(path.exists())
128 }
129
130 fn read_object(&self, uri: &str) -> FloeResult<Option<StoredObject>> {
131 let path = Path::new(uri.trim_start_matches("local://"));
132 if !path.exists() {
133 return Ok(None);
134 }
135 let _lock = FileLock::acquire(path)?;
136 if !path.exists() {
137 return Ok(None);
138 }
139 Ok(Some(StoredObject {
140 body: std::fs::read(path)?,
141 version: local_version(path)?,
142 }))
143 }
144
145 fn write_object_conditional(
146 &self,
147 uri: &str,
148 expected_version: Option<&str>,
149 body: &[u8],
150 ) -> FloeResult<ConditionalWrite> {
151 let path = PathBuf::from(uri.trim_start_matches("local://"));
152 planner::ensure_parent_dir(&path)?;
153 let _lock = FileLock::acquire(&path)?;
154 let current = if path.exists() {
155 Some(local_version(&path)?)
156 } else {
157 None
158 };
159 if current.as_deref() != expected_version {
160 return Ok(ConditionalWrite::Conflict);
161 }
162 std::fs::write(&path, body)?;
163 Ok(ConditionalWrite::Written {
164 version: local_version(&path)?,
165 })
166 }
167
168 fn delete_object_conditional(
169 &self,
170 uri: &str,
171 expected_version: Option<&str>,
172 ) -> FloeResult<ConditionalWrite> {
173 let path = PathBuf::from(uri.trim_start_matches("local://"));
174 planner::ensure_parent_dir(&path)?;
175 let _lock = FileLock::acquire(&path)?;
176 let current = if path.exists() {
177 Some(local_version(&path)?)
178 } else {
179 None
180 };
181 if current.as_deref() != expected_version {
182 return Ok(ConditionalWrite::Conflict);
183 }
184 if path.exists() {
185 std::fs::remove_file(&path)?;
186 }
187 Ok(ConditionalWrite::Written {
188 version: "deleted".to_string(),
189 })
190 }
191}
192
193struct FileLock {
194 path: PathBuf,
195}
196
197impl FileLock {
198 fn acquire(base: &Path) -> FloeResult<Self> {
199 let lock_path = PathBuf::from(format!("{}.lock", base.display()));
200 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
201 loop {
202 match std::fs::OpenOptions::new()
203 .write(true)
204 .create_new(true)
205 .open(&lock_path)
206 {
207 Ok(_) => return Ok(Self { path: lock_path }),
208 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
209 if std::time::Instant::now() >= deadline {
210 return Err(format!(
211 "timed out acquiring local state lock {}",
212 lock_path.display()
213 )
214 .into());
215 }
216 std::thread::sleep(std::time::Duration::from_millis(10));
217 }
218 Err(e) => return Err(Box::new(e)),
219 }
220 }
221 }
222}
223
224impl Drop for FileLock {
225 fn drop(&mut self) {
226 let _ = std::fs::remove_file(&self.path);
227 }
228}
229
230fn local_version(path: &Path) -> FloeResult<String> {
231 let metadata = std::fs::metadata(path)?;
232 let modified = metadata
233 .modified()?
234 .duration_since(std::time::UNIX_EPOCH)
235 .unwrap_or_default()
236 .as_nanos();
237 Ok(format!("{}:{modified}", metadata.len()))
238}
239
240#[derive(Debug, Clone, Copy, PartialEq, Eq)]
241pub enum LocalInputMode {
242 File,
243 Directory,
244}
245
246#[derive(Debug, Clone)]
247pub struct ResolvedLocalInputs {
248 pub files: Vec<PathBuf>,
249 pub mode: LocalInputMode,
250}
251
252pub fn resolve_local_inputs(
253 config_dir: &Path,
254 entity_name: &str,
255 source: &config::SourceConfig,
256 storage: &str,
257 default_globs: &[String],
258) -> FloeResult<ResolvedLocalInputs> {
259 let default_options = config::SourceOptions::default();
260 let options = source.options.as_ref().unwrap_or(&default_options);
261 let recursive = options.recursive.unwrap_or(false);
262 let glob_override = options.glob.as_deref();
263 let raw_path = source.path.as_str();
264
265 if is_glob_pattern(raw_path) {
266 let pattern_path = resolve_glob_pattern(config_dir, raw_path);
267 let pattern = pattern_path.to_string_lossy().to_string();
268 let files = collect_glob_files(&pattern)?;
269 if files.is_empty() {
270 let (base_path, glob_used) = split_glob_details(&pattern_path, raw_path);
271 return Err(Box::new(RunError(no_match_message(
272 entity_name,
273 storage,
274 &base_path,
275 &glob_used,
276 recursive,
277 ))));
278 }
279 return Ok(ResolvedLocalInputs {
280 files,
281 mode: LocalInputMode::Directory,
282 });
283 }
284
285 let base_path = config::resolve_local_path(config_dir, raw_path);
286 if base_path.is_file() {
287 return Ok(ResolvedLocalInputs {
288 files: vec![base_path],
289 mode: LocalInputMode::File,
290 });
291 }
292
293 let glob_used = if let Some(glob_override) = glob_override {
294 vec![glob_override.to_string()]
295 } else {
296 default_globs.to_vec()
297 };
298 if !base_path.is_dir() {
299 return Err(Box::new(RunError(no_match_message(
300 entity_name,
301 storage,
302 &base_path.display().to_string(),
303 &glob_used.join(","),
304 recursive,
305 ))));
306 }
307
308 let pattern_paths = if recursive {
309 glob_used
310 .iter()
311 .map(|glob| base_path.join("**").join(glob))
312 .collect::<Vec<_>>()
313 } else {
314 glob_used
315 .iter()
316 .map(|glob| base_path.join(glob))
317 .collect::<Vec<_>>()
318 };
319 let files = collect_glob_files_multi(&pattern_paths)?;
320 if files.is_empty() {
321 return Err(Box::new(RunError(no_match_message(
322 entity_name,
323 storage,
324 &base_path.display().to_string(),
325 &glob_used.join(","),
326 recursive,
327 ))));
328 }
329
330 Ok(ResolvedLocalInputs {
331 files,
332 mode: LocalInputMode::Directory,
333 })
334}
335
336fn is_glob_pattern(value: &str) -> bool {
337 value.contains('*') || value.contains('?') || value.contains('[')
338}
339
340fn resolve_glob_pattern(config_dir: &Path, raw_path: &str) -> PathBuf {
341 let path = Path::new(raw_path);
342 if path.is_absolute() {
343 path.to_path_buf()
344 } else {
345 config_dir.join(raw_path)
346 }
347}
348
349fn split_glob_details(pattern_path: &Path, raw_pattern: &str) -> (String, String) {
350 let base = pattern_path
351 .parent()
352 .unwrap_or(pattern_path)
353 .display()
354 .to_string();
355 let glob_used = pattern_path
356 .file_name()
357 .map(|name| name.to_string_lossy().to_string())
358 .unwrap_or_else(|| raw_pattern.to_string());
359 (base, glob_used)
360}
361
362fn collect_glob_files(pattern: &str) -> FloeResult<Vec<PathBuf>> {
363 let mut files = Vec::new();
364 let entries = glob(pattern).map_err(|err| {
365 Box::new(ConfigError(format!(
366 "invalid glob pattern {pattern:?}: {err}"
367 ))) as Box<dyn std::error::Error + Send + Sync>
368 })?;
369 for entry in entries {
370 let path = entry.map_err(|err| {
371 Box::new(ConfigError(format!(
372 "glob match failed for {pattern:?}: {err}"
373 ))) as Box<dyn std::error::Error + Send + Sync>
374 })?;
375 if path.is_file() {
376 files.push(crate::io::storage::paths::normalize_local_path(&path));
377 }
378 }
379 files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
380 Ok(files)
381}
382
383fn collect_glob_files_multi(patterns: &[PathBuf]) -> FloeResult<Vec<PathBuf>> {
384 let mut files = Vec::new();
385 for pattern_path in patterns {
386 let pattern = pattern_path.to_string_lossy().to_string();
387 files.extend(collect_glob_files(&pattern)?);
388 }
389 files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
390 files.dedup_by(|a, b| a.to_string_lossy() == b.to_string_lossy());
391 Ok(files)
392}
393
394fn no_match_message(
395 entity_name: &str,
396 storage: &str,
397 base_path: &str,
398 glob_used: &str,
399 recursive: bool,
400) -> String {
401 format!(
402 "entity.name={} source.storage={} no input files matched (base_path={}, glob={}, recursive={})",
403 entity_name, storage, base_path, glob_used, recursive
404 )
405}