floe_core/io/storage/ops/
inputs.rs1use std::path::{Path, PathBuf};
2
3use crate::io::storage::{planner, Target};
4use crate::{config, io, report, FloeResult};
5
/// How far [`resolve_inputs`] goes once the matching inputs are known.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ResolveInputsMode {
    /// Produce an input file for every match; cloud objects are downloaded to a temp dir.
    Download,
    /// Only report the matched URIs; `ResolvedInputs::files` is left empty.
    ListOnly,
}
11
12#[derive(Debug, Clone)]
13pub struct ResolvedInputs {
14 pub files: Vec<io::format::InputFile>,
15 pub listed: Vec<String>,
16 pub mode: report::ResolvedInputMode,
17}
18
19pub fn resolve_inputs(
20 config_dir: &Path,
21 entity: &config::EntityConfig,
22 adapter: &dyn io::format::InputAdapter,
23 target: &Target,
24 resolution_mode: ResolveInputsMode,
25 temp_dir: Option<&Path>,
26 storage_client: Option<&dyn crate::io::storage::StorageClient>,
27) -> FloeResult<ResolvedInputs> {
28 match target {
30 Target::S3 { storage, .. } => {
31 let client = require_storage_client(storage_client, "s3")?;
32 let (bucket, key) = target.s3_parts().ok_or_else(|| {
33 Box::new(crate::errors::RunError(
34 "s3 target missing bucket".to_string(),
35 ))
36 })?;
37 let location = format!("bucket={}", bucket);
38 resolve_cloud_inputs_for_prefix(
39 client,
40 key,
41 adapter,
42 entity,
43 storage,
44 &location,
45 resolution_mode,
46 temp_dir,
47 )
48 }
49 Target::Gcs { storage, .. } => {
50 let client = require_storage_client(storage_client, "gcs")?;
51 let (bucket, key) = target.gcs_parts().ok_or_else(|| {
52 Box::new(crate::errors::RunError(
53 "gcs target missing bucket".to_string(),
54 ))
55 })?;
56 let location = format!("bucket={}", bucket);
57 resolve_cloud_inputs_for_prefix(
58 client,
59 key,
60 adapter,
61 entity,
62 storage,
63 &location,
64 resolution_mode,
65 temp_dir,
66 )
67 }
68 Target::Adls { storage, .. } => {
69 let client = require_storage_client(storage_client, "adls")?;
70 let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
71 Box::new(crate::errors::RunError(
72 "adls target missing container".to_string(),
73 ))
74 })?;
75 let location = format!("container={}, account={}", container, account);
76 resolve_cloud_inputs_for_prefix(
77 client,
78 base_path,
79 adapter,
80 entity,
81 storage,
82 &location,
83 resolution_mode,
84 temp_dir,
85 )
86 }
87 Target::Local { storage, .. } => {
88 let resolved =
89 adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
90 let listed = build_local_listing(&resolved.files, storage_client);
91 let files = match resolution_mode {
92 ResolveInputsMode::Download => {
93 build_local_inputs(&resolved.files, entity, storage_client)
94 }
95 ResolveInputsMode::ListOnly => Vec::new(),
96 };
97 let mode = match resolved.mode {
98 io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
99 io::storage::local::LocalInputMode::Directory => {
100 report::ResolvedInputMode::Directory
101 }
102 };
103 Ok(ResolvedInputs {
104 files,
105 listed,
106 mode,
107 })
108 }
109 }
110}
111
112fn resolve_cloud_inputs_for_prefix(
113 client: &dyn crate::io::storage::StorageClient,
114 prefix: &str,
115 adapter: &dyn io::format::InputAdapter,
116 entity: &config::EntityConfig,
117 storage: &str,
118 location: &str,
119 resolution_mode: ResolveInputsMode,
120 temp_dir: Option<&Path>,
121) -> FloeResult<ResolvedInputs> {
122 let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
123 let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
124 let files = match resolution_mode {
125 ResolveInputsMode::Download => {
126 let temp_dir = require_temp_dir(temp_dir, storage)?;
127 build_cloud_inputs(client, &objects, temp_dir, entity)?
128 }
129 ResolveInputsMode::ListOnly => Vec::new(),
130 };
131 Ok(ResolvedInputs {
132 files,
133 listed,
134 mode: report::ResolvedInputMode::Directory,
135 })
136}
137
138fn require_temp_dir<'a>(temp_dir: Option<&'a Path>, label: &str) -> FloeResult<&'a Path> {
139 temp_dir.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
140 Box::new(crate::errors::RunError(format!(
141 "{} tempdir missing",
142 label
143 )))
144 })
145}
146
147fn require_storage_client<'a>(
148 storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
149 label: &str,
150) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
151 storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
152 Box::new(crate::errors::RunError(format!(
153 "{} storage client missing",
154 label
155 )))
156 })
157}
158
159fn list_cloud_objects(
160 client: &dyn crate::io::storage::StorageClient,
161 prefix: &str,
162 adapter: &dyn io::format::InputAdapter,
163 entity: &config::EntityConfig,
164 storage: &str,
165 location: &str,
166) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
167 let suffixes = adapter.suffixes()?;
168 let list_refs = client.list(prefix)?;
169 let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
170 let filtered = planner::stable_sort_refs(filtered);
171 if filtered.is_empty() {
172 return Err(Box::new(crate::errors::RunError(format!(
173 "entity.name={} source.storage={} no input objects matched ({}, prefix={}, suffixes={})",
174 entity.name,
175 storage,
176 location,
177 prefix,
178 suffixes.join(",")
179 ))));
180 }
181 Ok(filtered)
182}
183
184fn build_cloud_inputs(
185 client: &dyn crate::io::storage::StorageClient,
186 objects: &[io::storage::planner::ObjectRef],
187 temp_dir: &Path,
188 entity: &config::EntityConfig,
189) -> FloeResult<Vec<io::format::InputFile>> {
190 let mut inputs = Vec::with_capacity(objects.len());
191 for object in objects {
192 let local_path = client.download_to_temp(&object.uri, temp_dir)?;
193 let source_name =
194 planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
195 let source_stem =
196 planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
197 let source_uri = object.uri.clone();
198 inputs.push(io::format::InputFile {
199 source_uri,
200 source_local_path: local_path,
201 source_name,
202 source_stem,
203 });
204 }
205 Ok(inputs)
206}
207
208fn build_local_inputs(
209 files: &[PathBuf],
210 entity: &config::EntityConfig,
211 storage_client: Option<&dyn crate::io::storage::StorageClient>,
212) -> Vec<io::format::InputFile> {
213 files
214 .iter()
215 .map(|path| {
216 let source_name = path
217 .file_name()
218 .and_then(|name| name.to_str())
219 .unwrap_or(entity.name.as_str())
220 .to_string();
221 let source_stem = Path::new(&source_name)
222 .file_stem()
223 .and_then(|stem| stem.to_str())
224 .unwrap_or(entity.name.as_str())
225 .to_string();
226 let uri = storage_client
227 .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
228 .unwrap_or_else(|| path.display().to_string());
229 io::format::InputFile {
230 source_uri: uri,
231 source_local_path: path.clone(),
232 source_name,
233 source_stem,
234 }
235 })
236 .collect()
237}
238
239fn build_local_listing(
240 files: &[PathBuf],
241 storage_client: Option<&dyn crate::io::storage::StorageClient>,
242) -> Vec<String> {
243 files
244 .iter()
245 .map(|path| {
246 storage_client
247 .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
248 .unwrap_or_else(|| path.display().to_string())
249 })
250 .collect()
251}