Skip to main content

floe_core/io/storage/
inputs.rs

1use std::path::{Path, PathBuf};
2
3use crate::{config, io, report, FloeResult};
4
5use super::Target;
6
7#[derive(Debug, Clone)]
8pub struct ResolvedInputs {
9    pub files: Vec<io::format::InputFile>,
10    pub mode: report::ResolvedInputMode,
11}
12
13pub fn resolve_inputs(
14    config_dir: &Path,
15    entity: &config::EntityConfig,
16    adapter: &dyn io::format::InputAdapter,
17    target: &Target,
18    temp_dir: Option<&Path>,
19    storage_client: Option<&dyn super::StorageClient>,
20) -> FloeResult<ResolvedInputs> {
21    // Storage-specific resolution: list + download for cloud, direct paths for local.
22    match target {
23        Target::S3 { storage, .. } => {
24            let temp_dir = temp_dir.ok_or_else(|| {
25                Box::new(crate::errors::RunError("s3 tempdir missing".to_string()))
26            })?;
27            let client = storage_client.ok_or_else(|| {
28                Box::new(crate::errors::RunError(
29                    "s3 storage client missing".to_string(),
30                ))
31            })?;
32            let (bucket, key) = target.s3_parts().ok_or_else(|| {
33                Box::new(crate::errors::RunError(
34                    "s3 target missing bucket".to_string(),
35                ))
36            })?;
37            let files = io::storage::s3::build_input_files(
38                client, bucket, key, adapter, temp_dir, entity, storage,
39            )?;
40            Ok(ResolvedInputs {
41                files,
42                mode: report::ResolvedInputMode::Directory,
43            })
44        }
45        Target::Gcs { storage, .. } => {
46            let temp_dir = temp_dir.ok_or_else(|| {
47                Box::new(crate::errors::RunError("gcs tempdir missing".to_string()))
48            })?;
49            let client = storage_client.ok_or_else(|| {
50                Box::new(crate::errors::RunError(
51                    "gcs storage client missing".to_string(),
52                ))
53            })?;
54            let (bucket, key) = target.gcs_parts().ok_or_else(|| {
55                Box::new(crate::errors::RunError(
56                    "gcs target missing bucket".to_string(),
57                ))
58            })?;
59            let files = io::storage::gcs::build_input_files(
60                client, bucket, key, adapter, temp_dir, entity, storage,
61            )?;
62            Ok(ResolvedInputs {
63                files,
64                mode: report::ResolvedInputMode::Directory,
65            })
66        }
67        Target::Adls { storage, .. } => {
68            let temp_dir = temp_dir.ok_or_else(|| {
69                Box::new(crate::errors::RunError("adls tempdir missing".to_string()))
70            })?;
71            let client = storage_client.ok_or_else(|| {
72                Box::new(crate::errors::RunError(
73                    "adls storage client missing".to_string(),
74                ))
75            })?;
76            let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
77                Box::new(crate::errors::RunError(
78                    "adls target missing container".to_string(),
79                ))
80            })?;
81            let files = io::storage::adls::build_input_files(
82                client, container, account, base_path, adapter, temp_dir, entity, storage,
83            )?;
84            Ok(ResolvedInputs {
85                files,
86                mode: report::ResolvedInputMode::Directory,
87            })
88        }
89        Target::Local { storage, .. } => {
90            let resolved =
91                adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
92            let files = build_local_inputs(&resolved.files, entity, storage_client);
93            let mode = match resolved.mode {
94                io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
95                io::storage::local::LocalInputMode::Directory => {
96                    report::ResolvedInputMode::Directory
97                }
98            };
99            Ok(ResolvedInputs { files, mode })
100        }
101    }
102}
103
104fn build_local_inputs(
105    files: &[PathBuf],
106    entity: &config::EntityConfig,
107    storage_client: Option<&dyn super::StorageClient>,
108) -> Vec<io::format::InputFile> {
109    files
110        .iter()
111        .map(|path| {
112            let source_name = path
113                .file_name()
114                .and_then(|name| name.to_str())
115                .unwrap_or(entity.name.as_str())
116                .to_string();
117            let source_stem = Path::new(&source_name)
118                .file_stem()
119                .and_then(|stem| stem.to_str())
120                .unwrap_or(entity.name.as_str())
121                .to_string();
122            let uri = storage_client
123                .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
124                .unwrap_or_else(|| path.display().to_string());
125            io::format::InputFile {
126                source_uri: uri,
127                source_local_path: path.clone(),
128                source_name,
129                source_stem,
130            }
131        })
132        .collect()
133}