floe_core/io/storage/
inputs.rs1use std::path::{Path, PathBuf};
2
3use crate::{config, io, report, FloeResult};
4
5use super::Target;
6
7#[derive(Debug, Clone)]
8pub struct ResolvedInputs {
9 pub files: Vec<io::format::InputFile>,
10 pub mode: report::ResolvedInputMode,
11}
12
13pub fn resolve_inputs(
14 config_dir: &Path,
15 entity: &config::EntityConfig,
16 adapter: &dyn io::format::InputAdapter,
17 target: &Target,
18 temp_dir: Option<&Path>,
19 storage_client: Option<&dyn super::StorageClient>,
20) -> FloeResult<ResolvedInputs> {
21 match target {
22 Target::S3 { storage, .. } => {
23 let temp_dir = temp_dir.ok_or_else(|| {
24 Box::new(crate::errors::RunError("s3 tempdir missing".to_string()))
25 })?;
26 let client = storage_client.ok_or_else(|| {
27 Box::new(crate::errors::RunError(
28 "s3 storage client missing".to_string(),
29 ))
30 })?;
31 let (bucket, key) = target.s3_parts().ok_or_else(|| {
32 Box::new(crate::errors::RunError(
33 "s3 target missing bucket".to_string(),
34 ))
35 })?;
36 let files = io::storage::s3::build_input_files(
37 client, bucket, key, adapter, temp_dir, entity, storage,
38 )?;
39 Ok(ResolvedInputs {
40 files,
41 mode: report::ResolvedInputMode::Directory,
42 })
43 }
44 Target::Gcs { storage, .. } => {
45 let temp_dir = temp_dir.ok_or_else(|| {
46 Box::new(crate::errors::RunError("gcs tempdir missing".to_string()))
47 })?;
48 let client = storage_client.ok_or_else(|| {
49 Box::new(crate::errors::RunError(
50 "gcs storage client missing".to_string(),
51 ))
52 })?;
53 let (bucket, key) = target.gcs_parts().ok_or_else(|| {
54 Box::new(crate::errors::RunError(
55 "gcs target missing bucket".to_string(),
56 ))
57 })?;
58 let files = io::storage::gcs::build_input_files(
59 client, bucket, key, adapter, temp_dir, entity, storage,
60 )?;
61 Ok(ResolvedInputs {
62 files,
63 mode: report::ResolvedInputMode::Directory,
64 })
65 }
66 Target::Adls { storage, .. } => {
67 let temp_dir = temp_dir.ok_or_else(|| {
68 Box::new(crate::errors::RunError("adls tempdir missing".to_string()))
69 })?;
70 let client = storage_client.ok_or_else(|| {
71 Box::new(crate::errors::RunError(
72 "adls storage client missing".to_string(),
73 ))
74 })?;
75 let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
76 Box::new(crate::errors::RunError(
77 "adls target missing container".to_string(),
78 ))
79 })?;
80 let files = io::storage::adls::build_input_files(
81 client, container, account, base_path, adapter, temp_dir, entity, storage,
82 )?;
83 Ok(ResolvedInputs {
84 files,
85 mode: report::ResolvedInputMode::Directory,
86 })
87 }
88 Target::Local { storage, .. } => {
89 let resolved =
90 adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
91 let files = build_local_inputs(&resolved.files, entity, storage_client);
92 let mode = match resolved.mode {
93 io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
94 io::storage::local::LocalInputMode::Directory => {
95 report::ResolvedInputMode::Directory
96 }
97 };
98 Ok(ResolvedInputs { files, mode })
99 }
100 }
101}
102
103fn build_local_inputs(
104 files: &[PathBuf],
105 entity: &config::EntityConfig,
106 storage_client: Option<&dyn super::StorageClient>,
107) -> Vec<io::format::InputFile> {
108 files
109 .iter()
110 .map(|path| {
111 let source_name = path
112 .file_name()
113 .and_then(|name| name.to_str())
114 .unwrap_or(entity.name.as_str())
115 .to_string();
116 let source_stem = Path::new(&source_name)
117 .file_stem()
118 .and_then(|stem| stem.to_str())
119 .unwrap_or(entity.name.as_str())
120 .to_string();
121 let uri = storage_client
122 .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
123 .unwrap_or_else(|| path.display().to_string());
124 io::format::InputFile {
125 source_uri: uri,
126 source_local_path: path.clone(),
127 source_name,
128 source_stem,
129 }
130 })
131 .collect()
132}