hudi_core/table/
builder.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20use paste::paste;
21use std::collections::HashMap;
22use std::env;
23use std::path::PathBuf;
24use std::sync::Arc;
25
26use crate::config::table::HudiTableConfig;
27use crate::config::util::{parse_data_for_options, split_hudi_options_from_others};
28use crate::config::{HudiConfigs, HUDI_CONF_DIR};
29use crate::storage::Storage;
30use crate::table::fs_view::FileSystemView;
31use crate::table::validation::validate_configs;
32use crate::table::Table;
33use crate::timeline::Timeline;
34use crate::util::collection::extend_if_absent;
35use crate::Result;
36
/// Builder that collects options and constructs a [Table] via [TableBuilder::build].
#[derive(Clone, Debug)]
pub struct TableBuilder {
    /// Accumulates everything supplied through the `with_*` setters.
    option_resolver: OptionResolver,
}

/// Holds the base URI together with the Hudi, storage, and generic options that get
/// combined into the final configuration when a table is built.
#[derive(Clone, Debug)]
pub struct OptionResolver {
    /// URI of the table's base path.
    pub base_uri: String,
    /// Options explicitly supplied as Hudi configs.
    pub hudi_options: HashMap<String, String>,
    /// Options explicitly supplied for the storage layer.
    pub storage_options: HashMap<String, String>,
    /// Generic options; split into Hudi and storage options during resolution.
    pub options: HashMap<String, String>,
}
51
52macro_rules! impl_with_options {
53    ($struct_name:ident, $($field:ident, $singular:ident),+) => {
54        impl $struct_name {
55            $(
56                paste! {
57                    #[doc = "Add " $singular " to the builder."]
58                    #[doc = "Subsequent calls overwrite the previous values if the key already exists."]
59                    pub fn [<with_ $singular>]<K, V>(mut self, k: K, v: V) -> Self
60                    where
61                        K: AsRef<str>,
62                        V: Into<String>,
63                    {
64                        let option_resolver = &mut self.option_resolver;
65                        option_resolver.$field.insert(k.as_ref().to_string(), v.into());
66                        self
67                    }
68
69                    #[doc = "Add " $field " to the builder."]
70                    #[doc = "Subsequent calls overwrite the previous values if the key already exists."]
71                    pub fn [<with_ $field>]<I, K, V>(mut self, options: I) -> Self
72                    where
73                        I: IntoIterator<Item = (K, V)>,
74                        K: AsRef<str>,
75                        V: Into<String>,
76                    {
77                        let option_resolver = &mut self.option_resolver;
78                        option_resolver.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into())));
79                        self
80                    }
81                }
82            )+
83        }
84    };
85}
86
87impl_with_options!(
88    TableBuilder,
89    hudi_options,
90    hudi_option,
91    storage_options,
92    storage_option,
93    options,
94    option
95);
96
97impl TableBuilder {
98    /// Create Hudi table builder from base table uri
99    pub fn from_base_uri(base_uri: &str) -> Self {
100        let option_resolver = OptionResolver::new(base_uri);
101        TableBuilder { option_resolver }
102    }
103
104    pub async fn build(&mut self) -> Result<Table> {
105        let option_resolver = &mut self.option_resolver;
106
107        option_resolver.resolve_options().await?;
108
109        let hudi_configs = Arc::from(HudiConfigs::new(option_resolver.hudi_options.iter()));
110
111        let storage_options = Arc::from(self.option_resolver.storage_options.clone());
112
113        let timeline =
114            Timeline::new_from_storage(hudi_configs.clone(), storage_options.clone()).await?;
115
116        let file_system_view =
117            FileSystemView::new(hudi_configs.clone(), storage_options.clone()).await?;
118
119        Ok(Table {
120            hudi_configs,
121            storage_options,
122            timeline,
123            file_system_view,
124        })
125    }
126}
127
128impl OptionResolver {
129    /// Create a new [OptionResolver] with the given base URI.
130    pub fn new(base_uri: &str) -> Self {
131        Self {
132            base_uri: base_uri.to_string(),
133            hudi_options: HashMap::new(),
134            storage_options: HashMap::new(),
135            options: HashMap::new(),
136        }
137    }
138
139    /// Create a new [OptionResolver] with the given base URI and options.
140    pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Self
141    where
142        I: IntoIterator<Item = (K, V)>,
143        K: AsRef<str>,
144        V: Into<String>,
145    {
146        let options = options
147            .into_iter()
148            .map(|(k, v)| (k.as_ref().to_string(), v.into()))
149            .collect();
150        Self {
151            base_uri: base_uri.to_string(),
152            hudi_options: HashMap::new(),
153            storage_options: HashMap::new(),
154            options,
155        }
156    }
157
158    /// Resolve all options by combining the ones from hoodie.properties, user-provided options,
159    /// env vars, and global Hudi configs. The precedence order is as follows:
160    ///
161    /// 1. hoodie.properties
162    /// 2. Explicit options provided by the user
163    /// 3. Generic options provided by the user
164    /// 4. Env vars
165    /// 5. Global Hudi configs
166    ///
167    /// [note] Error may occur when 1 and 2 have conflicts.
168    pub async fn resolve_options(&mut self) -> Result<()> {
169        self.resolve_user_provided_options();
170
171        // If any user-provided options are intended for cloud storage and in uppercase,
172        // convert them to lowercase. This is to allow `object_store` to pick them up.
173        self.resolve_cloud_env_vars();
174
175        // At this point, we have resolved the storage options needed for accessing the storage layer.
176        // We can now resolve the hudi options
177        self.resolve_hudi_options().await?;
178
179        // Validate the resolved Hudi options
180        let hudi_configs = HudiConfigs::new(self.hudi_options.iter());
181        validate_configs(&hudi_configs)
182    }
183
184    fn resolve_user_provided_options(&mut self) {
185        // Insert the base path into hudi options since it is explicitly provided
186        self.hudi_options.insert(
187            HudiTableConfig::BasePath.as_ref().to_string(),
188            self.base_uri.clone(),
189        );
190
191        let (generic_hudi_opts, generic_other_opts) =
192            split_hudi_options_from_others(self.options.iter());
193
194        // Combine generic options (lower precedence) with explicit options.
195        // Note that we treat all non-Hudi options as storage options
196        extend_if_absent(&mut self.hudi_options, &generic_hudi_opts);
197        extend_if_absent(&mut self.storage_options, &generic_other_opts)
198    }
199
200    fn resolve_cloud_env_vars(&mut self) {
201        for (key, value) in env::vars() {
202            if Storage::CLOUD_STORAGE_PREFIXES
203                .iter()
204                .any(|prefix| key.starts_with(prefix))
205                && !self.storage_options.contains_key(&key.to_ascii_lowercase())
206            {
207                self.storage_options.insert(key.to_ascii_lowercase(), value);
208            }
209        }
210    }
211
212    async fn resolve_hudi_options(&mut self) -> Result<()> {
213        // create a [Storage] instance to load properties from storage layer.
214        let storage = Storage::new(
215            Arc::new(self.storage_options.clone()),
216            Arc::new(HudiConfigs::new(self.hudi_options.iter())),
217        )?;
218
219        let hudi_options = &mut self.hudi_options;
220        Self::imbue_table_properties(hudi_options, storage.clone()).await?;
221
222        // TODO support imbuing Hudi options from env vars HOODIE_ENV.*
223        // (see https://hudi.apache.org/docs/next/s3_hoodie)
224        // before loading global configs
225
226        Self::imbue_global_hudi_configs_if_absent(hudi_options, storage.clone()).await
227    }
228
229    async fn imbue_table_properties(
230        options: &mut HashMap<String, String>,
231        storage: Arc<Storage>,
232    ) -> Result<()> {
233        let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
234        let table_properties = parse_data_for_options(&bytes, "=")?;
235
236        // Table properties on storage (hoodie.properties) should have the highest precedence,
237        // except for writer-changeable properties like enabling metadata table/indexes.
238        // TODO: return err when user-provided options conflict with table properties
239        for (k, v) in table_properties {
240            options.insert(k.to_string(), v.to_string());
241        }
242
243        Ok(())
244    }
245
246    async fn imbue_global_hudi_configs_if_absent(
247        options: &mut HashMap<String, String>,
248        storage: Arc<Storage>,
249    ) -> Result<()> {
250        let global_config_path = env::var(HUDI_CONF_DIR)
251            .map(PathBuf::from)
252            .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf"))
253            .join("hudi-defaults.conf");
254
255        if let Ok(bytes) = storage
256            .get_file_data_from_absolute_path(global_config_path.to_str().unwrap())
257            .await
258        {
259            if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") {
260                for (key, value) in global_configs {
261                    if key.starts_with("hoodie.") && !options.contains_key(&key) {
262                        options.insert(key.to_string(), value.to_string());
263                    }
264                }
265            }
266        }
267
268        Ok(())
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh builder over a dummy base URI, shared by the setter tests.
    fn create_table_builder() -> TableBuilder {
        TableBuilder {
            option_resolver: OptionResolver::new("test_uri"),
        }
    }

    #[test]
    fn test_with_hudi_option() {
        let resolver = create_table_builder()
            .with_hudi_option("key", "value")
            .option_resolver;
        assert_eq!(
            resolver.hudi_options.get("key").map(String::as_str),
            Some("value")
        );
    }

    #[test]
    fn test_with_hudi_options() {
        let pairs = [("key1", "value1"), ("key2", "value2")];
        let resolver = create_table_builder()
            .with_hudi_options(pairs)
            .option_resolver;
        for (key, value) in pairs {
            assert_eq!(resolver.hudi_options[key], value);
        }
    }

    #[test]
    fn test_with_storage_option() {
        let resolver = create_table_builder()
            .with_storage_option("key", "value")
            .option_resolver;
        assert_eq!(
            resolver.storage_options.get("key").map(String::as_str),
            Some("value")
        );
    }

    #[test]
    fn test_with_storage_options() {
        let pairs = [("key1", "value1"), ("key2", "value2")];
        let resolver = create_table_builder()
            .with_storage_options(pairs)
            .option_resolver;
        for (key, value) in pairs {
            assert_eq!(resolver.storage_options[key], value);
        }
    }

    #[test]
    fn test_with_option() {
        let resolver = create_table_builder()
            .with_option("key", "value")
            .option_resolver;
        assert_eq!(
            resolver.options.get("key").map(String::as_str),
            Some("value")
        );
    }

    #[test]
    fn test_with_options() {
        let pairs = [("key1", "value1"), ("key2", "value2")];
        let resolver = create_table_builder().with_options(pairs).option_resolver;
        for (key, value) in pairs {
            assert_eq!(resolver.options[key], value);
        }
    }

    #[test]
    fn test_builder_resolve_user_provided_options_should_apply_precedence_order() {
        let mut builder = TableBuilder::from_base_uri("/tmp/hudi_data")
            .with_hudi_option("hoodie.option1", "value1")
            .with_option("hoodie.option2", "'value2")
            .with_hudi_options([
                ("hoodie.option1", "value1-1"),
                ("hoodie.option3", "value3"),
                ("hoodie.option1", "value1-2"),
            ])
            .with_storage_option("AWS_REGION", "us-east-2")
            .with_storage_options([
                ("AWS_REGION", "us-east-1"),
                ("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"),
            ])
            .with_option("AWS_REGION", "us-west-1")
            .with_options([
                ("hoodie.option3", "value3-1"),
                ("hoodie.option2", "value2-1"),
            ]);

        let resolver = &mut builder.option_resolver;
        resolver.resolve_user_provided_options();

        // Explicit options beat generic ones; later explicit values beat earlier ones.
        let expected_hudi_options = [
            ("hoodie.base.path", "/tmp/hudi_data"),
            ("hoodie.option1", "value1-2"),
            ("hoodie.option2", "value2-1"),
            ("hoodie.option3", "value3"),
        ];
        assert_eq!(resolver.hudi_options.len(), expected_hudi_options.len());
        for (key, value) in expected_hudi_options {
            assert_eq!(resolver.hudi_options[key], value);
        }

        let expected_storage_options = [
            ("AWS_REGION", "us-east-1"),
            ("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"),
        ];
        assert_eq!(
            resolver.storage_options.len(),
            expected_storage_options.len()
        );
        for (key, value) in expected_storage_options {
            assert_eq!(resolver.storage_options[key], value);
        }
    }
}