1use std::path::{Path, PathBuf};
2use std::time::SystemTime;
3
4use glob::{MatchOptions, Pattern};
5
6use crate::io::storage::{planner, Target};
7use crate::{config, io, report, FloeResult};
8
/// Selects how much work input resolution performs for each matched input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResolveInputsMode {
    /// Build an input file entry for every match; cloud objects are downloaded
    /// into the temp directory first.
    Download,
    /// Only enumerate the matches; no input file entries are produced.
    ListOnly,
}
14
15#[derive(Debug, Clone)]
16pub struct ResolvedInputs {
17 pub files: Vec<io::format::InputFile>,
18 pub listed: Vec<String>,
19 pub mode: report::ResolvedInputMode,
20}
21
22pub fn resolve_inputs(
23 config_dir: &Path,
24 entity: &config::EntityConfig,
25 adapter: &dyn io::format::InputAdapter,
26 target: &Target,
27 resolution_mode: ResolveInputsMode,
28 temp_dir: Option<&Path>,
29 storage_client: Option<&dyn crate::io::storage::StorageClient>,
30) -> FloeResult<ResolvedInputs> {
31 match target {
33 Target::S3 { storage, .. } => {
34 let client = require_storage_client(storage_client, "s3")?;
35 let (bucket, key) = target.s3_parts().ok_or_else(|| {
36 Box::new(crate::errors::RunError(
37 "s3 target missing bucket".to_string(),
38 ))
39 })?;
40 let location = format!("bucket={}", bucket);
41 resolve_cloud_inputs_for_prefix(
42 client,
43 key,
44 adapter,
45 entity,
46 storage,
47 &location,
48 resolution_mode,
49 temp_dir,
50 )
51 }
52 Target::Gcs { storage, .. } => {
53 let client = require_storage_client(storage_client, "gcs")?;
54 let (bucket, key) = target.gcs_parts().ok_or_else(|| {
55 Box::new(crate::errors::RunError(
56 "gcs target missing bucket".to_string(),
57 ))
58 })?;
59 let location = format!("bucket={}", bucket);
60 resolve_cloud_inputs_for_prefix(
61 client,
62 key,
63 adapter,
64 entity,
65 storage,
66 &location,
67 resolution_mode,
68 temp_dir,
69 )
70 }
71 Target::Adls { storage, .. } => {
72 let client = require_storage_client(storage_client, "adls")?;
73 let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
74 Box::new(crate::errors::RunError(
75 "adls target missing container".to_string(),
76 ))
77 })?;
78 let location = format!("container={}, account={}", container, account);
79 resolve_cloud_inputs_for_prefix(
80 client,
81 base_path,
82 adapter,
83 entity,
84 storage,
85 &location,
86 resolution_mode,
87 temp_dir,
88 )
89 }
90 Target::Local { storage, .. } => {
91 let resolved =
92 adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
93 let listed = build_local_listing(&resolved.files, storage_client);
94 let files = match resolution_mode {
95 ResolveInputsMode::Download => {
96 build_local_inputs(&resolved.files, entity, storage_client)
97 }
98 ResolveInputsMode::ListOnly => Vec::new(),
99 };
100 let mode = match resolved.mode {
101 io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
102 io::storage::local::LocalInputMode::Directory => {
103 report::ResolvedInputMode::Directory
104 }
105 };
106 Ok(ResolvedInputs {
107 files,
108 listed,
109 mode,
110 })
111 }
112 }
113}
114
115fn resolve_cloud_inputs_for_prefix(
116 client: &dyn crate::io::storage::StorageClient,
117 prefix: &str,
118 adapter: &dyn io::format::InputAdapter,
119 entity: &config::EntityConfig,
120 storage: &str,
121 location: &str,
122 resolution_mode: ResolveInputsMode,
123 temp_dir: Option<&Path>,
124) -> FloeResult<ResolvedInputs> {
125 let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
126 let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
127 let files = match resolution_mode {
128 ResolveInputsMode::Download => {
129 let temp_dir = require_temp_dir(temp_dir, storage)?;
130 build_cloud_inputs(client, &objects, temp_dir, entity)?
131 }
132 ResolveInputsMode::ListOnly => Vec::new(),
133 };
134 Ok(ResolvedInputs {
135 files,
136 listed,
137 mode: report::ResolvedInputMode::Directory,
138 })
139}
140
141fn require_temp_dir<'a>(temp_dir: Option<&'a Path>, label: &str) -> FloeResult<&'a Path> {
142 temp_dir.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
143 Box::new(crate::errors::RunError(format!(
144 "{} tempdir missing",
145 label
146 )))
147 })
148}
149
150fn require_storage_client<'a>(
151 storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
152 label: &str,
153) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
154 storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
155 Box::new(crate::errors::RunError(format!(
156 "{} storage client missing",
157 label
158 )))
159 })
160}
161
162fn list_cloud_objects(
163 client: &dyn crate::io::storage::StorageClient,
164 source_path: &str,
165 adapter: &dyn io::format::InputAdapter,
166 entity: &config::EntityConfig,
167 storage: &str,
168 location: &str,
169) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
170 let source_match = CloudSourceMatch::new(source_path).map_err(|err| {
171 Box::new(crate::errors::RunError(format!(
172 "entity.name={} source.storage={} invalid cloud source path ({}, path={}): {}",
173 entity.name, storage, location, source_path, err
174 ))) as Box<dyn std::error::Error + Send + Sync>
175 })?;
176 let suffixes = adapter.suffixes()?;
177 let list_refs = client.list(source_match.list_prefix())?;
178 let filtered = filter_cloud_list_refs(list_refs, &source_match, &suffixes);
179 if filtered.is_empty() {
180 let match_desc = source_match.match_description();
181 return Err(Box::new(crate::errors::RunError(format!(
182 "entity.name={} source.storage={} no input objects matched ({}, prefix={}, {}, suffixes={})",
183 entity.name,
184 storage,
185 location,
186 source_match.list_prefix(),
187 match_desc,
188 suffixes.join(",")
189 ))));
190 }
191 Ok(filtered)
192}
193
194fn build_cloud_inputs(
195 client: &dyn crate::io::storage::StorageClient,
196 objects: &[io::storage::planner::ObjectRef],
197 temp_dir: &Path,
198 entity: &config::EntityConfig,
199) -> FloeResult<Vec<io::format::InputFile>> {
200 let mut inputs = Vec::with_capacity(objects.len());
201 for object in objects {
202 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
203 let source_name =
204 planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
205 let source_stem =
206 planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
207 let source_uri = object.uri.clone();
208 inputs.push(io::format::InputFile {
209 source_uri,
210 source_local_path: local_path,
211 source_name,
212 source_stem,
213 source_size: object.size,
214 source_mtime: object.last_modified.clone(),
215 });
216 }
217 Ok(inputs)
218}
219
220fn build_local_inputs(
221 files: &[PathBuf],
222 entity: &config::EntityConfig,
223 storage_client: Option<&dyn crate::io::storage::StorageClient>,
224) -> Vec<io::format::InputFile> {
225 files
226 .iter()
227 .map(|path| {
228 let normalized_path = crate::io::storage::paths::normalize_local_path(path);
229 let metadata = std::fs::metadata(&normalized_path).ok();
230 let source_name = path
231 .file_name()
232 .and_then(|name| name.to_str())
233 .unwrap_or(entity.name.as_str())
234 .to_string();
235 let source_stem = Path::new(&source_name)
236 .file_stem()
237 .and_then(|stem| stem.to_str())
238 .unwrap_or(entity.name.as_str())
239 .to_string();
240 let uri = storage_client
241 .and_then(|client| {
242 client
243 .resolve_uri(&normalized_path.display().to_string())
244 .ok()
245 })
246 .unwrap_or_else(|| normalized_path.display().to_string());
247 io::format::InputFile {
248 source_uri: uri,
249 source_local_path: normalized_path,
250 source_name,
251 source_stem,
252 source_size: metadata.as_ref().map(|metadata| metadata.len()),
253 source_mtime: metadata
254 .as_ref()
255 .and_then(|metadata| metadata.modified().ok())
256 .and_then(system_time_to_rfc3339),
257 }
258 })
259 .collect()
260}
261
262fn system_time_to_rfc3339(value: SystemTime) -> Option<String> {
263 time::OffsetDateTime::from(value)
264 .format(&time::format_description::well_known::Rfc3339)
265 .ok()
266}
267
268fn build_local_listing(
269 files: &[PathBuf],
270 storage_client: Option<&dyn crate::io::storage::StorageClient>,
271) -> Vec<String> {
272 files
273 .iter()
274 .map(|path| {
275 let normalized_path = crate::io::storage::paths::normalize_local_path(path);
276 storage_client
277 .and_then(|client| {
278 client
279 .resolve_uri(&normalized_path.display().to_string())
280 .ok()
281 })
282 .unwrap_or_else(|| normalized_path.display().to_string())
283 })
284 .collect()
285}
286
287#[derive(Debug)]
288struct CloudSourceMatch {
289 list_prefix: String,
290 glob_pattern: Option<String>,
291 matcher: Option<Pattern>,
292}
293
294impl CloudSourceMatch {
295 fn new(source_path: &str) -> FloeResult<Self> {
296 if !contains_glob_metachar(source_path) {
297 return Ok(Self {
298 list_prefix: source_path.to_string(),
299 glob_pattern: None,
300 matcher: None,
301 });
302 }
303
304 let list_prefix = prefix_before_first_glob(source_path);
305 if list_prefix.trim_matches('/').is_empty() {
306 return Err(Box::new(crate::errors::RunError(
307 "glob patterns for cloud sources must include a non-empty literal prefix before the first wildcard"
308 .to_string(),
309 )));
310 }
311
312 let matcher = Pattern::new(source_path).map_err(|err| {
313 Box::new(crate::errors::RunError(format!(
314 "invalid cloud glob pattern {:?}: {err}",
315 source_path
316 ))) as Box<dyn std::error::Error + Send + Sync>
317 })?;
318
319 Ok(Self {
320 list_prefix: list_prefix.to_string(),
321 glob_pattern: Some(source_path.to_string()),
322 matcher: Some(matcher),
323 })
324 }
325
326 fn list_prefix(&self) -> &str {
327 self.list_prefix.as_str()
328 }
329
330 fn matches_key(&self, key: &str) -> bool {
331 match &self.matcher {
332 Some(matcher) => matcher.matches_with(key, cloud_glob_match_options()),
333 None => true,
334 }
335 }
336
337 fn match_description(&self) -> String {
338 match &self.glob_pattern {
339 Some(pattern) => format!("glob={pattern}"),
340 None => "glob=<none>".to_string(),
341 }
342 }
343}
344
345fn filter_cloud_list_refs(
346 list_refs: Vec<io::storage::planner::ObjectRef>,
347 source_match: &CloudSourceMatch,
348 suffixes: &[String],
349) -> Vec<io::storage::planner::ObjectRef> {
350 let filtered = list_refs
351 .into_iter()
352 .filter(|obj| source_match.matches_key(&obj.key))
353 .collect::<Vec<_>>();
354 let filtered = planner::filter_by_suffixes(filtered, suffixes);
355 planner::stable_sort_refs(filtered)
356}
357
358fn contains_glob_metachar(value: &str) -> bool {
359 first_glob_metachar_index(value).is_some()
360}
361
362fn prefix_before_first_glob(value: &str) -> &str {
363 match first_glob_metachar_index(value) {
364 Some(index) => &value[..index],
365 None => value,
366 }
367}
368
/// Finds the byte offset of the first unescaped `*` or `?` in `value`.
///
/// A backslash hides the character that follows it from detection. Only `*`
/// and `?` count as wildcards here; `[` does not.
///
/// NOTE(review): `glob::Pattern` does not appear to treat backslash as an
/// escape character. Confirm the escape handling here matches what the
/// compiled pattern will actually do.
fn first_glob_metachar_index(value: &str) -> Option<usize> {
    let mut skip_next = false;
    value.char_indices().find_map(|(offset, ch)| {
        if skip_next {
            // This character was escaped by the preceding backslash.
            skip_next = false;
            return None;
        }
        match ch {
            '\\' => {
                skip_next = true;
                None
            }
            '*' | '?' => Some(offset),
            _ => None,
        }
    })
}
386
387fn cloud_glob_match_options() -> MatchOptions {
388 MatchOptions {
389 case_sensitive: true,
390 require_literal_separator: true,
391 require_literal_leading_dot: false,
392 }
393}