1use std::path::{Path, PathBuf};
2
3use glob::{MatchOptions, Pattern};
4
5use crate::io::storage::{planner, Target};
6use crate::{config, io, report, FloeResult};
7
/// Controls how far [`resolve_inputs`] goes once the source inputs are known.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum ResolveInputsMode {
    /// Build an `InputFile` for every input; cloud objects are first
    /// downloaded into the supplied temp directory.
    Download,
    /// Only enumerate input URIs; `ResolvedInputs::files` stays empty.
    ListOnly,
}
13
14#[derive(Debug, Clone)]
15pub struct ResolvedInputs {
16 pub files: Vec<io::format::InputFile>,
17 pub listed: Vec<String>,
18 pub mode: report::ResolvedInputMode,
19}
20
21pub fn resolve_inputs(
22 config_dir: &Path,
23 entity: &config::EntityConfig,
24 adapter: &dyn io::format::InputAdapter,
25 target: &Target,
26 resolution_mode: ResolveInputsMode,
27 temp_dir: Option<&Path>,
28 storage_client: Option<&dyn crate::io::storage::StorageClient>,
29) -> FloeResult<ResolvedInputs> {
30 match target {
32 Target::S3 { storage, .. } => {
33 let client = require_storage_client(storage_client, "s3")?;
34 let (bucket, key) = target.s3_parts().ok_or_else(|| {
35 Box::new(crate::errors::RunError(
36 "s3 target missing bucket".to_string(),
37 ))
38 })?;
39 let location = format!("bucket={}", bucket);
40 resolve_cloud_inputs_for_prefix(
41 client,
42 key,
43 adapter,
44 entity,
45 storage,
46 &location,
47 resolution_mode,
48 temp_dir,
49 )
50 }
51 Target::Gcs { storage, .. } => {
52 let client = require_storage_client(storage_client, "gcs")?;
53 let (bucket, key) = target.gcs_parts().ok_or_else(|| {
54 Box::new(crate::errors::RunError(
55 "gcs target missing bucket".to_string(),
56 ))
57 })?;
58 let location = format!("bucket={}", bucket);
59 resolve_cloud_inputs_for_prefix(
60 client,
61 key,
62 adapter,
63 entity,
64 storage,
65 &location,
66 resolution_mode,
67 temp_dir,
68 )
69 }
70 Target::Adls { storage, .. } => {
71 let client = require_storage_client(storage_client, "adls")?;
72 let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
73 Box::new(crate::errors::RunError(
74 "adls target missing container".to_string(),
75 ))
76 })?;
77 let location = format!("container={}, account={}", container, account);
78 resolve_cloud_inputs_for_prefix(
79 client,
80 base_path,
81 adapter,
82 entity,
83 storage,
84 &location,
85 resolution_mode,
86 temp_dir,
87 )
88 }
89 Target::Local { storage, .. } => {
90 let resolved =
91 adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
92 let listed = build_local_listing(&resolved.files, storage_client);
93 let files = match resolution_mode {
94 ResolveInputsMode::Download => {
95 build_local_inputs(&resolved.files, entity, storage_client)
96 }
97 ResolveInputsMode::ListOnly => Vec::new(),
98 };
99 let mode = match resolved.mode {
100 io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
101 io::storage::local::LocalInputMode::Directory => {
102 report::ResolvedInputMode::Directory
103 }
104 };
105 Ok(ResolvedInputs {
106 files,
107 listed,
108 mode,
109 })
110 }
111 }
112}
113
114fn resolve_cloud_inputs_for_prefix(
115 client: &dyn crate::io::storage::StorageClient,
116 prefix: &str,
117 adapter: &dyn io::format::InputAdapter,
118 entity: &config::EntityConfig,
119 storage: &str,
120 location: &str,
121 resolution_mode: ResolveInputsMode,
122 temp_dir: Option<&Path>,
123) -> FloeResult<ResolvedInputs> {
124 let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
125 let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
126 let files = match resolution_mode {
127 ResolveInputsMode::Download => {
128 let temp_dir = require_temp_dir(temp_dir, storage)?;
129 build_cloud_inputs(client, &objects, temp_dir, entity)?
130 }
131 ResolveInputsMode::ListOnly => Vec::new(),
132 };
133 Ok(ResolvedInputs {
134 files,
135 listed,
136 mode: report::ResolvedInputMode::Directory,
137 })
138}
139
140fn require_temp_dir<'a>(temp_dir: Option<&'a Path>, label: &str) -> FloeResult<&'a Path> {
141 temp_dir.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
142 Box::new(crate::errors::RunError(format!(
143 "{} tempdir missing",
144 label
145 )))
146 })
147}
148
149fn require_storage_client<'a>(
150 storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
151 label: &str,
152) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
153 storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
154 Box::new(crate::errors::RunError(format!(
155 "{} storage client missing",
156 label
157 )))
158 })
159}
160
161fn list_cloud_objects(
162 client: &dyn crate::io::storage::StorageClient,
163 source_path: &str,
164 adapter: &dyn io::format::InputAdapter,
165 entity: &config::EntityConfig,
166 storage: &str,
167 location: &str,
168) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
169 let source_match = CloudSourceMatch::new(source_path).map_err(|err| {
170 Box::new(crate::errors::RunError(format!(
171 "entity.name={} source.storage={} invalid cloud source path ({}, path={}): {}",
172 entity.name, storage, location, source_path, err
173 ))) as Box<dyn std::error::Error + Send + Sync>
174 })?;
175 let suffixes = adapter.suffixes()?;
176 let list_refs = client.list(source_match.list_prefix())?;
177 let filtered = filter_cloud_list_refs(list_refs, &source_match, &suffixes);
178 if filtered.is_empty() {
179 let match_desc = source_match.match_description();
180 return Err(Box::new(crate::errors::RunError(format!(
181 "entity.name={} source.storage={} no input objects matched ({}, prefix={}, {}, suffixes={})",
182 entity.name,
183 storage,
184 location,
185 source_match.list_prefix(),
186 match_desc,
187 suffixes.join(",")
188 ))));
189 }
190 Ok(filtered)
191}
192
193fn build_cloud_inputs(
194 client: &dyn crate::io::storage::StorageClient,
195 objects: &[io::storage::planner::ObjectRef],
196 temp_dir: &Path,
197 entity: &config::EntityConfig,
198) -> FloeResult<Vec<io::format::InputFile>> {
199 let mut inputs = Vec::with_capacity(objects.len());
200 for object in objects {
201 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
202 let source_name =
203 planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
204 let source_stem =
205 planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
206 let source_uri = object.uri.clone();
207 inputs.push(io::format::InputFile {
208 source_uri,
209 source_local_path: local_path,
210 source_name,
211 source_stem,
212 });
213 }
214 Ok(inputs)
215}
216
217fn build_local_inputs(
218 files: &[PathBuf],
219 entity: &config::EntityConfig,
220 storage_client: Option<&dyn crate::io::storage::StorageClient>,
221) -> Vec<io::format::InputFile> {
222 files
223 .iter()
224 .map(|path| {
225 let normalized_path = crate::io::storage::paths::normalize_local_path(path);
226 let source_name = path
227 .file_name()
228 .and_then(|name| name.to_str())
229 .unwrap_or(entity.name.as_str())
230 .to_string();
231 let source_stem = Path::new(&source_name)
232 .file_stem()
233 .and_then(|stem| stem.to_str())
234 .unwrap_or(entity.name.as_str())
235 .to_string();
236 let uri = storage_client
237 .and_then(|client| {
238 client
239 .resolve_uri(&normalized_path.display().to_string())
240 .ok()
241 })
242 .unwrap_or_else(|| normalized_path.display().to_string());
243 io::format::InputFile {
244 source_uri: uri,
245 source_local_path: normalized_path,
246 source_name,
247 source_stem,
248 }
249 })
250 .collect()
251}
252
253fn build_local_listing(
254 files: &[PathBuf],
255 storage_client: Option<&dyn crate::io::storage::StorageClient>,
256) -> Vec<String> {
257 files
258 .iter()
259 .map(|path| {
260 let normalized_path = crate::io::storage::paths::normalize_local_path(path);
261 storage_client
262 .and_then(|client| {
263 client
264 .resolve_uri(&normalized_path.display().to_string())
265 .ok()
266 })
267 .unwrap_or_else(|| normalized_path.display().to_string())
268 })
269 .collect()
270}
271
272#[derive(Debug)]
273struct CloudSourceMatch {
274 list_prefix: String,
275 glob_pattern: Option<String>,
276 matcher: Option<Pattern>,
277}
278
279impl CloudSourceMatch {
280 fn new(source_path: &str) -> FloeResult<Self> {
281 if !contains_glob_metachar(source_path) {
282 return Ok(Self {
283 list_prefix: source_path.to_string(),
284 glob_pattern: None,
285 matcher: None,
286 });
287 }
288
289 let list_prefix = prefix_before_first_glob(source_path);
290 if list_prefix.trim_matches('/').is_empty() {
291 return Err(Box::new(crate::errors::RunError(
292 "glob patterns for cloud sources must include a non-empty literal prefix before the first wildcard"
293 .to_string(),
294 )));
295 }
296
297 let matcher = Pattern::new(source_path).map_err(|err| {
298 Box::new(crate::errors::RunError(format!(
299 "invalid cloud glob pattern {:?}: {err}",
300 source_path
301 ))) as Box<dyn std::error::Error + Send + Sync>
302 })?;
303
304 Ok(Self {
305 list_prefix: list_prefix.to_string(),
306 glob_pattern: Some(source_path.to_string()),
307 matcher: Some(matcher),
308 })
309 }
310
311 fn list_prefix(&self) -> &str {
312 self.list_prefix.as_str()
313 }
314
315 fn matches_key(&self, key: &str) -> bool {
316 match &self.matcher {
317 Some(matcher) => matcher.matches_with(key, cloud_glob_match_options()),
318 None => true,
319 }
320 }
321
322 fn match_description(&self) -> String {
323 match &self.glob_pattern {
324 Some(pattern) => format!("glob={pattern}"),
325 None => "glob=<none>".to_string(),
326 }
327 }
328}
329
330fn filter_cloud_list_refs(
331 list_refs: Vec<io::storage::planner::ObjectRef>,
332 source_match: &CloudSourceMatch,
333 suffixes: &[String],
334) -> Vec<io::storage::planner::ObjectRef> {
335 let filtered = list_refs
336 .into_iter()
337 .filter(|obj| source_match.matches_key(&obj.key))
338 .collect::<Vec<_>>();
339 let filtered = planner::filter_by_suffixes(filtered, suffixes);
340 planner::stable_sort_refs(filtered)
341}
342
343fn contains_glob_metachar(value: &str) -> bool {
344 first_glob_metachar_index(value).is_some()
345}
346
/// Returns the literal part of `value` that precedes its first unescaped glob
/// metacharacter (`*`, `?` or `[`), or all of `value` when there is none.
///
/// `[` is included because `glob::Pattern` treats it as the start of a
/// character class. Text from that point on is therefore not literal.
/// Keeping it in the listing prefix would make the storage `list` call search
/// for the bracket text verbatim. For example, `raw/2024-0[1-3]/*.csv` must be
/// listed from `raw/2024-0`, not from `raw/2024-0[1-3]/`.
///
/// A backslash escapes the character that follows it.
fn prefix_before_first_glob(value: &str) -> &str {
    let mut chars = value.char_indices();
    while let Some((idx, ch)) = chars.next() {
        match ch {
            // Skip whatever the backslash escapes.
            '\\' => {
                chars.next();
            }
            // `idx` comes from `char_indices`, so it is always a char boundary.
            '*' | '?' | '[' => return &value[..idx],
            _ => {}
        }
    }
    value
}
353
/// Returns the byte index of the first unescaped `*` or `?` in `value`.
///
/// A backslash escapes the character after it, so `\*` and `\?` are not
/// reported. A trailing lone backslash is ignored.
fn first_glob_metachar_index(value: &str) -> Option<usize> {
    let mut chars = value.char_indices();
    while let Some((idx, ch)) = chars.next() {
        match ch {
            // Consume the escaped character so it is never inspected.
            '\\' => {
                chars.next();
            }
            '*' | '?' => return Some(idx),
            _ => {}
        }
    }
    None
}
371
372fn cloud_glob_match_options() -> MatchOptions {
373 MatchOptions {
374 case_sensitive: true,
375 require_literal_separator: true,
376 require_literal_leading_dot: false,
377 }
378}