floe_core/io/storage/
inputs.rs1use std::path::{Path, PathBuf};
2
3use crate::{config, io, report, FloeResult};
4
5use super::Target;
6
7#[derive(Debug, Clone)]
8pub struct ResolvedInputs {
9 pub files: Vec<io::format::InputFile>,
10 pub mode: report::ResolvedInputMode,
11}
12
13pub fn resolve_inputs(
14 config_dir: &Path,
15 entity: &config::EntityConfig,
16 adapter: &dyn io::format::InputAdapter,
17 target: &Target,
18 temp_dir: Option<&Path>,
19 storage_client: Option<&dyn super::StorageClient>,
20) -> FloeResult<ResolvedInputs> {
21 match target {
23 Target::S3 { storage, .. } => {
24 let temp_dir = temp_dir.ok_or_else(|| {
25 Box::new(crate::errors::RunError("s3 tempdir missing".to_string()))
26 })?;
27 let client = storage_client.ok_or_else(|| {
28 Box::new(crate::errors::RunError(
29 "s3 storage client missing".to_string(),
30 ))
31 })?;
32 let (bucket, key) = target.s3_parts().ok_or_else(|| {
33 Box::new(crate::errors::RunError(
34 "s3 target missing bucket".to_string(),
35 ))
36 })?;
37 let files = io::storage::s3::build_input_files(
38 client, bucket, key, adapter, temp_dir, entity, storage,
39 )?;
40 Ok(ResolvedInputs {
41 files,
42 mode: report::ResolvedInputMode::Directory,
43 })
44 }
45 Target::Gcs { storage, .. } => {
46 let temp_dir = temp_dir.ok_or_else(|| {
47 Box::new(crate::errors::RunError("gcs tempdir missing".to_string()))
48 })?;
49 let client = storage_client.ok_or_else(|| {
50 Box::new(crate::errors::RunError(
51 "gcs storage client missing".to_string(),
52 ))
53 })?;
54 let (bucket, key) = target.gcs_parts().ok_or_else(|| {
55 Box::new(crate::errors::RunError(
56 "gcs target missing bucket".to_string(),
57 ))
58 })?;
59 let files = io::storage::gcs::build_input_files(
60 client, bucket, key, adapter, temp_dir, entity, storage,
61 )?;
62 Ok(ResolvedInputs {
63 files,
64 mode: report::ResolvedInputMode::Directory,
65 })
66 }
67 Target::Adls { storage, .. } => {
68 let temp_dir = temp_dir.ok_or_else(|| {
69 Box::new(crate::errors::RunError("adls tempdir missing".to_string()))
70 })?;
71 let client = storage_client.ok_or_else(|| {
72 Box::new(crate::errors::RunError(
73 "adls storage client missing".to_string(),
74 ))
75 })?;
76 let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
77 Box::new(crate::errors::RunError(
78 "adls target missing container".to_string(),
79 ))
80 })?;
81 let files = io::storage::adls::build_input_files(
82 client, container, account, base_path, adapter, temp_dir, entity, storage,
83 )?;
84 Ok(ResolvedInputs {
85 files,
86 mode: report::ResolvedInputMode::Directory,
87 })
88 }
89 Target::Local { storage, .. } => {
90 let resolved =
91 adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
92 let files = build_local_inputs(&resolved.files, entity, storage_client);
93 let mode = match resolved.mode {
94 io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
95 io::storage::local::LocalInputMode::Directory => {
96 report::ResolvedInputMode::Directory
97 }
98 };
99 Ok(ResolvedInputs { files, mode })
100 }
101 }
102}
103
104fn build_local_inputs(
105 files: &[PathBuf],
106 entity: &config::EntityConfig,
107 storage_client: Option<&dyn super::StorageClient>,
108) -> Vec<io::format::InputFile> {
109 files
110 .iter()
111 .map(|path| {
112 let source_name = path
113 .file_name()
114 .and_then(|name| name.to_str())
115 .unwrap_or(entity.name.as_str())
116 .to_string();
117 let source_stem = Path::new(&source_name)
118 .file_stem()
119 .and_then(|stem| stem.to_str())
120 .unwrap_or(entity.name.as_str())
121 .to_string();
122 let uri = storage_client
123 .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
124 .unwrap_or_else(|| path.display().to_string());
125 io::format::InputFile {
126 source_uri: uri,
127 source_local_path: path.clone(),
128 source_name,
129 source_stem,
130 }
131 })
132 .collect()
133}