Skip to main content

floe_core/io/storage/
target.rs

1use crate::{config, io, ConfigError, FloeResult};
2
3use super::{paths, OutputPlacement};
4
/// A storage destination, split into the parts each backend needs.
///
/// Every variant keeps the `storage` and `uri` strings it was built from, plus the
/// backend-specific components of that location.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Target {
    /// A location on the local filesystem.
    Local {
        /// Storage identifier carried over from the resolved configuration path.
        storage: String,
        /// URI this target was built from.
        uri: String,
        /// Local path rendered as a string.
        base_path: String,
    },
    /// A location addressed by an `s3://` URI.
    S3 {
        /// Storage identifier carried over from the resolved configuration path.
        storage: String,
        /// URI this target was built from.
        uri: String,
        /// Bucket component of the URI.
        bucket: String,
        /// Key component of the URI.
        base_key: String,
    },
    /// A location addressed by an `abfs://` URI.
    Adls {
        /// Storage identifier carried over from the resolved configuration path.
        storage: String,
        /// URI this target was built from.
        uri: String,
        /// Account component of the URI.
        account: String,
        /// Container component of the URI.
        container: String,
        /// Path component of the URI.
        base_path: String,
    },
    /// A location addressed by a `gs://` URI.
    Gcs {
        /// Storage identifier carried over from the resolved configuration path.
        storage: String,
        /// URI this target was built from.
        uri: String,
        /// Bucket component of the URI.
        bucket: String,
        /// Key component of the URI.
        base_key: String,
    },
}
32
33impl Target {
34    pub fn from_resolved(resolved: &config::ResolvedPath) -> FloeResult<Self> {
35        if let Some(path) = &resolved.local_path {
36            return Ok(Target::Local {
37                storage: resolved.storage.clone(),
38                uri: resolved.uri.clone(),
39                base_path: path.display().to_string(),
40            });
41        }
42        if resolved.uri.starts_with("s3://") {
43            let location = io::storage::s3::parse_s3_uri(&resolved.uri)?;
44            return Ok(Target::S3 {
45                storage: resolved.storage.clone(),
46                uri: resolved.uri.clone(),
47                bucket: location.bucket,
48                base_key: location.key,
49            });
50        }
51        if resolved.uri.starts_with("abfs://") {
52            let location = io::storage::adls::parse_adls_uri(&resolved.uri)?;
53            return Ok(Target::Adls {
54                storage: resolved.storage.clone(),
55                uri: resolved.uri.clone(),
56                account: location.account,
57                container: location.container,
58                base_path: location.path,
59            });
60        }
61        if resolved.uri.starts_with("gs://") {
62            let location = io::storage::gcs::parse_gcs_uri(&resolved.uri)?;
63            return Ok(Target::Gcs {
64                storage: resolved.storage.clone(),
65                uri: resolved.uri.clone(),
66                bucket: location.bucket,
67                base_key: location.key,
68            });
69        }
70        Err(Box::new(ConfigError(format!(
71            "unsupported storage uri: {}",
72            resolved.uri
73        ))))
74    }
75
76    pub fn storage(&self) -> &str {
77        match self {
78            Target::Local { storage, .. }
79            | Target::S3 { storage, .. }
80            | Target::Adls { storage, .. }
81            | Target::Gcs { storage, .. } => storage.as_str(),
82        }
83    }
84
85    pub fn target_uri(&self) -> &str {
86        match self {
87            Target::Local { uri, .. }
88            | Target::S3 { uri, .. }
89            | Target::Adls { uri, .. }
90            | Target::Gcs { uri, .. } => uri.as_str(),
91        }
92    }
93
94    pub fn is_remote(&self) -> bool {
95        !matches!(self, Target::Local { .. })
96    }
97
98    pub fn join_relative(&self, relative: &str) -> String {
99        match self {
100            Target::Local { base_path, .. } => paths::resolve_output_dir_path(base_path, relative)
101                .display()
102                .to_string(),
103            Target::S3 {
104                bucket, base_key, ..
105            } => {
106                let key = paths::resolve_output_dir_key(base_key, relative);
107                io::storage::s3::format_s3_uri(bucket, &key)
108            }
109            Target::Adls {
110                account,
111                container,
112                base_path,
113                ..
114            } => {
115                let key = paths::resolve_output_dir_key(base_path, relative);
116                io::storage::adls::format_abfs_uri(container, account, &key)
117            }
118            Target::Gcs {
119                bucket, base_key, ..
120            } => {
121                let key = paths::resolve_output_dir_key(base_key, relative);
122                io::storage::gcs::format_gcs_uri(bucket, &key)
123            }
124        }
125    }
126
127    pub fn resolve_output_uri(&self, placement: OutputPlacement, filename: &str) -> String {
128        match self {
129            Target::Local { base_path, .. } => {
130                let output_path = match placement {
131                    OutputPlacement::Output => paths::resolve_output_path(base_path, filename),
132                    OutputPlacement::Directory => {
133                        paths::resolve_output_dir_path(base_path, filename)
134                    }
135                    OutputPlacement::Sibling => paths::resolve_sibling_path(base_path, filename),
136                };
137                output_path.display().to_string()
138            }
139            Target::S3 {
140                bucket, base_key, ..
141            } => {
142                let key = match placement {
143                    OutputPlacement::Output => paths::resolve_output_key(base_key, filename),
144                    OutputPlacement::Directory => paths::resolve_output_dir_key(base_key, filename),
145                    OutputPlacement::Sibling => paths::resolve_sibling_key(base_key, filename),
146                };
147                io::storage::s3::format_s3_uri(bucket, &key)
148            }
149            Target::Adls {
150                account,
151                container,
152                base_path,
153                ..
154            } => {
155                let key = match placement {
156                    OutputPlacement::Output => paths::resolve_output_key(base_path, filename),
157                    OutputPlacement::Directory => {
158                        paths::resolve_output_dir_key(base_path, filename)
159                    }
160                    OutputPlacement::Sibling => paths::resolve_sibling_key(base_path, filename),
161                };
162                io::storage::adls::format_abfs_uri(container, account, &key)
163            }
164            Target::Gcs {
165                bucket, base_key, ..
166            } => {
167                let key = match placement {
168                    OutputPlacement::Output => paths::resolve_output_key(base_key, filename),
169                    OutputPlacement::Directory => paths::resolve_output_dir_key(base_key, filename),
170                    OutputPlacement::Sibling => paths::resolve_sibling_key(base_key, filename),
171                };
172                io::storage::gcs::format_gcs_uri(bucket, &key)
173            }
174        }
175    }
176
177    pub fn s3_parts(&self) -> Option<(&str, &str)> {
178        match self {
179            Target::S3 {
180                bucket, base_key, ..
181            } => Some((bucket.as_str(), base_key.as_str())),
182            Target::Local { .. } => None,
183            Target::Adls { .. } | Target::Gcs { .. } => None,
184        }
185    }
186
187    pub fn gcs_parts(&self) -> Option<(&str, &str)> {
188        match self {
189            Target::Gcs {
190                bucket, base_key, ..
191            } => Some((bucket.as_str(), base_key.as_str())),
192            Target::Local { .. } | Target::S3 { .. } | Target::Adls { .. } => None,
193        }
194    }
195
196    pub fn adls_parts(&self) -> Option<(&str, &str, &str)> {
197        match self {
198            Target::Adls {
199                container,
200                account,
201                base_path,
202                ..
203            } => Some((container.as_str(), account.as_str(), base_path.as_str())),
204            Target::Local { .. } | Target::S3 { .. } | Target::Gcs { .. } => None,
205        }
206    }
207}