1use std::path::{Path, PathBuf};
2
3use glob::{MatchOptions, Pattern};
4
5use crate::io::storage::{planner, Target};
6use crate::{config, io, report, FloeResult};
7
/// How far input resolution goes once the set of inputs is known.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ResolveInputsMode {
    /// Produce readable inputs: cloud objects are downloaded into the caller's temp directory,
    /// local files are referenced in place.
    Download,
    /// Only report the matched input URIs; no input files are produced.
    ListOnly,
}
13
/// Result of resolving an entity's inputs against its storage target.
#[derive(Debug, Clone)]
pub struct ResolvedInputs {
    /// Inputs ready to be read; left empty when resolving with `ResolveInputsMode::ListOnly`.
    pub files: Vec<io::format::InputFile>,
    /// URIs of every matched input, filled in both resolution modes (for local files this is the
    /// storage client's resolved URI, or the displayed path when it cannot be resolved).
    pub listed: Vec<String>,
    /// Whether the source resolved to a single file or a directory; cloud prefixes are always
    /// reported as a directory.
    pub mode: report::ResolvedInputMode,
}
20
21pub fn resolve_inputs(
22 config_dir: &Path,
23 entity: &config::EntityConfig,
24 adapter: &dyn io::format::InputAdapter,
25 target: &Target,
26 resolution_mode: ResolveInputsMode,
27 temp_dir: Option<&Path>,
28 storage_client: Option<&dyn crate::io::storage::StorageClient>,
29) -> FloeResult<ResolvedInputs> {
30 match target {
32 Target::S3 { storage, .. } => {
33 let client = require_storage_client(storage_client, "s3")?;
34 let (bucket, key) = target.s3_parts().ok_or_else(|| {
35 Box::new(crate::errors::RunError(
36 "s3 target missing bucket".to_string(),
37 ))
38 })?;
39 let location = format!("bucket={}", bucket);
40 resolve_cloud_inputs_for_prefix(
41 client,
42 key,
43 adapter,
44 entity,
45 storage,
46 &location,
47 resolution_mode,
48 temp_dir,
49 )
50 }
51 Target::Gcs { storage, .. } => {
52 let client = require_storage_client(storage_client, "gcs")?;
53 let (bucket, key) = target.gcs_parts().ok_or_else(|| {
54 Box::new(crate::errors::RunError(
55 "gcs target missing bucket".to_string(),
56 ))
57 })?;
58 let location = format!("bucket={}", bucket);
59 resolve_cloud_inputs_for_prefix(
60 client,
61 key,
62 adapter,
63 entity,
64 storage,
65 &location,
66 resolution_mode,
67 temp_dir,
68 )
69 }
70 Target::Adls { storage, .. } => {
71 let client = require_storage_client(storage_client, "adls")?;
72 let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
73 Box::new(crate::errors::RunError(
74 "adls target missing container".to_string(),
75 ))
76 })?;
77 let location = format!("container={}, account={}", container, account);
78 resolve_cloud_inputs_for_prefix(
79 client,
80 base_path,
81 adapter,
82 entity,
83 storage,
84 &location,
85 resolution_mode,
86 temp_dir,
87 )
88 }
89 Target::Local { storage, .. } => {
90 let resolved =
91 adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
92 let listed = build_local_listing(&resolved.files, storage_client);
93 let files = match resolution_mode {
94 ResolveInputsMode::Download => {
95 build_local_inputs(&resolved.files, entity, storage_client)
96 }
97 ResolveInputsMode::ListOnly => Vec::new(),
98 };
99 let mode = match resolved.mode {
100 io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
101 io::storage::local::LocalInputMode::Directory => {
102 report::ResolvedInputMode::Directory
103 }
104 };
105 Ok(ResolvedInputs {
106 files,
107 listed,
108 mode,
109 })
110 }
111 }
112}
113
114fn resolve_cloud_inputs_for_prefix(
115 client: &dyn crate::io::storage::StorageClient,
116 prefix: &str,
117 adapter: &dyn io::format::InputAdapter,
118 entity: &config::EntityConfig,
119 storage: &str,
120 location: &str,
121 resolution_mode: ResolveInputsMode,
122 temp_dir: Option<&Path>,
123) -> FloeResult<ResolvedInputs> {
124 let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
125 let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
126 let files = match resolution_mode {
127 ResolveInputsMode::Download => {
128 let temp_dir = require_temp_dir(temp_dir, storage)?;
129 build_cloud_inputs(client, &objects, temp_dir, entity)?
130 }
131 ResolveInputsMode::ListOnly => Vec::new(),
132 };
133 Ok(ResolvedInputs {
134 files,
135 listed,
136 mode: report::ResolvedInputMode::Directory,
137 })
138}
139
140fn require_temp_dir<'a>(temp_dir: Option<&'a Path>, label: &str) -> FloeResult<&'a Path> {
141 temp_dir.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
142 Box::new(crate::errors::RunError(format!(
143 "{} tempdir missing",
144 label
145 )))
146 })
147}
148
149fn require_storage_client<'a>(
150 storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
151 label: &str,
152) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
153 storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
154 Box::new(crate::errors::RunError(format!(
155 "{} storage client missing",
156 label
157 )))
158 })
159}
160
161fn list_cloud_objects(
162 client: &dyn crate::io::storage::StorageClient,
163 source_path: &str,
164 adapter: &dyn io::format::InputAdapter,
165 entity: &config::EntityConfig,
166 storage: &str,
167 location: &str,
168) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
169 let source_match = CloudSourceMatch::new(source_path).map_err(|err| {
170 Box::new(crate::errors::RunError(format!(
171 "entity.name={} source.storage={} invalid cloud source path ({}, path={}): {}",
172 entity.name, storage, location, source_path, err
173 ))) as Box<dyn std::error::Error + Send + Sync>
174 })?;
175 let suffixes = adapter.suffixes()?;
176 let list_refs = client.list(source_match.list_prefix())?;
177 let filtered = filter_cloud_list_refs(list_refs, &source_match, &suffixes);
178 if filtered.is_empty() {
179 let match_desc = source_match.match_description();
180 return Err(Box::new(crate::errors::RunError(format!(
181 "entity.name={} source.storage={} no input objects matched ({}, prefix={}, {}, suffixes={})",
182 entity.name,
183 storage,
184 location,
185 source_match.list_prefix(),
186 match_desc,
187 suffixes.join(",")
188 ))));
189 }
190 Ok(filtered)
191}
192
193fn build_cloud_inputs(
194 client: &dyn crate::io::storage::StorageClient,
195 objects: &[io::storage::planner::ObjectRef],
196 temp_dir: &Path,
197 entity: &config::EntityConfig,
198) -> FloeResult<Vec<io::format::InputFile>> {
199 let mut inputs = Vec::with_capacity(objects.len());
200 for object in objects {
201 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
202 let source_name =
203 planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
204 let source_stem =
205 planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
206 let source_uri = object.uri.clone();
207 inputs.push(io::format::InputFile {
208 source_uri,
209 source_local_path: local_path,
210 source_name,
211 source_stem,
212 });
213 }
214 Ok(inputs)
215}
216
217fn build_local_inputs(
218 files: &[PathBuf],
219 entity: &config::EntityConfig,
220 storage_client: Option<&dyn crate::io::storage::StorageClient>,
221) -> Vec<io::format::InputFile> {
222 files
223 .iter()
224 .map(|path| {
225 let source_name = path
226 .file_name()
227 .and_then(|name| name.to_str())
228 .unwrap_or(entity.name.as_str())
229 .to_string();
230 let source_stem = Path::new(&source_name)
231 .file_stem()
232 .and_then(|stem| stem.to_str())
233 .unwrap_or(entity.name.as_str())
234 .to_string();
235 let uri = storage_client
236 .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
237 .unwrap_or_else(|| path.display().to_string());
238 io::format::InputFile {
239 source_uri: uri,
240 source_local_path: path.clone(),
241 source_name,
242 source_stem,
243 }
244 })
245 .collect()
246}
247
248fn build_local_listing(
249 files: &[PathBuf],
250 storage_client: Option<&dyn crate::io::storage::StorageClient>,
251) -> Vec<String> {
252 files
253 .iter()
254 .map(|path| {
255 storage_client
256 .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
257 .unwrap_or_else(|| path.display().to_string())
258 })
259 .collect()
260}
261
/// How a cloud source path selects objects: a prefix to list, plus an optional glob filter.
#[derive(Debug)]
struct CloudSourceMatch {
    /// Prefix passed to `StorageClient::list`.
    list_prefix: String,
    /// Original glob text, kept for diagnostics; `None` when the path had no wildcard.
    glob_pattern: Option<String>,
    /// Compiled glob applied to each listed key; `None` accepts every listed key.
    matcher: Option<Pattern>,
}
268
269impl CloudSourceMatch {
270 fn new(source_path: &str) -> FloeResult<Self> {
271 if !contains_glob_metachar(source_path) {
272 return Ok(Self {
273 list_prefix: source_path.to_string(),
274 glob_pattern: None,
275 matcher: None,
276 });
277 }
278
279 let list_prefix = prefix_before_first_glob(source_path);
280 if list_prefix.trim_matches('/').is_empty() {
281 return Err(Box::new(crate::errors::RunError(
282 "glob patterns for cloud sources must include a non-empty literal prefix before the first wildcard"
283 .to_string(),
284 )));
285 }
286
287 let matcher = Pattern::new(source_path).map_err(|err| {
288 Box::new(crate::errors::RunError(format!(
289 "invalid cloud glob pattern {:?}: {err}",
290 source_path
291 ))) as Box<dyn std::error::Error + Send + Sync>
292 })?;
293
294 Ok(Self {
295 list_prefix: list_prefix.to_string(),
296 glob_pattern: Some(source_path.to_string()),
297 matcher: Some(matcher),
298 })
299 }
300
301 fn list_prefix(&self) -> &str {
302 self.list_prefix.as_str()
303 }
304
305 fn matches_key(&self, key: &str) -> bool {
306 match &self.matcher {
307 Some(matcher) => matcher.matches_with(key, cloud_glob_match_options()),
308 None => true,
309 }
310 }
311
312 fn match_description(&self) -> String {
313 match &self.glob_pattern {
314 Some(pattern) => format!("glob={pattern}"),
315 None => "glob=<none>".to_string(),
316 }
317 }
318}
319
320fn filter_cloud_list_refs(
321 list_refs: Vec<io::storage::planner::ObjectRef>,
322 source_match: &CloudSourceMatch,
323 suffixes: &[String],
324) -> Vec<io::storage::planner::ObjectRef> {
325 let filtered = list_refs
326 .into_iter()
327 .filter(|obj| source_match.matches_key(&obj.key))
328 .collect::<Vec<_>>();
329 let filtered = planner::filter_by_suffixes(filtered, suffixes);
330 planner::stable_sort_refs(filtered)
331}
332
/// Returns `true` when `value` contains an unescaped `*` or `?`.
///
/// Only these two characters switch a cloud source path into glob mode. A `[` on its own is
/// left literal, so paths with brackets in their names keep resolving as plain prefixes.
fn contains_glob_metachar(value: &str) -> bool {
    first_glob_metachar_index(value).is_some()
}

/// Returns the literal leading part of a glob source path, for use as a listing prefix.
///
/// The prefix normally ends at the first unescaped `*` or `?`. It is also cut at the first `[`
/// before that point. `glob::Pattern` treats `[` as the start of a character class, so a
/// listing prefix that ran past it would be matched literally by the object store and would
/// skip keys the pattern accepts. For example, `data/[ab]/*.csv` must list `data/`, not
/// `data/[ab]/`.
///
/// Values without an unescaped `*`/`?` are returned unchanged.
fn prefix_before_first_glob(value: &str) -> &str {
    match first_glob_metachar_index(value) {
        Some(index) => {
            let literal = &value[..index];
            match literal.find('[') {
                Some(bracket) => &literal[..bracket],
                None => literal,
            }
        }
        None => value,
    }
}

/// Returns the byte index of the first `*` or `?` in `value` that is not escaped, or `None`.
///
/// A backslash escapes exactly the character after it, including another backslash.
fn first_glob_metachar_index(value: &str) -> Option<usize> {
    let mut escaped = false;
    for (idx, ch) in value.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        if ch == '\\' {
            escaped = true;
            continue;
        }
        if matches!(ch, '*' | '?') {
            return Some(idx);
        }
    }
    None
}
361
362fn cloud_glob_match_options() -> MatchOptions {
363 MatchOptions {
364 case_sensitive: true,
365 require_literal_separator: true,
366 require_literal_leading_dot: false,
367 }
368}