Skip to main content

floe_core/io/storage/ops/inputs.rs

1use std::path::{Path, PathBuf};
2use std::time::SystemTime;
3
4use glob::{MatchOptions, Pattern};
5
6use crate::errors::RunError;
7use crate::io::storage::{planner, Target};
8use crate::{config, io, report, FloeResult};
9
/// How much work [`resolve_inputs`] performs for each discovered input.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ResolveInputsMode {
    /// Build per-file `InputFile` metadata so the inputs can be processed.
    /// (Cloud objects are not fetched here; that happens later, per file.)
    Download,
    /// Only collect input URIs (dry-run); `ResolvedInputs::files` stays empty.
    ListOnly,
}
15
16#[derive(Debug, Clone)]
17pub struct ResolvedInputs {
18    pub files: Vec<io::format::InputFile>,
19    pub listed: Vec<String>,
20    pub mode: report::ResolvedInputMode,
21}
22
23pub fn resolve_inputs(
24    config_dir: &Path,
25    entity: &config::EntityConfig,
26    adapter: &dyn io::format::InputAdapter,
27    target: &Target,
28    resolution_mode: ResolveInputsMode,
29    temp_dir: Option<&Path>,
30    storage_client: Option<&dyn crate::io::storage::StorageClient>,
31) -> FloeResult<ResolvedInputs> {
32    // Storage-specific resolution: list + download for cloud, direct paths for local.
33    match target {
34        Target::S3 { storage, .. } => {
35            let client = require_storage_client(storage_client, "s3")?;
36            let (bucket, key) = target.s3_parts().ok_or_else(|| {
37                Box::new(crate::errors::RunError(
38                    "s3 target missing bucket".to_string(),
39                ))
40            })?;
41            let location = format!("bucket={}", bucket);
42            resolve_cloud_inputs_for_prefix(
43                client,
44                key,
45                adapter,
46                entity,
47                storage,
48                &location,
49                resolution_mode,
50                temp_dir,
51            )
52        }
53        Target::Gcs { storage, .. } => {
54            let client = require_storage_client(storage_client, "gcs")?;
55            let (bucket, key) = target.gcs_parts().ok_or_else(|| {
56                Box::new(crate::errors::RunError(
57                    "gcs target missing bucket".to_string(),
58                ))
59            })?;
60            let location = format!("bucket={}", bucket);
61            resolve_cloud_inputs_for_prefix(
62                client,
63                key,
64                adapter,
65                entity,
66                storage,
67                &location,
68                resolution_mode,
69                temp_dir,
70            )
71        }
72        Target::Adls { storage, .. } => {
73            let client = require_storage_client(storage_client, "adls")?;
74            let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
75                Box::new(crate::errors::RunError(
76                    "adls target missing container".to_string(),
77                ))
78            })?;
79            let location = format!("container={}, account={}", container, account);
80            resolve_cloud_inputs_for_prefix(
81                client,
82                base_path,
83                adapter,
84                entity,
85                storage,
86                &location,
87                resolution_mode,
88                temp_dir,
89            )
90        }
91        Target::Local { storage, .. } => {
92            let resolved =
93                adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
94            let listed = build_local_listing(&resolved.files, storage_client);
95            // Download mode lists files for processing; ListOnly produces only URIs for dry-run.
96            let files = match resolution_mode {
97                ResolveInputsMode::Download => {
98                    build_local_inputs(&resolved.files, entity, storage_client)
99                }
100                ResolveInputsMode::ListOnly => Vec::new(),
101            };
102            let mode = match resolved.mode {
103                io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
104                io::storage::local::LocalInputMode::Directory => {
105                    report::ResolvedInputMode::Directory
106                }
107            };
108            Ok(ResolvedInputs {
109                files,
110                listed,
111                mode,
112            })
113        }
114    }
115}
116
117fn resolve_cloud_inputs_for_prefix(
118    client: &dyn crate::io::storage::StorageClient,
119    prefix: &str,
120    adapter: &dyn io::format::InputAdapter,
121    entity: &config::EntityConfig,
122    storage: &str,
123    location: &str,
124    resolution_mode: ResolveInputsMode,
125    _temp_dir: Option<&Path>,
126) -> FloeResult<ResolvedInputs> {
127    let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
128    let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
129    // Download mode now lists metadata only; actual downloads happen JIT in the entity run.
130    let files = match resolution_mode {
131        ResolveInputsMode::Download => list_cloud_inputs(&objects, entity),
132        ResolveInputsMode::ListOnly => Vec::new(),
133    };
134    Ok(ResolvedInputs {
135        files,
136        listed,
137        mode: report::ResolvedInputMode::Directory,
138    })
139}
140
141fn require_storage_client<'a>(
142    storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
143    label: &str,
144) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
145    storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
146        Box::new(crate::errors::RunError(format!(
147            "{} storage client missing",
148            label
149        )))
150    })
151}
152
153fn list_cloud_objects(
154    client: &dyn crate::io::storage::StorageClient,
155    source_path: &str,
156    adapter: &dyn io::format::InputAdapter,
157    entity: &config::EntityConfig,
158    storage: &str,
159    location: &str,
160) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
161    let source_match = CloudSourceMatch::new(source_path).map_err(|err| {
162        Box::new(crate::errors::RunError(format!(
163            "entity.name={} source.storage={} invalid cloud source path ({}, path={}): {}",
164            entity.name, storage, location, source_path, err
165        ))) as Box<dyn std::error::Error + Send + Sync>
166    })?;
167    let suffixes = adapter.suffixes()?;
168    let list_refs = client.list(source_match.list_prefix())?;
169    let filtered = filter_cloud_list_refs(list_refs, &source_match, &suffixes);
170    if filtered.is_empty() {
171        let match_desc = source_match.match_description();
172        return Err(Box::new(crate::errors::RunError(format!(
173            "entity.name={} source.storage={} no input objects matched ({}, prefix={}, {}, suffixes={})",
174            entity.name,
175            storage,
176            location,
177            source_match.list_prefix(),
178            match_desc,
179            suffixes.join(",")
180        ))));
181    }
182    Ok(filtered)
183}
184
185/// Builds `InputFile` metadata from cloud object refs without downloading.
186/// Downloads happen later, JIT per file, via [`localize_input`].
187fn list_cloud_inputs(
188    objects: &[io::storage::planner::ObjectRef],
189    entity: &config::EntityConfig,
190) -> Vec<io::format::InputFile> {
191    objects
192        .iter()
193        .map(|object| {
194            let source_name =
195                planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
196            let source_stem =
197                planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
198            io::format::InputFile {
199                source_uri: object.uri.clone(),
200                source_name,
201                source_stem,
202                source_size: object.size,
203                source_mtime: object.last_modified.clone(),
204            }
205        })
206        .collect()
207}
208
209fn build_local_inputs(
210    files: &[PathBuf],
211    entity: &config::EntityConfig,
212    storage_client: Option<&dyn crate::io::storage::StorageClient>,
213) -> Vec<io::format::InputFile> {
214    files
215        .iter()
216        .map(|path| {
217            let normalized_path = crate::io::storage::paths::normalize_local_path(path);
218            let metadata = std::fs::metadata(&normalized_path).ok();
219            let source_name = path
220                .file_name()
221                .and_then(|name| name.to_str())
222                .unwrap_or(entity.name.as_str())
223                .to_string();
224            let source_stem = Path::new(&source_name)
225                .file_stem()
226                .and_then(|stem| stem.to_str())
227                .unwrap_or(entity.name.as_str())
228                .to_string();
229            let uri = storage_client
230                .and_then(|client| {
231                    client
232                        .resolve_uri(&normalized_path.display().to_string())
233                        .ok()
234                })
235                .unwrap_or_else(|| normalized_path.display().to_string());
236            io::format::InputFile {
237                source_uri: uri,
238                source_name,
239                source_stem,
240                source_size: metadata.as_ref().map(|metadata| metadata.len()),
241                source_mtime: metadata
242                    .as_ref()
243                    .and_then(|metadata| metadata.modified().ok())
244                    .and_then(system_time_to_rfc3339),
245            }
246        })
247        .collect()
248}
249
250/// Resolves an `InputFile` to a `LocalInputFile` with a guaranteed local path.
251///
252/// For cloud URIs (s3://, gs://, abfs://) the file is downloaded to `temp_dir`
253/// and `is_ephemeral` is set to `true` so callers can delete it after use.
254/// For local URIs the source path is used directly with no copy.
255pub fn localize_input(
256    input_file: io::format::InputFile,
257    storage_client: Option<&dyn crate::io::storage::StorageClient>,
258    temp_dir: Option<&Path>,
259) -> FloeResult<io::format::LocalInputFile> {
260    if is_cloud_uri(&input_file.source_uri) {
261        let client = storage_client.ok_or_else(|| {
262            Box::new(RunError(format!(
263                "storage client required to download {}",
264                input_file.source_uri
265            ))) as Box<dyn std::error::Error + Send + Sync>
266        })?;
267        let temp = temp_dir.ok_or_else(|| {
268            Box::new(RunError(format!(
269                "temp_dir required to download {}",
270                input_file.source_uri
271            ))) as Box<dyn std::error::Error + Send + Sync>
272        })?;
273        let local_path = client.download_to_temp(&input_file.source_uri, temp)?;
274        Ok(io::format::LocalInputFile {
275            file: input_file,
276            local_path,
277            is_ephemeral: true,
278        })
279    } else {
280        // Local URI: strip the "local://" scheme if present; the rest is the path.
281        let path_str = input_file.source_uri.trim_start_matches("local://");
282        Ok(io::format::LocalInputFile {
283            local_path: PathBuf::from(path_str),
284            file: input_file,
285            is_ephemeral: false,
286        })
287    }
288}
289
/// Returns `true` when `uri` uses a remote object-store scheme and therefore
/// has to be downloaded before it can be read from disk.
///
/// `abfss://` (the TLS variant of the ADLS Gen2 `abfs://` scheme) is included.
/// Without it, such URIs would be treated as local paths.
fn is_cloud_uri(uri: &str) -> bool {
    const CLOUD_SCHEMES: &[&str] = &["s3://", "gs://", "gcs://", "abfs://", "abfss://", "az://"];
    CLOUD_SCHEMES
        .iter()
        .any(|&scheme| uri.starts_with(scheme))
}
297
298fn system_time_to_rfc3339(value: SystemTime) -> Option<String> {
299    time::OffsetDateTime::from(value)
300        .format(&time::format_description::well_known::Rfc3339)
301        .ok()
302}
303
304fn build_local_listing(
305    files: &[PathBuf],
306    storage_client: Option<&dyn crate::io::storage::StorageClient>,
307) -> Vec<String> {
308    files
309        .iter()
310        .map(|path| {
311            let normalized_path = crate::io::storage::paths::normalize_local_path(path);
312            storage_client
313                .and_then(|client| {
314                    client
315                        .resolve_uri(&normalized_path.display().to_string())
316                        .ok()
317                })
318                .unwrap_or_else(|| normalized_path.display().to_string())
319        })
320        .collect()
321}
322
323#[derive(Debug)]
324struct CloudSourceMatch {
325    list_prefix: String,
326    glob_pattern: Option<String>,
327    matcher: Option<Pattern>,
328}
329
330impl CloudSourceMatch {
331    fn new(source_path: &str) -> FloeResult<Self> {
332        if !contains_glob_metachar(source_path) {
333            return Ok(Self {
334                list_prefix: source_path.to_string(),
335                glob_pattern: None,
336                matcher: None,
337            });
338        }
339
340        let list_prefix = prefix_before_first_glob(source_path);
341        if list_prefix.trim_matches('/').is_empty() {
342            return Err(Box::new(crate::errors::RunError(
343                "glob patterns for cloud sources must include a non-empty literal prefix before the first wildcard"
344                    .to_string(),
345            )));
346        }
347
348        let matcher = Pattern::new(source_path).map_err(|err| {
349            Box::new(crate::errors::RunError(format!(
350                "invalid cloud glob pattern {:?}: {err}",
351                source_path
352            ))) as Box<dyn std::error::Error + Send + Sync>
353        })?;
354
355        Ok(Self {
356            list_prefix: list_prefix.to_string(),
357            glob_pattern: Some(source_path.to_string()),
358            matcher: Some(matcher),
359        })
360    }
361
362    fn list_prefix(&self) -> &str {
363        self.list_prefix.as_str()
364    }
365
366    fn matches_key(&self, key: &str) -> bool {
367        match &self.matcher {
368            Some(matcher) => matcher.matches_with(key, cloud_glob_match_options()),
369            None => true,
370        }
371    }
372
373    fn match_description(&self) -> String {
374        match &self.glob_pattern {
375            Some(pattern) => format!("glob={pattern}"),
376            None => "glob=<none>".to_string(),
377        }
378    }
379}
380
381fn filter_cloud_list_refs(
382    list_refs: Vec<io::storage::planner::ObjectRef>,
383    source_match: &CloudSourceMatch,
384    suffixes: &[String],
385) -> Vec<io::storage::planner::ObjectRef> {
386    let filtered = list_refs
387        .into_iter()
388        .filter(|obj| source_match.matches_key(&obj.key))
389        .collect::<Vec<_>>();
390    let filtered = planner::filter_by_suffixes(filtered, suffixes);
391    planner::stable_sort_refs(filtered)
392}
393
394fn contains_glob_metachar(value: &str) -> bool {
395    first_glob_metachar_index(value).is_some()
396}
397
398fn prefix_before_first_glob(value: &str) -> &str {
399    match first_glob_metachar_index(value) {
400        Some(index) => &value[..index],
401        None => value,
402    }
403}
404
/// Returns the byte index of the first unescaped glob wildcard (`*` or `?`)
/// in `value`, or `None` when there is none.
///
/// A backslash escapes the character that follows it, so `\*` is not a
/// wildcard. Character classes (`[...]`) are not considered here.
fn first_glob_metachar_index(value: &str) -> Option<usize> {
    let mut chars = value.char_indices();
    while let Some((index, ch)) = chars.next() {
        match ch {
            // Consume and ignore the escaped character (if any).
            '\\' => {
                chars.next();
            }
            '*' | '?' => return Some(index),
            _ => {}
        }
    }
    None
}
422
423fn cloud_glob_match_options() -> MatchOptions {
424    MatchOptions {
425        case_sensitive: true,
426        require_literal_separator: true,
427        require_literal_leading_dot: false,
428    }
429}