Skip to main content

floe_core/io/storage/ops/
inputs.rs

1use std::path::{Path, PathBuf};
2use std::time::SystemTime;
3
4use glob::{MatchOptions, Pattern};
5
6use crate::io::storage::{planner, Target};
7use crate::{config, io, report, FloeResult};
8
/// Selects how far input resolution goes for a storage target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResolveInputsMode {
    /// Resolve the inputs and materialise them as `InputFile` entries
    /// (cloud objects are downloaded into the temp directory).
    Download,
    /// Only list the matching inputs; no `InputFile` entries are produced.
    ListOnly,
}
14
15#[derive(Debug, Clone)]
16pub struct ResolvedInputs {
17    pub files: Vec<io::format::InputFile>,
18    pub listed: Vec<String>,
19    pub mode: report::ResolvedInputMode,
20}
21
22pub fn resolve_inputs(
23    config_dir: &Path,
24    entity: &config::EntityConfig,
25    adapter: &dyn io::format::InputAdapter,
26    target: &Target,
27    resolution_mode: ResolveInputsMode,
28    temp_dir: Option<&Path>,
29    storage_client: Option<&dyn crate::io::storage::StorageClient>,
30) -> FloeResult<ResolvedInputs> {
31    // Storage-specific resolution: list + download for cloud, direct paths for local.
32    match target {
33        Target::S3 { storage, .. } => {
34            let client = require_storage_client(storage_client, "s3")?;
35            let (bucket, key) = target.s3_parts().ok_or_else(|| {
36                Box::new(crate::errors::RunError(
37                    "s3 target missing bucket".to_string(),
38                ))
39            })?;
40            let location = format!("bucket={}", bucket);
41            resolve_cloud_inputs_for_prefix(
42                client,
43                key,
44                adapter,
45                entity,
46                storage,
47                &location,
48                resolution_mode,
49                temp_dir,
50            )
51        }
52        Target::Gcs { storage, .. } => {
53            let client = require_storage_client(storage_client, "gcs")?;
54            let (bucket, key) = target.gcs_parts().ok_or_else(|| {
55                Box::new(crate::errors::RunError(
56                    "gcs target missing bucket".to_string(),
57                ))
58            })?;
59            let location = format!("bucket={}", bucket);
60            resolve_cloud_inputs_for_prefix(
61                client,
62                key,
63                adapter,
64                entity,
65                storage,
66                &location,
67                resolution_mode,
68                temp_dir,
69            )
70        }
71        Target::Adls { storage, .. } => {
72            let client = require_storage_client(storage_client, "adls")?;
73            let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
74                Box::new(crate::errors::RunError(
75                    "adls target missing container".to_string(),
76                ))
77            })?;
78            let location = format!("container={}, account={}", container, account);
79            resolve_cloud_inputs_for_prefix(
80                client,
81                base_path,
82                adapter,
83                entity,
84                storage,
85                &location,
86                resolution_mode,
87                temp_dir,
88            )
89        }
90        Target::Local { storage, .. } => {
91            let resolved =
92                adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
93            let listed = build_local_listing(&resolved.files, storage_client);
94            let files = match resolution_mode {
95                ResolveInputsMode::Download => {
96                    build_local_inputs(&resolved.files, entity, storage_client)
97                }
98                ResolveInputsMode::ListOnly => Vec::new(),
99            };
100            let mode = match resolved.mode {
101                io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
102                io::storage::local::LocalInputMode::Directory => {
103                    report::ResolvedInputMode::Directory
104                }
105            };
106            Ok(ResolvedInputs {
107                files,
108                listed,
109                mode,
110            })
111        }
112    }
113}
114
115fn resolve_cloud_inputs_for_prefix(
116    client: &dyn crate::io::storage::StorageClient,
117    prefix: &str,
118    adapter: &dyn io::format::InputAdapter,
119    entity: &config::EntityConfig,
120    storage: &str,
121    location: &str,
122    resolution_mode: ResolveInputsMode,
123    temp_dir: Option<&Path>,
124) -> FloeResult<ResolvedInputs> {
125    let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
126    let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
127    let files = match resolution_mode {
128        ResolveInputsMode::Download => {
129            let temp_dir = require_temp_dir(temp_dir, storage)?;
130            build_cloud_inputs(client, &objects, temp_dir, entity)?
131        }
132        ResolveInputsMode::ListOnly => Vec::new(),
133    };
134    Ok(ResolvedInputs {
135        files,
136        listed,
137        mode: report::ResolvedInputMode::Directory,
138    })
139}
140
141fn require_temp_dir<'a>(temp_dir: Option<&'a Path>, label: &str) -> FloeResult<&'a Path> {
142    temp_dir.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
143        Box::new(crate::errors::RunError(format!(
144            "{} tempdir missing",
145            label
146        )))
147    })
148}
149
150fn require_storage_client<'a>(
151    storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
152    label: &str,
153) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
154    storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
155        Box::new(crate::errors::RunError(format!(
156            "{} storage client missing",
157            label
158        )))
159    })
160}
161
162fn list_cloud_objects(
163    client: &dyn crate::io::storage::StorageClient,
164    source_path: &str,
165    adapter: &dyn io::format::InputAdapter,
166    entity: &config::EntityConfig,
167    storage: &str,
168    location: &str,
169) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
170    let source_match = CloudSourceMatch::new(source_path).map_err(|err| {
171        Box::new(crate::errors::RunError(format!(
172            "entity.name={} source.storage={} invalid cloud source path ({}, path={}): {}",
173            entity.name, storage, location, source_path, err
174        ))) as Box<dyn std::error::Error + Send + Sync>
175    })?;
176    let suffixes = adapter.suffixes()?;
177    let list_refs = client.list(source_match.list_prefix())?;
178    let filtered = filter_cloud_list_refs(list_refs, &source_match, &suffixes);
179    if filtered.is_empty() {
180        let match_desc = source_match.match_description();
181        return Err(Box::new(crate::errors::RunError(format!(
182            "entity.name={} source.storage={} no input objects matched ({}, prefix={}, {}, suffixes={})",
183            entity.name,
184            storage,
185            location,
186            source_match.list_prefix(),
187            match_desc,
188            suffixes.join(",")
189        ))));
190    }
191    Ok(filtered)
192}
193
194fn build_cloud_inputs(
195    client: &dyn crate::io::storage::StorageClient,
196    objects: &[io::storage::planner::ObjectRef],
197    temp_dir: &Path,
198    entity: &config::EntityConfig,
199) -> FloeResult<Vec<io::format::InputFile>> {
200    let mut inputs = Vec::with_capacity(objects.len());
201    for object in objects {
202        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
203        let source_name =
204            planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
205        let source_stem =
206            planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
207        let source_uri = object.uri.clone();
208        inputs.push(io::format::InputFile {
209            source_uri,
210            source_local_path: local_path,
211            source_name,
212            source_stem,
213            source_size: object.size,
214            source_mtime: object.last_modified.clone(),
215        });
216    }
217    Ok(inputs)
218}
219
220fn build_local_inputs(
221    files: &[PathBuf],
222    entity: &config::EntityConfig,
223    storage_client: Option<&dyn crate::io::storage::StorageClient>,
224) -> Vec<io::format::InputFile> {
225    files
226        .iter()
227        .map(|path| {
228            let normalized_path = crate::io::storage::paths::normalize_local_path(path);
229            let metadata = std::fs::metadata(&normalized_path).ok();
230            let source_name = path
231                .file_name()
232                .and_then(|name| name.to_str())
233                .unwrap_or(entity.name.as_str())
234                .to_string();
235            let source_stem = Path::new(&source_name)
236                .file_stem()
237                .and_then(|stem| stem.to_str())
238                .unwrap_or(entity.name.as_str())
239                .to_string();
240            let uri = storage_client
241                .and_then(|client| {
242                    client
243                        .resolve_uri(&normalized_path.display().to_string())
244                        .ok()
245                })
246                .unwrap_or_else(|| normalized_path.display().to_string());
247            io::format::InputFile {
248                source_uri: uri,
249                source_local_path: normalized_path,
250                source_name,
251                source_stem,
252                source_size: metadata.as_ref().map(|metadata| metadata.len()),
253                source_mtime: metadata
254                    .as_ref()
255                    .and_then(|metadata| metadata.modified().ok())
256                    .and_then(system_time_to_rfc3339),
257            }
258        })
259        .collect()
260}
261
262fn system_time_to_rfc3339(value: SystemTime) -> Option<String> {
263    time::OffsetDateTime::from(value)
264        .format(&time::format_description::well_known::Rfc3339)
265        .ok()
266}
267
268fn build_local_listing(
269    files: &[PathBuf],
270    storage_client: Option<&dyn crate::io::storage::StorageClient>,
271) -> Vec<String> {
272    files
273        .iter()
274        .map(|path| {
275            let normalized_path = crate::io::storage::paths::normalize_local_path(path);
276            storage_client
277                .and_then(|client| {
278                    client
279                        .resolve_uri(&normalized_path.display().to_string())
280                        .ok()
281                })
282                .unwrap_or_else(|| normalized_path.display().to_string())
283        })
284        .collect()
285}
286
287#[derive(Debug)]
288struct CloudSourceMatch {
289    list_prefix: String,
290    glob_pattern: Option<String>,
291    matcher: Option<Pattern>,
292}
293
294impl CloudSourceMatch {
295    fn new(source_path: &str) -> FloeResult<Self> {
296        if !contains_glob_metachar(source_path) {
297            return Ok(Self {
298                list_prefix: source_path.to_string(),
299                glob_pattern: None,
300                matcher: None,
301            });
302        }
303
304        let list_prefix = prefix_before_first_glob(source_path);
305        if list_prefix.trim_matches('/').is_empty() {
306            return Err(Box::new(crate::errors::RunError(
307                "glob patterns for cloud sources must include a non-empty literal prefix before the first wildcard"
308                    .to_string(),
309            )));
310        }
311
312        let matcher = Pattern::new(source_path).map_err(|err| {
313            Box::new(crate::errors::RunError(format!(
314                "invalid cloud glob pattern {:?}: {err}",
315                source_path
316            ))) as Box<dyn std::error::Error + Send + Sync>
317        })?;
318
319        Ok(Self {
320            list_prefix: list_prefix.to_string(),
321            glob_pattern: Some(source_path.to_string()),
322            matcher: Some(matcher),
323        })
324    }
325
326    fn list_prefix(&self) -> &str {
327        self.list_prefix.as_str()
328    }
329
330    fn matches_key(&self, key: &str) -> bool {
331        match &self.matcher {
332            Some(matcher) => matcher.matches_with(key, cloud_glob_match_options()),
333            None => true,
334        }
335    }
336
337    fn match_description(&self) -> String {
338        match &self.glob_pattern {
339            Some(pattern) => format!("glob={pattern}"),
340            None => "glob=<none>".to_string(),
341        }
342    }
343}
344
345fn filter_cloud_list_refs(
346    list_refs: Vec<io::storage::planner::ObjectRef>,
347    source_match: &CloudSourceMatch,
348    suffixes: &[String],
349) -> Vec<io::storage::planner::ObjectRef> {
350    let filtered = list_refs
351        .into_iter()
352        .filter(|obj| source_match.matches_key(&obj.key))
353        .collect::<Vec<_>>();
354    let filtered = planner::filter_by_suffixes(filtered, suffixes);
355    planner::stable_sort_refs(filtered)
356}
357
358fn contains_glob_metachar(value: &str) -> bool {
359    first_glob_metachar_index(value).is_some()
360}
361
362fn prefix_before_first_glob(value: &str) -> &str {
363    match first_glob_metachar_index(value) {
364        Some(index) => &value[..index],
365        None => value,
366    }
367}
368
/// Finds the byte offset of the first `*` or `?` that is not preceded by a
/// backslash escape.
///
/// A backslash causes the character after it to be skipped, whatever it is
/// (including another backslash). Returns `None` when no such wildcard exists.
// NOTE(review): the matcher in this file is `glob::Pattern`; confirm it shares
// this backslash-escape convention, otherwise escaped wildcards are treated as
// literal here but as wildcards by the matcher.
fn first_glob_metachar_index(value: &str) -> Option<usize> {
    let mut chars = value.char_indices();
    while let Some((offset, current)) = chars.next() {
        match current {
            '\\' => {
                // Consume the escaped character so it is never inspected.
                chars.next();
            }
            '*' | '?' => return Some(offset),
            _ => {}
        }
    }
    None
}
386
387fn cloud_glob_match_options() -> MatchOptions {
388    MatchOptions {
389        case_sensitive: true,
390        require_literal_separator: true,
391        require_literal_leading_dot: false,
392    }
393}