1use paste::paste;
21use std::collections::HashMap;
22use std::env;
23use std::path::PathBuf;
24use std::sync::Arc;
25
26use crate::config::table::HudiTableConfig;
27use crate::config::util::{parse_data_for_options, split_hudi_options_from_others};
28use crate::config::{HudiConfigs, HUDI_CONF_DIR};
29use crate::storage::Storage;
30use crate::table::fs_view::FileSystemView;
31use crate::table::validation::validate_configs;
32use crate::table::Table;
33use crate::timeline::Timeline;
34use crate::util::collection::extend_if_absent;
35use crate::Result;
36
37#[derive(Debug, Clone)]
39pub struct TableBuilder {
40 option_resolver: OptionResolver,
41}
42
/// Holds a table's base URI together with the option maps that
/// [`OptionResolver::resolve_options`] merges into the final configuration.
#[derive(Clone, Debug)]
pub struct OptionResolver {
    /// Base URI of the table; written into `hudi_options` as the base path during resolution.
    pub base_uri: String,
    /// Options supplied explicitly as Hudi options.
    pub hudi_options: HashMap<String, String>,
    /// Options supplied explicitly as storage options.
    pub storage_options: HashMap<String, String>,
    /// Generic options. During resolution they are split into Hudi and non-Hudi
    /// entries and merged into the two maps above only for keys not already present.
    pub options: HashMap<String, String>,
}
51
52macro_rules! impl_with_options {
53 ($struct_name:ident, $($field:ident, $singular:ident),+) => {
54 impl $struct_name {
55 $(
56 paste! {
57 #[doc = "Add " $singular " to the builder."]
58 #[doc = "Subsequent calls overwrite the previous values if the key already exists."]
59 pub fn [<with_ $singular>]<K, V>(mut self, k: K, v: V) -> Self
60 where
61 K: AsRef<str>,
62 V: Into<String>,
63 {
64 let option_resolver = &mut self.option_resolver;
65 option_resolver.$field.insert(k.as_ref().to_string(), v.into());
66 self
67 }
68
69 #[doc = "Add " $field " to the builder."]
70 #[doc = "Subsequent calls overwrite the previous values if the key already exists."]
71 pub fn [<with_ $field>]<I, K, V>(mut self, options: I) -> Self
72 where
73 I: IntoIterator<Item = (K, V)>,
74 K: AsRef<str>,
75 V: Into<String>,
76 {
77 let option_resolver = &mut self.option_resolver;
78 option_resolver.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into())));
79 self
80 }
81 }
82 )+
83 }
84 };
85}
86
87impl_with_options!(
88 TableBuilder,
89 hudi_options,
90 hudi_option,
91 storage_options,
92 storage_option,
93 options,
94 option
95);
96
97impl TableBuilder {
98 pub fn from_base_uri(base_uri: &str) -> Self {
100 let option_resolver = OptionResolver::new(base_uri);
101 TableBuilder { option_resolver }
102 }
103
104 pub async fn build(&mut self) -> Result<Table> {
105 let option_resolver = &mut self.option_resolver;
106
107 option_resolver.resolve_options().await?;
108
109 let hudi_configs = Arc::from(HudiConfigs::new(option_resolver.hudi_options.iter()));
110
111 let storage_options = Arc::from(self.option_resolver.storage_options.clone());
112
113 let timeline =
114 Timeline::new_from_storage(hudi_configs.clone(), storage_options.clone()).await?;
115
116 let file_system_view =
117 FileSystemView::new(hudi_configs.clone(), storage_options.clone()).await?;
118
119 Ok(Table {
120 hudi_configs,
121 storage_options,
122 timeline,
123 file_system_view,
124 })
125 }
126}
127
128impl OptionResolver {
129 pub fn new(base_uri: &str) -> Self {
131 Self {
132 base_uri: base_uri.to_string(),
133 hudi_options: HashMap::new(),
134 storage_options: HashMap::new(),
135 options: HashMap::new(),
136 }
137 }
138
139 pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Self
141 where
142 I: IntoIterator<Item = (K, V)>,
143 K: AsRef<str>,
144 V: Into<String>,
145 {
146 let options = options
147 .into_iter()
148 .map(|(k, v)| (k.as_ref().to_string(), v.into()))
149 .collect();
150 Self {
151 base_uri: base_uri.to_string(),
152 hudi_options: HashMap::new(),
153 storage_options: HashMap::new(),
154 options,
155 }
156 }
157
158 pub async fn resolve_options(&mut self) -> Result<()> {
169 self.resolve_user_provided_options();
170
171 self.resolve_cloud_env_vars();
174
175 self.resolve_hudi_options().await?;
178
179 let hudi_configs = HudiConfigs::new(self.hudi_options.iter());
181 validate_configs(&hudi_configs)
182 }
183
184 fn resolve_user_provided_options(&mut self) {
185 self.hudi_options.insert(
187 HudiTableConfig::BasePath.as_ref().to_string(),
188 self.base_uri.clone(),
189 );
190
191 let (generic_hudi_opts, generic_other_opts) =
192 split_hudi_options_from_others(self.options.iter());
193
194 extend_if_absent(&mut self.hudi_options, &generic_hudi_opts);
197 extend_if_absent(&mut self.storage_options, &generic_other_opts)
198 }
199
200 fn resolve_cloud_env_vars(&mut self) {
201 for (key, value) in env::vars() {
202 if Storage::CLOUD_STORAGE_PREFIXES
203 .iter()
204 .any(|prefix| key.starts_with(prefix))
205 && !self.storage_options.contains_key(&key.to_ascii_lowercase())
206 {
207 self.storage_options.insert(key.to_ascii_lowercase(), value);
208 }
209 }
210 }
211
212 async fn resolve_hudi_options(&mut self) -> Result<()> {
213 let storage = Storage::new(
215 Arc::new(self.storage_options.clone()),
216 Arc::new(HudiConfigs::new(self.hudi_options.iter())),
217 )?;
218
219 let hudi_options = &mut self.hudi_options;
220 Self::imbue_table_properties(hudi_options, storage.clone()).await?;
221
222 Self::imbue_global_hudi_configs_if_absent(hudi_options, storage.clone()).await
227 }
228
229 async fn imbue_table_properties(
230 options: &mut HashMap<String, String>,
231 storage: Arc<Storage>,
232 ) -> Result<()> {
233 let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
234 let table_properties = parse_data_for_options(&bytes, "=")?;
235
236 for (k, v) in table_properties {
240 options.insert(k.to_string(), v.to_string());
241 }
242
243 Ok(())
244 }
245
246 async fn imbue_global_hudi_configs_if_absent(
247 options: &mut HashMap<String, String>,
248 storage: Arc<Storage>,
249 ) -> Result<()> {
250 let global_config_path = env::var(HUDI_CONF_DIR)
251 .map(PathBuf::from)
252 .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf"))
253 .join("hudi-defaults.conf");
254
255 if let Ok(bytes) = storage
256 .get_file_data_from_absolute_path(global_config_path.to_str().unwrap())
257 .await
258 {
259 if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") {
260 for (key, value) in global_configs {
261 if key.starts_with("hoodie.") && !options.contains_key(&key) {
262 options.insert(key.to_string(), value.to_string());
263 }
264 }
265 }
266 }
267
268 Ok(())
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder over a dummy URI; none of these tests touch the storage layer.
    fn create_table_builder() -> TableBuilder {
        TableBuilder::from_base_uri("test_uri")
    }

    #[test]
    fn test_with_hudi_option() {
        let builder = create_table_builder();

        let updated = builder.with_hudi_option("key", "value").option_resolver;
        assert_eq!(updated.hudi_options["key"], "value");
    }

    #[test]
    fn test_with_hudi_options() {
        let builder = create_table_builder();

        let options = [("key1", "value1"), ("key2", "value2")];
        let updated = builder.with_hudi_options(options).option_resolver;
        assert_eq!(updated.hudi_options["key1"], "value1");
        assert_eq!(updated.hudi_options["key2"], "value2");
    }

    #[test]
    fn test_with_storage_option() {
        let builder = create_table_builder();

        let updated = builder.with_storage_option("key", "value").option_resolver;
        assert_eq!(updated.storage_options["key"], "value");
    }

    #[test]
    fn test_with_storage_options() {
        let builder = create_table_builder();

        let options = [("key1", "value1"), ("key2", "value2")];
        let updated = builder.with_storage_options(options).option_resolver;
        assert_eq!(updated.storage_options["key1"], "value1");
        assert_eq!(updated.storage_options["key2"], "value2");
    }

    #[test]
    fn test_with_option() {
        let builder = create_table_builder();

        let updated = builder.with_option("key", "value").option_resolver;
        assert_eq!(updated.options["key"], "value");
    }

    #[test]
    fn test_with_options() {
        let builder = create_table_builder();

        let options = [("key1", "value1"), ("key2", "value2")];
        let updated = builder.with_options(options).option_resolver;
        assert_eq!(updated.options["key1"], "value1");
        assert_eq!(updated.options["key2"], "value2");
    }

    /// `new_with_options` seeds only the generic map and leaves the others empty.
    #[test]
    fn test_option_resolver_new_with_options() {
        let resolver = OptionResolver::new_with_options(
            "test_uri",
            [("hoodie.option1", "value1"), ("AWS_REGION", "us-east-1")],
        );

        assert_eq!(resolver.base_uri, "test_uri");
        assert!(resolver.hudi_options.is_empty());
        assert!(resolver.storage_options.is_empty());
        assert_eq!(resolver.options.len(), 2);
        assert_eq!(resolver.options["hoodie.option1"], "value1");
        assert_eq!(resolver.options["AWS_REGION"], "us-east-1");
    }

    /// Explicit options beat generic ones, and within one map the last write wins.
    #[test]
    fn test_builder_resolve_user_provided_options_should_apply_precedence_order() {
        let mut builder = TableBuilder::from_base_uri("/tmp/hudi_data")
            .with_hudi_option("hoodie.option1", "value1")
            .with_option("hoodie.option2", "value2")
            .with_hudi_options([
                ("hoodie.option1", "value1-1"),
                ("hoodie.option3", "value3"),
                ("hoodie.option1", "value1-2"),
            ])
            .with_storage_option("AWS_REGION", "us-east-2")
            .with_storage_options([
                ("AWS_REGION", "us-east-1"),
                ("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"),
            ])
            .with_option("AWS_REGION", "us-west-1")
            .with_options([
                ("hoodie.option3", "value3-1"),
                ("hoodie.option2", "value2-1"),
            ]);

        let resolver = &mut builder.option_resolver;
        resolver.resolve_user_provided_options();

        assert_eq!(resolver.hudi_options.len(), 4);
        assert_eq!(resolver.hudi_options["hoodie.base.path"], "/tmp/hudi_data");
        assert_eq!(resolver.hudi_options["hoodie.option1"], "value1-2");
        assert_eq!(resolver.hudi_options["hoodie.option2"], "value2-1");
        assert_eq!(resolver.hudi_options["hoodie.option3"], "value3");
        assert_eq!(resolver.storage_options.len(), 2);
        assert_eq!(resolver.storage_options["AWS_REGION"], "us-east-1");
        assert_eq!(
            resolver.storage_options["AWS_ENDPOINT"],
            "s3.us-east-1.amazonaws.com"
        );
    }
}