Skip to main content

floe_core/io/storage/ops/
inputs.rs

1use std::path::{Path, PathBuf};
2
3use glob::{MatchOptions, Pattern};
4
5use crate::io::storage::{planner, Target};
6use crate::{config, io, report, FloeResult};
7
/// Controls how far [`resolve_inputs`] goes when resolving an entity's inputs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ResolveInputsMode {
    /// Resolve the listing and also materialize the inputs as local files
    /// (cloud objects are downloaded into the provided temp directory).
    Download,
    /// Resolve only the listing; the resulting `files` collection stays empty.
    ListOnly,
}
13
14#[derive(Debug, Clone)]
15pub struct ResolvedInputs {
16    pub files: Vec<io::format::InputFile>,
17    pub listed: Vec<String>,
18    pub mode: report::ResolvedInputMode,
19}
20
21pub fn resolve_inputs(
22    config_dir: &Path,
23    entity: &config::EntityConfig,
24    adapter: &dyn io::format::InputAdapter,
25    target: &Target,
26    resolution_mode: ResolveInputsMode,
27    temp_dir: Option<&Path>,
28    storage_client: Option<&dyn crate::io::storage::StorageClient>,
29) -> FloeResult<ResolvedInputs> {
30    // Storage-specific resolution: list + download for cloud, direct paths for local.
31    match target {
32        Target::S3 { storage, .. } => {
33            let client = require_storage_client(storage_client, "s3")?;
34            let (bucket, key) = target.s3_parts().ok_or_else(|| {
35                Box::new(crate::errors::RunError(
36                    "s3 target missing bucket".to_string(),
37                ))
38            })?;
39            let location = format!("bucket={}", bucket);
40            resolve_cloud_inputs_for_prefix(
41                client,
42                key,
43                adapter,
44                entity,
45                storage,
46                &location,
47                resolution_mode,
48                temp_dir,
49            )
50        }
51        Target::Gcs { storage, .. } => {
52            let client = require_storage_client(storage_client, "gcs")?;
53            let (bucket, key) = target.gcs_parts().ok_or_else(|| {
54                Box::new(crate::errors::RunError(
55                    "gcs target missing bucket".to_string(),
56                ))
57            })?;
58            let location = format!("bucket={}", bucket);
59            resolve_cloud_inputs_for_prefix(
60                client,
61                key,
62                adapter,
63                entity,
64                storage,
65                &location,
66                resolution_mode,
67                temp_dir,
68            )
69        }
70        Target::Adls { storage, .. } => {
71            let client = require_storage_client(storage_client, "adls")?;
72            let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
73                Box::new(crate::errors::RunError(
74                    "adls target missing container".to_string(),
75                ))
76            })?;
77            let location = format!("container={}, account={}", container, account);
78            resolve_cloud_inputs_for_prefix(
79                client,
80                base_path,
81                adapter,
82                entity,
83                storage,
84                &location,
85                resolution_mode,
86                temp_dir,
87            )
88        }
89        Target::Local { storage, .. } => {
90            let resolved =
91                adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
92            let listed = build_local_listing(&resolved.files, storage_client);
93            let files = match resolution_mode {
94                ResolveInputsMode::Download => {
95                    build_local_inputs(&resolved.files, entity, storage_client)
96                }
97                ResolveInputsMode::ListOnly => Vec::new(),
98            };
99            let mode = match resolved.mode {
100                io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
101                io::storage::local::LocalInputMode::Directory => {
102                    report::ResolvedInputMode::Directory
103                }
104            };
105            Ok(ResolvedInputs {
106                files,
107                listed,
108                mode,
109            })
110        }
111    }
112}
113
114fn resolve_cloud_inputs_for_prefix(
115    client: &dyn crate::io::storage::StorageClient,
116    prefix: &str,
117    adapter: &dyn io::format::InputAdapter,
118    entity: &config::EntityConfig,
119    storage: &str,
120    location: &str,
121    resolution_mode: ResolveInputsMode,
122    temp_dir: Option<&Path>,
123) -> FloeResult<ResolvedInputs> {
124    let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
125    let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
126    let files = match resolution_mode {
127        ResolveInputsMode::Download => {
128            let temp_dir = require_temp_dir(temp_dir, storage)?;
129            build_cloud_inputs(client, &objects, temp_dir, entity)?
130        }
131        ResolveInputsMode::ListOnly => Vec::new(),
132    };
133    Ok(ResolvedInputs {
134        files,
135        listed,
136        mode: report::ResolvedInputMode::Directory,
137    })
138}
139
140fn require_temp_dir<'a>(temp_dir: Option<&'a Path>, label: &str) -> FloeResult<&'a Path> {
141    temp_dir.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
142        Box::new(crate::errors::RunError(format!(
143            "{} tempdir missing",
144            label
145        )))
146    })
147}
148
149fn require_storage_client<'a>(
150    storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
151    label: &str,
152) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
153    storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
154        Box::new(crate::errors::RunError(format!(
155            "{} storage client missing",
156            label
157        )))
158    })
159}
160
161fn list_cloud_objects(
162    client: &dyn crate::io::storage::StorageClient,
163    source_path: &str,
164    adapter: &dyn io::format::InputAdapter,
165    entity: &config::EntityConfig,
166    storage: &str,
167    location: &str,
168) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
169    let source_match = CloudSourceMatch::new(source_path).map_err(|err| {
170        Box::new(crate::errors::RunError(format!(
171            "entity.name={} source.storage={} invalid cloud source path ({}, path={}): {}",
172            entity.name, storage, location, source_path, err
173        ))) as Box<dyn std::error::Error + Send + Sync>
174    })?;
175    let suffixes = adapter.suffixes()?;
176    let list_refs = client.list(source_match.list_prefix())?;
177    let filtered = filter_cloud_list_refs(list_refs, &source_match, &suffixes);
178    if filtered.is_empty() {
179        let match_desc = source_match.match_description();
180        return Err(Box::new(crate::errors::RunError(format!(
181            "entity.name={} source.storage={} no input objects matched ({}, prefix={}, {}, suffixes={})",
182            entity.name,
183            storage,
184            location,
185            source_match.list_prefix(),
186            match_desc,
187            suffixes.join(",")
188        ))));
189    }
190    Ok(filtered)
191}
192
193fn build_cloud_inputs(
194    client: &dyn crate::io::storage::StorageClient,
195    objects: &[io::storage::planner::ObjectRef],
196    temp_dir: &Path,
197    entity: &config::EntityConfig,
198) -> FloeResult<Vec<io::format::InputFile>> {
199    let mut inputs = Vec::with_capacity(objects.len());
200    for object in objects {
201        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
202        let source_name =
203            planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
204        let source_stem =
205            planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
206        let source_uri = object.uri.clone();
207        inputs.push(io::format::InputFile {
208            source_uri,
209            source_local_path: local_path,
210            source_name,
211            source_stem,
212        });
213    }
214    Ok(inputs)
215}
216
217fn build_local_inputs(
218    files: &[PathBuf],
219    entity: &config::EntityConfig,
220    storage_client: Option<&dyn crate::io::storage::StorageClient>,
221) -> Vec<io::format::InputFile> {
222    files
223        .iter()
224        .map(|path| {
225            let normalized_path = crate::io::storage::paths::normalize_local_path(path);
226            let source_name = path
227                .file_name()
228                .and_then(|name| name.to_str())
229                .unwrap_or(entity.name.as_str())
230                .to_string();
231            let source_stem = Path::new(&source_name)
232                .file_stem()
233                .and_then(|stem| stem.to_str())
234                .unwrap_or(entity.name.as_str())
235                .to_string();
236            let uri = storage_client
237                .and_then(|client| {
238                    client
239                        .resolve_uri(&normalized_path.display().to_string())
240                        .ok()
241                })
242                .unwrap_or_else(|| normalized_path.display().to_string());
243            io::format::InputFile {
244                source_uri: uri,
245                source_local_path: normalized_path,
246                source_name,
247                source_stem,
248            }
249        })
250        .collect()
251}
252
253fn build_local_listing(
254    files: &[PathBuf],
255    storage_client: Option<&dyn crate::io::storage::StorageClient>,
256) -> Vec<String> {
257    files
258        .iter()
259        .map(|path| {
260            let normalized_path = crate::io::storage::paths::normalize_local_path(path);
261            storage_client
262                .and_then(|client| {
263                    client
264                        .resolve_uri(&normalized_path.display().to_string())
265                        .ok()
266                })
267                .unwrap_or_else(|| normalized_path.display().to_string())
268        })
269        .collect()
270}
271
272#[derive(Debug)]
273struct CloudSourceMatch {
274    list_prefix: String,
275    glob_pattern: Option<String>,
276    matcher: Option<Pattern>,
277}
278
279impl CloudSourceMatch {
280    fn new(source_path: &str) -> FloeResult<Self> {
281        if !contains_glob_metachar(source_path) {
282            return Ok(Self {
283                list_prefix: source_path.to_string(),
284                glob_pattern: None,
285                matcher: None,
286            });
287        }
288
289        let list_prefix = prefix_before_first_glob(source_path);
290        if list_prefix.trim_matches('/').is_empty() {
291            return Err(Box::new(crate::errors::RunError(
292                "glob patterns for cloud sources must include a non-empty literal prefix before the first wildcard"
293                    .to_string(),
294            )));
295        }
296
297        let matcher = Pattern::new(source_path).map_err(|err| {
298            Box::new(crate::errors::RunError(format!(
299                "invalid cloud glob pattern {:?}: {err}",
300                source_path
301            ))) as Box<dyn std::error::Error + Send + Sync>
302        })?;
303
304        Ok(Self {
305            list_prefix: list_prefix.to_string(),
306            glob_pattern: Some(source_path.to_string()),
307            matcher: Some(matcher),
308        })
309    }
310
311    fn list_prefix(&self) -> &str {
312        self.list_prefix.as_str()
313    }
314
315    fn matches_key(&self, key: &str) -> bool {
316        match &self.matcher {
317            Some(matcher) => matcher.matches_with(key, cloud_glob_match_options()),
318            None => true,
319        }
320    }
321
322    fn match_description(&self) -> String {
323        match &self.glob_pattern {
324            Some(pattern) => format!("glob={pattern}"),
325            None => "glob=<none>".to_string(),
326        }
327    }
328}
329
330fn filter_cloud_list_refs(
331    list_refs: Vec<io::storage::planner::ObjectRef>,
332    source_match: &CloudSourceMatch,
333    suffixes: &[String],
334) -> Vec<io::storage::planner::ObjectRef> {
335    let filtered = list_refs
336        .into_iter()
337        .filter(|obj| source_match.matches_key(&obj.key))
338        .collect::<Vec<_>>();
339    let filtered = planner::filter_by_suffixes(filtered, suffixes);
340    planner::stable_sort_refs(filtered)
341}
342
/// Returns `true` when `value` contains an unescaped `*` or `?` wildcard.
///
/// Only these two characters decide whether a cloud source path is treated as a glob. A
/// path that contains `[` but no wildcard is still used as a literal prefix.
fn contains_glob_metachar(value: &str) -> bool {
    first_glob_metachar_index(value).is_some()
}

/// Returns the literal leading part of `value`, suitable as an object-listing prefix.
///
/// A path without wildcards is returned whole. Otherwise the prefix ends at the first
/// unescaped `*`/`?`, or earlier at the first `[` before it. Once the path is compiled with
/// `glob::Pattern`, `[` opens a character class (e.g. `[0-9]`, or glob's `[*]` escape form),
/// so the text from there on is not literal. Keeping it would yield a prefix that no
/// matching key actually starts with.
fn prefix_before_first_glob(value: &str) -> &str {
    match first_glob_metachar_index(value) {
        Some(wildcard_index) => {
            let literal_end = value[..wildcard_index]
                .find('[')
                .unwrap_or(wildcard_index);
            &value[..literal_end]
        }
        None => value,
    }
}

/// Returns the byte index of the first `*` or `?` in `value` that is not escaped.
///
/// A backslash escapes the character that follows it, including another backslash.
fn first_glob_metachar_index(value: &str) -> Option<usize> {
    // NOTE(review): `glob::Pattern` does not treat `\` as an escape (it uses bracket escapes
    // such as `[*]`), so a `\*` skipped here is still a wildcard if the full path is later
    // compiled as a glob. Confirm the intended escape syntax.
    let mut escaped = false;
    for (idx, ch) in value.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        if ch == '\\' {
            escaped = true;
            continue;
        }
        if matches!(ch, '*' | '?') {
            return Some(idx);
        }
    }
    None
}
371
372fn cloud_glob_match_options() -> MatchOptions {
373    MatchOptions {
374        case_sensitive: true,
375        require_literal_separator: true,
376        require_literal_leading_dot: false,
377    }
378}