1use std::path::{Path, PathBuf};
2use std::time::SystemTime;
3
4use glob::{MatchOptions, Pattern};
5
6use crate::errors::RunError;
7use crate::io::storage::{planner, Target};
8use crate::{config, io, report, FloeResult};
9
/// Selects how much work input resolution performs for the matched inputs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ResolveInputsMode {
    /// Build an input-file entry for every matched input.
    Download,
    /// Report the matched URIs only; the resolved file list stays empty.
    ListOnly,
}
15
16#[derive(Debug, Clone)]
17pub struct ResolvedInputs {
18 pub files: Vec<io::format::InputFile>,
19 pub listed: Vec<String>,
20 pub mode: report::ResolvedInputMode,
21}
22
23pub fn resolve_inputs(
24 config_dir: &Path,
25 entity: &config::EntityConfig,
26 adapter: &dyn io::format::InputAdapter,
27 target: &Target,
28 resolution_mode: ResolveInputsMode,
29 temp_dir: Option<&Path>,
30 storage_client: Option<&dyn crate::io::storage::StorageClient>,
31) -> FloeResult<ResolvedInputs> {
32 match target {
34 Target::S3 { storage, .. } => {
35 let client = require_storage_client(storage_client, "s3")?;
36 let (bucket, key) = target.s3_parts().ok_or_else(|| {
37 Box::new(crate::errors::RunError(
38 "s3 target missing bucket".to_string(),
39 ))
40 })?;
41 let location = format!("bucket={}", bucket);
42 resolve_cloud_inputs_for_prefix(
43 client,
44 key,
45 adapter,
46 entity,
47 storage,
48 &location,
49 resolution_mode,
50 temp_dir,
51 )
52 }
53 Target::Gcs { storage, .. } => {
54 let client = require_storage_client(storage_client, "gcs")?;
55 let (bucket, key) = target.gcs_parts().ok_or_else(|| {
56 Box::new(crate::errors::RunError(
57 "gcs target missing bucket".to_string(),
58 ))
59 })?;
60 let location = format!("bucket={}", bucket);
61 resolve_cloud_inputs_for_prefix(
62 client,
63 key,
64 adapter,
65 entity,
66 storage,
67 &location,
68 resolution_mode,
69 temp_dir,
70 )
71 }
72 Target::Adls { storage, .. } => {
73 let client = require_storage_client(storage_client, "adls")?;
74 let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
75 Box::new(crate::errors::RunError(
76 "adls target missing container".to_string(),
77 ))
78 })?;
79 let location = format!("container={}, account={}", container, account);
80 resolve_cloud_inputs_for_prefix(
81 client,
82 base_path,
83 adapter,
84 entity,
85 storage,
86 &location,
87 resolution_mode,
88 temp_dir,
89 )
90 }
91 Target::Local { storage, .. } => {
92 let resolved =
93 adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
94 let listed = build_local_listing(&resolved.files, storage_client);
95 let files = match resolution_mode {
97 ResolveInputsMode::Download => {
98 build_local_inputs(&resolved.files, entity, storage_client)
99 }
100 ResolveInputsMode::ListOnly => Vec::new(),
101 };
102 let mode = match resolved.mode {
103 io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
104 io::storage::local::LocalInputMode::Directory => {
105 report::ResolvedInputMode::Directory
106 }
107 };
108 Ok(ResolvedInputs {
109 files,
110 listed,
111 mode,
112 })
113 }
114 }
115}
116
117fn resolve_cloud_inputs_for_prefix(
118 client: &dyn crate::io::storage::StorageClient,
119 prefix: &str,
120 adapter: &dyn io::format::InputAdapter,
121 entity: &config::EntityConfig,
122 storage: &str,
123 location: &str,
124 resolution_mode: ResolveInputsMode,
125 _temp_dir: Option<&Path>,
126) -> FloeResult<ResolvedInputs> {
127 let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
128 let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
129 let files = match resolution_mode {
131 ResolveInputsMode::Download => list_cloud_inputs(&objects, entity),
132 ResolveInputsMode::ListOnly => Vec::new(),
133 };
134 Ok(ResolvedInputs {
135 files,
136 listed,
137 mode: report::ResolvedInputMode::Directory,
138 })
139}
140
141fn require_storage_client<'a>(
142 storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
143 label: &str,
144) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
145 storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
146 Box::new(crate::errors::RunError(format!(
147 "{} storage client missing",
148 label
149 )))
150 })
151}
152
153fn list_cloud_objects(
154 client: &dyn crate::io::storage::StorageClient,
155 source_path: &str,
156 adapter: &dyn io::format::InputAdapter,
157 entity: &config::EntityConfig,
158 storage: &str,
159 location: &str,
160) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
161 let source_match = CloudSourceMatch::new(source_path).map_err(|err| {
162 Box::new(crate::errors::RunError(format!(
163 "entity.name={} source.storage={} invalid cloud source path ({}, path={}): {}",
164 entity.name, storage, location, source_path, err
165 ))) as Box<dyn std::error::Error + Send + Sync>
166 })?;
167 let suffixes = adapter.suffixes()?;
168 let list_refs = client.list(source_match.list_prefix())?;
169 let filtered = filter_cloud_list_refs(list_refs, &source_match, &suffixes);
170 if filtered.is_empty() {
171 let match_desc = source_match.match_description();
172 return Err(Box::new(crate::errors::RunError(format!(
173 "entity.name={} source.storage={} no input objects matched ({}, prefix={}, {}, suffixes={})",
174 entity.name,
175 storage,
176 location,
177 source_match.list_prefix(),
178 match_desc,
179 suffixes.join(",")
180 ))));
181 }
182 Ok(filtered)
183}
184
185fn list_cloud_inputs(
188 objects: &[io::storage::planner::ObjectRef],
189 entity: &config::EntityConfig,
190) -> Vec<io::format::InputFile> {
191 objects
192 .iter()
193 .map(|object| {
194 let source_name =
195 planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
196 let source_stem =
197 planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
198 io::format::InputFile {
199 source_uri: object.uri.clone(),
200 source_name,
201 source_stem,
202 source_size: object.size,
203 source_mtime: object.last_modified.clone(),
204 }
205 })
206 .collect()
207}
208
209fn build_local_inputs(
210 files: &[PathBuf],
211 entity: &config::EntityConfig,
212 storage_client: Option<&dyn crate::io::storage::StorageClient>,
213) -> Vec<io::format::InputFile> {
214 files
215 .iter()
216 .map(|path| {
217 let normalized_path = crate::io::storage::paths::normalize_local_path(path);
218 let metadata = std::fs::metadata(&normalized_path).ok();
219 let source_name = path
220 .file_name()
221 .and_then(|name| name.to_str())
222 .unwrap_or(entity.name.as_str())
223 .to_string();
224 let source_stem = Path::new(&source_name)
225 .file_stem()
226 .and_then(|stem| stem.to_str())
227 .unwrap_or(entity.name.as_str())
228 .to_string();
229 let uri = storage_client
230 .and_then(|client| {
231 client
232 .resolve_uri(&normalized_path.display().to_string())
233 .ok()
234 })
235 .unwrap_or_else(|| normalized_path.display().to_string());
236 io::format::InputFile {
237 source_uri: uri,
238 source_name,
239 source_stem,
240 source_size: metadata.as_ref().map(|metadata| metadata.len()),
241 source_mtime: metadata
242 .as_ref()
243 .and_then(|metadata| metadata.modified().ok())
244 .and_then(system_time_to_rfc3339),
245 }
246 })
247 .collect()
248}
249
250pub fn localize_input(
256 input_file: io::format::InputFile,
257 storage_client: Option<&dyn crate::io::storage::StorageClient>,
258 temp_dir: Option<&Path>,
259) -> FloeResult<io::format::LocalInputFile> {
260 if is_cloud_uri(&input_file.source_uri) {
261 let client = storage_client.ok_or_else(|| {
262 Box::new(RunError(format!(
263 "storage client required to download {}",
264 input_file.source_uri
265 ))) as Box<dyn std::error::Error + Send + Sync>
266 })?;
267 let temp = temp_dir.ok_or_else(|| {
268 Box::new(RunError(format!(
269 "temp_dir required to download {}",
270 input_file.source_uri
271 ))) as Box<dyn std::error::Error + Send + Sync>
272 })?;
273 let local_path = client.download_to_temp(&input_file.source_uri, temp)?;
274 Ok(io::format::LocalInputFile {
275 file: input_file,
276 local_path,
277 is_ephemeral: true,
278 })
279 } else {
280 let path_str = input_file.source_uri.trim_start_matches("local://");
282 Ok(io::format::LocalInputFile {
283 local_path: PathBuf::from(path_str),
284 file: input_file,
285 is_ephemeral: false,
286 })
287 }
288}
289
/// Returns `true` when `uri` starts with one of the recognised cloud schemes
/// (`s3://`, `gs://`, `abfs://`, `az://`, `gcs://`). Matching is case-sensitive.
fn is_cloud_uri(uri: &str) -> bool {
    const CLOUD_SCHEMES: [&str; 5] = ["s3://", "gs://", "abfs://", "az://", "gcs://"];
    CLOUD_SCHEMES.iter().any(|scheme| uri.starts_with(*scheme))
}
297
298fn system_time_to_rfc3339(value: SystemTime) -> Option<String> {
299 time::OffsetDateTime::from(value)
300 .format(&time::format_description::well_known::Rfc3339)
301 .ok()
302}
303
304fn build_local_listing(
305 files: &[PathBuf],
306 storage_client: Option<&dyn crate::io::storage::StorageClient>,
307) -> Vec<String> {
308 files
309 .iter()
310 .map(|path| {
311 let normalized_path = crate::io::storage::paths::normalize_local_path(path);
312 storage_client
313 .and_then(|client| {
314 client
315 .resolve_uri(&normalized_path.display().to_string())
316 .ok()
317 })
318 .unwrap_or_else(|| normalized_path.display().to_string())
319 })
320 .collect()
321}
322
323#[derive(Debug)]
324struct CloudSourceMatch {
325 list_prefix: String,
326 glob_pattern: Option<String>,
327 matcher: Option<Pattern>,
328}
329
330impl CloudSourceMatch {
331 fn new(source_path: &str) -> FloeResult<Self> {
332 if !contains_glob_metachar(source_path) {
333 return Ok(Self {
334 list_prefix: source_path.to_string(),
335 glob_pattern: None,
336 matcher: None,
337 });
338 }
339
340 let list_prefix = prefix_before_first_glob(source_path);
341 if list_prefix.trim_matches('/').is_empty() {
342 return Err(Box::new(crate::errors::RunError(
343 "glob patterns for cloud sources must include a non-empty literal prefix before the first wildcard"
344 .to_string(),
345 )));
346 }
347
348 let matcher = Pattern::new(source_path).map_err(|err| {
349 Box::new(crate::errors::RunError(format!(
350 "invalid cloud glob pattern {:?}: {err}",
351 source_path
352 ))) as Box<dyn std::error::Error + Send + Sync>
353 })?;
354
355 Ok(Self {
356 list_prefix: list_prefix.to_string(),
357 glob_pattern: Some(source_path.to_string()),
358 matcher: Some(matcher),
359 })
360 }
361
362 fn list_prefix(&self) -> &str {
363 self.list_prefix.as_str()
364 }
365
366 fn matches_key(&self, key: &str) -> bool {
367 match &self.matcher {
368 Some(matcher) => matcher.matches_with(key, cloud_glob_match_options()),
369 None => true,
370 }
371 }
372
373 fn match_description(&self) -> String {
374 match &self.glob_pattern {
375 Some(pattern) => format!("glob={pattern}"),
376 None => "glob=<none>".to_string(),
377 }
378 }
379}
380
381fn filter_cloud_list_refs(
382 list_refs: Vec<io::storage::planner::ObjectRef>,
383 source_match: &CloudSourceMatch,
384 suffixes: &[String],
385) -> Vec<io::storage::planner::ObjectRef> {
386 let filtered = list_refs
387 .into_iter()
388 .filter(|obj| source_match.matches_key(&obj.key))
389 .collect::<Vec<_>>();
390 let filtered = planner::filter_by_suffixes(filtered, suffixes);
391 planner::stable_sort_refs(filtered)
392}
393
394fn contains_glob_metachar(value: &str) -> bool {
395 first_glob_metachar_index(value).is_some()
396}
397
398fn prefix_before_first_glob(value: &str) -> &str {
399 match first_glob_metachar_index(value) {
400 Some(index) => &value[..index],
401 None => value,
402 }
403}
404
/// Returns the byte index of the first unescaped `*` or `?` in `value`.
///
/// A backslash escapes the character that follows it, so that character is
/// never reported (and never starts another escape). Only `*` and `?` are
/// treated as wildcards here.
fn first_glob_metachar_index(value: &str) -> Option<usize> {
    let mut chars = value.char_indices();
    while let Some((index, ch)) = chars.next() {
        match ch {
            '\\' => {
                // Consume the escaped character so it is not inspected.
                chars.next();
            }
            '*' | '?' => return Some(index),
            _ => {}
        }
    }
    None
}
422
423fn cloud_glob_match_options() -> MatchOptions {
424 MatchOptions {
425 case_sensitive: true,
426 require_literal_separator: true,
427 require_literal_leading_dot: false,
428 }
429}