Skip to main content

floe_core/io/storage/
inputs.rs

1use std::path::{Path, PathBuf};
2
3use crate::{config, io, report, FloeResult};
4
5use super::Target;
6
7#[derive(Debug, Clone)]
8pub struct ResolvedInputs {
9    pub files: Vec<io::format::InputFile>,
10    pub mode: report::ResolvedInputMode,
11}
12
13pub fn resolve_inputs(
14    config_dir: &Path,
15    entity: &config::EntityConfig,
16    adapter: &dyn io::format::InputAdapter,
17    target: &Target,
18    temp_dir: Option<&Path>,
19    storage_client: Option<&dyn super::StorageClient>,
20) -> FloeResult<ResolvedInputs> {
21    match target {
22        Target::S3 { storage, .. } => {
23            let temp_dir = temp_dir.ok_or_else(|| {
24                Box::new(crate::errors::RunError("s3 tempdir missing".to_string()))
25            })?;
26            let client = storage_client.ok_or_else(|| {
27                Box::new(crate::errors::RunError(
28                    "s3 storage client missing".to_string(),
29                ))
30            })?;
31            let (bucket, key) = target.s3_parts().ok_or_else(|| {
32                Box::new(crate::errors::RunError(
33                    "s3 target missing bucket".to_string(),
34                ))
35            })?;
36            let files = io::storage::s3::build_input_files(
37                client, bucket, key, adapter, temp_dir, entity, storage,
38            )?;
39            Ok(ResolvedInputs {
40                files,
41                mode: report::ResolvedInputMode::Directory,
42            })
43        }
44        Target::Gcs { storage, .. } => {
45            let temp_dir = temp_dir.ok_or_else(|| {
46                Box::new(crate::errors::RunError("gcs tempdir missing".to_string()))
47            })?;
48            let client = storage_client.ok_or_else(|| {
49                Box::new(crate::errors::RunError(
50                    "gcs storage client missing".to_string(),
51                ))
52            })?;
53            let (bucket, key) = target.gcs_parts().ok_or_else(|| {
54                Box::new(crate::errors::RunError(
55                    "gcs target missing bucket".to_string(),
56                ))
57            })?;
58            let files = io::storage::gcs::build_input_files(
59                client, bucket, key, adapter, temp_dir, entity, storage,
60            )?;
61            Ok(ResolvedInputs {
62                files,
63                mode: report::ResolvedInputMode::Directory,
64            })
65        }
66        Target::Adls { storage, .. } => {
67            let temp_dir = temp_dir.ok_or_else(|| {
68                Box::new(crate::errors::RunError("adls tempdir missing".to_string()))
69            })?;
70            let client = storage_client.ok_or_else(|| {
71                Box::new(crate::errors::RunError(
72                    "adls storage client missing".to_string(),
73                ))
74            })?;
75            let (container, account, base_path) = target.adls_parts().ok_or_else(|| {
76                Box::new(crate::errors::RunError(
77                    "adls target missing container".to_string(),
78                ))
79            })?;
80            let files = io::storage::adls::build_input_files(
81                client, container, account, base_path, adapter, temp_dir, entity, storage,
82            )?;
83            Ok(ResolvedInputs {
84                files,
85                mode: report::ResolvedInputMode::Directory,
86            })
87        }
88        Target::Local { storage, .. } => {
89            let resolved =
90                adapter.resolve_local_inputs(config_dir, &entity.name, &entity.source, storage)?;
91            let files = build_local_inputs(&resolved.files, entity, storage_client);
92            let mode = match resolved.mode {
93                io::storage::local::LocalInputMode::File => report::ResolvedInputMode::File,
94                io::storage::local::LocalInputMode::Directory => {
95                    report::ResolvedInputMode::Directory
96                }
97            };
98            Ok(ResolvedInputs { files, mode })
99        }
100    }
101}
102
103fn build_local_inputs(
104    files: &[PathBuf],
105    entity: &config::EntityConfig,
106    storage_client: Option<&dyn super::StorageClient>,
107) -> Vec<io::format::InputFile> {
108    files
109        .iter()
110        .map(|path| {
111            let source_name = path
112                .file_name()
113                .and_then(|name| name.to_str())
114                .unwrap_or(entity.name.as_str())
115                .to_string();
116            let source_stem = Path::new(&source_name)
117                .file_stem()
118                .and_then(|stem| stem.to_str())
119                .unwrap_or(entity.name.as_str())
120                .to_string();
121            let uri = storage_client
122                .and_then(|client| client.resolve_uri(&path.display().to_string()).ok())
123                .unwrap_or_else(|| path.display().to_string());
124            io::format::InputFile {
125                source_uri: uri,
126                source_local_path: path.clone(),
127                source_name,
128                source_stem,
129            }
130        })
131        .collect()
132}