//! `floe_core/io/storage/ops/inputs.rs`: resolves entity input files for local and cloud
//! (S3, GCS, ADLS) storage targets.

1use std::path::{Path, PathBuf};
2
3use glob::{MatchOptions, Pattern};
4
5use crate::io::storage::{planner, Target};
6use crate::{config, io, report, FloeResult};
7
/// How much work [`resolve_inputs`] does for each matched input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResolveInputsMode {
    /// Build `InputFile` entries for every match; cloud objects are first downloaded
    /// into the supplied temp directory.
    Download,
    /// Only report which inputs matched; no `InputFile` entries are produced.
    ListOnly,
}
13
14#[derive(Debug, Clone)]
15pub struct ResolvedInputs {
16    pub files: Vec<io::format::InputFile>,
17    pub listed: Vec<String>,
18    pub mode: report::ResolvedInputMode,
19}
20
21pub fn resolve_inputs(
22    config_dir: &Path,
23    entity: &config::EntityConfig,
24    adapter: &dyn io::format::InputAdapter,
25    target: &Target,
26    resolution_mode: ResolveInputsMode,
27    temp_dir: Option<&Path>,
28    storage_client: Option<&dyn crate::io::storage::StorageClient>,
29) -> FloeResult<ResolvedInputs> {
30    // Storage-specific resolution: list + download for cloud, direct paths for local.
31    match target {
32        Target::S3 { storage, .. } => {
33            let client = require_storage_client(storage_client, "s3")?;
34            let (bucket, key) = target.s3_parts().ok_or_else(|| {
35                Box::new(crate::errors::RunError(
36                    "s3 target missing bucket".to_string(),
37                ))
38            })?;
39            let location = format!("bucket={}", bucket);
40            resolve_cloud_inputs_for_prefix(
41                client,
42                key,
43                adapter,
44                entity,
45                storage,
46                &location,
47                resolution_mode,
48                temp_dir,
49            )
50        }
51        Target::Gcs { storage, .. } => {
52            let client = require_storage_client(storage_client, "gcs")?;
53            let (bucket, key) = target.gcs_parts().ok_or_else(|| {
54                Box::new(crate::errors::RunError(
55                    "gcs target missing bucket".to_string(),
56                ))
57            })?;
58            let location = format!("bucket={}", bucket);
59            resolve_cloud_inputs_for_prefix(
60                client,
61                key,
62                adapter,
63                entity,
64                storage,
65                &location,
66                resolution_mode,
67                temp_dir,
68            )
69        }
70        Target::Adls { storage, .. } => {
71            let client = require_storage_client(storage_client, "adls")?;
72            let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
73                Box::new(crate::errors::RunError(
74                    "adls target missing container".to_string(),
75                ))
76            })?;
77            let location = format!("container={}, account={}", container, account);
78            resolve_cloud_inputs_for_prefix(
79                client,
80                base_path,
81                adapter,
82                entity,
83                storage,
84                &location,
85                resolution_mode,
86                temp_dir,
87            )
88        }
89        Target::Local { storage, .. } => {
90            let resolved =
91                adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
92            let listed = build_local_listing(&resolved.files, storage_client);
93            let files = match resolution_mode {
94                ResolveInputsMode::Download => {
95                    build_local_inputs(&resolved.files, entity, storage_client)
96                }
97                ResolveInputsMode::ListOnly => Vec::new(),
98            };
99            let mode = match resolved.mode {
100                io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
101                io::storage::local::LocalInputMode::Directory => {
102                    report::ResolvedInputMode::Directory
103                }
104            };
105            Ok(ResolvedInputs {
106                files,
107                listed,
108                mode,
109            })
110        }
111    }
112}
113
114fn resolve_cloud_inputs_for_prefix(
115    client: &dyn crate::io::storage::StorageClient,
116    prefix: &str,
117    adapter: &dyn io::format::InputAdapter,
118    entity: &config::EntityConfig,
119    storage: &str,
120    location: &str,
121    resolution_mode: ResolveInputsMode,
122    temp_dir: Option<&Path>,
123) -> FloeResult<ResolvedInputs> {
124    let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
125    let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
126    let files = match resolution_mode {
127        ResolveInputsMode::Download => {
128            let temp_dir = require_temp_dir(temp_dir, storage)?;
129            build_cloud_inputs(client, &objects, temp_dir, entity)?
130        }
131        ResolveInputsMode::ListOnly => Vec::new(),
132    };
133    Ok(ResolvedInputs {
134        files,
135        listed,
136        mode: report::ResolvedInputMode::Directory,
137    })
138}
139
140fn require_temp_dir<'a>(temp_dir: Option<&'a Path>, label: &str) -> FloeResult<&'a Path> {
141    temp_dir.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
142        Box::new(crate::errors::RunError(format!(
143            "{} tempdir missing",
144            label
145        )))
146    })
147}
148
149fn require_storage_client<'a>(
150    storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
151    label: &str,
152) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
153    storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
154        Box::new(crate::errors::RunError(format!(
155            "{} storage client missing",
156            label
157        )))
158    })
159}
160
161fn list_cloud_objects(
162    client: &dyn crate::io::storage::StorageClient,
163    source_path: &str,
164    adapter: &dyn io::format::InputAdapter,
165    entity: &config::EntityConfig,
166    storage: &str,
167    location: &str,
168) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
169    let source_match = CloudSourceMatch::new(source_path).map_err(|err| {
170        Box::new(crate::errors::RunError(format!(
171            "entity.name={} source.storage={} invalid cloud source path ({}, path={}): {}",
172            entity.name, storage, location, source_path, err
173        ))) as Box<dyn std::error::Error + Send + Sync>
174    })?;
175    let suffixes = adapter.suffixes()?;
176    let list_refs = client.list(source_match.list_prefix())?;
177    let filtered = filter_cloud_list_refs(list_refs, &source_match, &suffixes);
178    if filtered.is_empty() {
179        let match_desc = source_match.match_description();
180        return Err(Box::new(crate::errors::RunError(format!(
181            "entity.name={} source.storage={} no input objects matched ({}, prefix={}, {}, suffixes={})",
182            entity.name,
183            storage,
184            location,
185            source_match.list_prefix(),
186            match_desc,
187            suffixes.join(",")
188        ))));
189    }
190    Ok(filtered)
191}
192
193fn build_cloud_inputs(
194    client: &dyn crate::io::storage::StorageClient,
195    objects: &[io::storage::planner::ObjectRef],
196    temp_dir: &Path,
197    entity: &config::EntityConfig,
198) -> FloeResult<Vec<io::format::InputFile>> {
199    let mut inputs = Vec::with_capacity(objects.len());
200    for object in objects {
201        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
202        let source_name =
203            planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
204        let source_stem =
205            planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
206        let source_uri = object.uri.clone();
207        inputs.push(io::format::InputFile {
208            source_uri,
209            source_local_path: local_path,
210            source_name,
211            source_stem,
212        });
213    }
214    Ok(inputs)
215}
216
217fn build_local_inputs(
218    files: &[PathBuf],
219    entity: &config::EntityConfig,
220    storage_client: Option<&dyn crate::io::storage::StorageClient>,
221) -> Vec<io::format::InputFile> {
222    files
223        .iter()
224        .map(|path| {
225            let source_name = path
226                .file_name()
227                .and_then(|name| name.to_str())
228                .unwrap_or(entity.name.as_str())
229                .to_string();
230            let source_stem = Path::new(&source_name)
231                .file_stem()
232                .and_then(|stem| stem.to_str())
233                .unwrap_or(entity.name.as_str())
234                .to_string();
235            let uri = storage_client
236                .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
237                .unwrap_or_else(|| path.display().to_string());
238            io::format::InputFile {
239                source_uri: uri,
240                source_local_path: path.clone(),
241                source_name,
242                source_stem,
243            }
244        })
245        .collect()
246}
247
248fn build_local_listing(
249    files: &[PathBuf],
250    storage_client: Option<&dyn crate::io::storage::StorageClient>,
251) -> Vec<String> {
252    files
253        .iter()
254        .map(|path| {
255            storage_client
256                .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
257                .unwrap_or_else(|| path.display().to_string())
258        })
259        .collect()
260}
261
262#[derive(Debug)]
263struct CloudSourceMatch {
264    list_prefix: String,
265    glob_pattern: Option<String>,
266    matcher: Option<Pattern>,
267}
268
269impl CloudSourceMatch {
270    fn new(source_path: &str) -> FloeResult<Self> {
271        if !contains_glob_metachar(source_path) {
272            return Ok(Self {
273                list_prefix: source_path.to_string(),
274                glob_pattern: None,
275                matcher: None,
276            });
277        }
278
279        let list_prefix = prefix_before_first_glob(source_path);
280        if list_prefix.trim_matches('/').is_empty() {
281            return Err(Box::new(crate::errors::RunError(
282                "glob patterns for cloud sources must include a non-empty literal prefix before the first wildcard"
283                    .to_string(),
284            )));
285        }
286
287        let matcher = Pattern::new(source_path).map_err(|err| {
288            Box::new(crate::errors::RunError(format!(
289                "invalid cloud glob pattern {:?}: {err}",
290                source_path
291            ))) as Box<dyn std::error::Error + Send + Sync>
292        })?;
293
294        Ok(Self {
295            list_prefix: list_prefix.to_string(),
296            glob_pattern: Some(source_path.to_string()),
297            matcher: Some(matcher),
298        })
299    }
300
301    fn list_prefix(&self) -> &str {
302        self.list_prefix.as_str()
303    }
304
305    fn matches_key(&self, key: &str) -> bool {
306        match &self.matcher {
307            Some(matcher) => matcher.matches_with(key, cloud_glob_match_options()),
308            None => true,
309        }
310    }
311
312    fn match_description(&self) -> String {
313        match &self.glob_pattern {
314            Some(pattern) => format!("glob={pattern}"),
315            None => "glob=<none>".to_string(),
316        }
317    }
318}
319
320fn filter_cloud_list_refs(
321    list_refs: Vec<io::storage::planner::ObjectRef>,
322    source_match: &CloudSourceMatch,
323    suffixes: &[String],
324) -> Vec<io::storage::planner::ObjectRef> {
325    let filtered = list_refs
326        .into_iter()
327        .filter(|obj| source_match.matches_key(&obj.key))
328        .collect::<Vec<_>>();
329    let filtered = planner::filter_by_suffixes(filtered, suffixes);
330    planner::stable_sort_refs(filtered)
331}
332
333fn contains_glob_metachar(value: &str) -> bool {
334    first_glob_metachar_index(value).is_some()
335}
336
/// Returns the literal part of a glob `value` that precedes its first unescaped
/// metacharacter. The result is used as the cloud listing prefix.
///
/// Besides `*` and `?`, an unescaped `[` also ends the prefix. `glob::Pattern` parses
/// `[...]` as a character class, so keeping the bracket text in the prefix (e.g. listing
/// `data/[ab]/` for `data/[ab]/*.csv`) would list under a path that no key accepted by
/// the pattern can start with. A backslash escapes the next character, matching the
/// convention of `first_glob_metachar_index`. Returns `value` unchanged when it contains
/// no such metacharacter.
fn prefix_before_first_glob(value: &str) -> &str {
    let mut escaped = false;
    for (idx, ch) in value.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        match ch {
            '\\' => escaped = true,
            '*' | '?' | '[' => return &value[..idx],
            _ => {}
        }
    }
    value
}
343
/// Returns the byte offset of the first `*` or `?` in `value` that is not escaped by a
/// preceding backslash, or `None` when there is none.
///
/// A backslash escapes exactly the following character (including another backslash);
/// a trailing lone backslash is ignored. `[` is not treated as a metacharacter here.
///
/// NOTE(review): `glob::Pattern` does not appear to honour backslash escapes — confirm the
/// escaping convention here matches how the pattern is compiled.
fn first_glob_metachar_index(value: &str) -> Option<usize> {
    let mut chars = value.char_indices();
    while let Some((offset, ch)) = chars.next() {
        match ch {
            // Skip whatever character the backslash escapes.
            '\\' => {
                chars.next();
            }
            '*' | '?' => return Some(offset),
            _ => {}
        }
    }
    None
}
361
362fn cloud_glob_match_options() -> MatchOptions {
363    MatchOptions {
364        case_sensitive: true,
365        require_literal_separator: true,
366        require_literal_leading_dot: false,
367    }
368}