Skip to main content

floe_core/io/storage/ops/
inputs.rs

1use std::path::{Path, PathBuf};
2
3use crate::io::storage::{planner, Target};
4use crate::{config, io, report, FloeResult};
5
/// How far [`resolve_inputs`] goes after discovering an entity's inputs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ResolveInputsMode {
    /// Produce input files as well as the listing: cloud objects are downloaded into the
    /// temp directory, local paths are described in place.
    Download,
    /// Only report the matched input URIs; no input files are produced.
    ListOnly,
}
11
12#[derive(Debug, Clone)]
13pub struct ResolvedInputs {
14    pub files: Vec<io::format::InputFile>,
15    pub listed: Vec<String>,
16    pub mode: report::ResolvedInputMode,
17}
18
19pub fn resolve_inputs(
20    config_dir: &Path,
21    entity: &config::EntityConfig,
22    adapter: &dyn io::format::InputAdapter,
23    target: &Target,
24    resolution_mode: ResolveInputsMode,
25    temp_dir: Option<&Path>,
26    storage_client: Option<&dyn crate::io::storage::StorageClient>,
27) -> FloeResult<ResolvedInputs> {
28    // Storage-specific resolution: list + download for cloud, direct paths for local.
29    match target {
30        Target::S3 { storage, .. } => {
31            let client = require_storage_client(storage_client, "s3")?;
32            let (bucket, key) = target.s3_parts().ok_or_else(|| {
33                Box::new(crate::errors::RunError(
34                    "s3 target missing bucket".to_string(),
35                ))
36            })?;
37            let location = format!("bucket={}", bucket);
38            resolve_cloud_inputs_for_prefix(
39                client,
40                key,
41                adapter,
42                entity,
43                storage,
44                &location,
45                resolution_mode,
46                temp_dir,
47            )
48        }
49        Target::Gcs { storage, .. } => {
50            let client = require_storage_client(storage_client, "gcs")?;
51            let (bucket, key) = target.gcs_parts().ok_or_else(|| {
52                Box::new(crate::errors::RunError(
53                    "gcs target missing bucket".to_string(),
54                ))
55            })?;
56            let location = format!("bucket={}", bucket);
57            resolve_cloud_inputs_for_prefix(
58                client,
59                key,
60                adapter,
61                entity,
62                storage,
63                &location,
64                resolution_mode,
65                temp_dir,
66            )
67        }
68        Target::Adls { storage, .. } => {
69            let client = require_storage_client(storage_client, "adls")?;
70            let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
71                Box::new(crate::errors::RunError(
72                    "adls target missing container".to_string(),
73                ))
74            })?;
75            let location = format!("container={}, account={}", container, account);
76            resolve_cloud_inputs_for_prefix(
77                client,
78                base_path,
79                adapter,
80                entity,
81                storage,
82                &location,
83                resolution_mode,
84                temp_dir,
85            )
86        }
87        Target::Local { storage, .. } => {
88            let resolved =
89                adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
90            let listed = build_local_listing(&resolved.files, storage_client);
91            let files = match resolution_mode {
92                ResolveInputsMode::Download => {
93                    build_local_inputs(&resolved.files, entity, storage_client)
94                }
95                ResolveInputsMode::ListOnly => Vec::new(),
96            };
97            let mode = match resolved.mode {
98                io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
99                io::storage::local::LocalInputMode::Directory => {
100                    report::ResolvedInputMode::Directory
101                }
102            };
103            Ok(ResolvedInputs {
104                files,
105                listed,
106                mode,
107            })
108        }
109    }
110}
111
112fn resolve_cloud_inputs_for_prefix(
113    client: &dyn crate::io::storage::StorageClient,
114    prefix: &str,
115    adapter: &dyn io::format::InputAdapter,
116    entity: &config::EntityConfig,
117    storage: &str,
118    location: &str,
119    resolution_mode: ResolveInputsMode,
120    temp_dir: Option<&Path>,
121) -> FloeResult<ResolvedInputs> {
122    let objects = list_cloud_objects(client, prefix, adapter, entity, storage, location)?;
123    let listed = objects.iter().map(|obj| obj.uri.clone()).collect();
124    let files = match resolution_mode {
125        ResolveInputsMode::Download => {
126            let temp_dir = require_temp_dir(temp_dir, storage)?;
127            build_cloud_inputs(client, &objects, temp_dir, entity)?
128        }
129        ResolveInputsMode::ListOnly => Vec::new(),
130    };
131    Ok(ResolvedInputs {
132        files,
133        listed,
134        mode: report::ResolvedInputMode::Directory,
135    })
136}
137
138fn require_temp_dir<'a>(temp_dir: Option<&'a Path>, label: &str) -> FloeResult<&'a Path> {
139    temp_dir.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
140        Box::new(crate::errors::RunError(format!(
141            "{} tempdir missing",
142            label
143        )))
144    })
145}
146
147fn require_storage_client<'a>(
148    storage_client: Option<&'a dyn crate::io::storage::StorageClient>,
149    label: &str,
150) -> FloeResult<&'a dyn crate::io::storage::StorageClient> {
151    storage_client.ok_or_else(|| -> Box<dyn std::error::Error + Send + Sync> {
152        Box::new(crate::errors::RunError(format!(
153            "{} storage client missing",
154            label
155        )))
156    })
157}
158
159fn list_cloud_objects(
160    client: &dyn crate::io::storage::StorageClient,
161    prefix: &str,
162    adapter: &dyn io::format::InputAdapter,
163    entity: &config::EntityConfig,
164    storage: &str,
165    location: &str,
166) -> FloeResult<Vec<io::storage::planner::ObjectRef>> {
167    let suffixes = adapter.suffixes()?;
168    let list_refs = client.list(prefix)?;
169    let filtered = planner::filter_by_suffixes(list_refs, &suffixes);
170    let filtered = planner::stable_sort_refs(filtered);
171    if filtered.is_empty() {
172        return Err(Box::new(crate::errors::RunError(format!(
173            "entity.name={} source.storage={} no input objects matched ({}, prefix={}, suffixes={})",
174            entity.name,
175            storage,
176            location,
177            prefix,
178            suffixes.join(",")
179        ))));
180    }
181    Ok(filtered)
182}
183
184fn build_cloud_inputs(
185    client: &dyn crate::io::storage::StorageClient,
186    objects: &[io::storage::planner::ObjectRef],
187    temp_dir: &Path,
188    entity: &config::EntityConfig,
189) -> FloeResult<Vec<io::format::InputFile>> {
190    let mut inputs = Vec::with_capacity(objects.len());
191    for object in objects {
192        let local_path = client.download_to_temp(&object.uri, temp_dir)?;
193        let source_name =
194            planner::file_name_from_key(&object.key).unwrap_or_else(|| entity.name.clone());
195        let source_stem =
196            planner::file_stem_from_name(&source_name).unwrap_or_else(|| entity.name.clone());
197        let source_uri = object.uri.clone();
198        inputs.push(io::format::InputFile {
199            source_uri,
200            source_local_path: local_path,
201            source_name,
202            source_stem,
203        });
204    }
205    Ok(inputs)
206}
207
208fn build_local_inputs(
209    files: &[PathBuf],
210    entity: &config::EntityConfig,
211    storage_client: Option<&dyn crate::io::storage::StorageClient>,
212) -> Vec<io::format::InputFile> {
213    files
214        .iter()
215        .map(|path| {
216            let source_name = path
217                .file_name()
218                .and_then(|name| name.to_str())
219                .unwrap_or(entity.name.as_str())
220                .to_string();
221            let source_stem = Path::new(&source_name)
222                .file_stem()
223                .and_then(|stem| stem.to_str())
224                .unwrap_or(entity.name.as_str())
225                .to_string();
226            let uri = storage_client
227                .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
228                .unwrap_or_else(|| path.display().to_string());
229            io::format::InputFile {
230                source_uri: uri,
231                source_local_path: path.clone(),
232                source_name,
233                source_stem,
234            }
235        })
236        .collect()
237}
238
239fn build_local_listing(
240    files: &[PathBuf],
241    storage_client: Option<&dyn crate::io::storage::StorageClient>,
242) -> Vec<String> {
243    files
244        .iter()
245        .map(|path| {
246            storage_client
247                .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
248                .unwrap_or_else(|| path.display().to_string())
249        })
250        .collect()
251}