datafusion_table_providers/duckdb/
settings.rs1use crate::sql::db_connection_pool::dbconnection::duckdbconn::DuckDBSyncParameter;
101use crate::{duckdb::Error, sql::db_connection_pool::dbconnection::SyncDbConnection};
102use datafusion::error::{DataFusionError, Result as DataFusionResult};
103use duckdb::DuckdbConnectionManager;
104use r2d2::PooledConnection;
105use snafu::prelude::*;
106use std::collections::HashMap;
107use std::sync::Arc;
108
/// Scope tag attached to every [`DuckDBSetting`].
///
/// The registry's `apply_settings` and `get_setting_statements` take a scope
/// argument and only handle settings whose `scope()` equals it.
// NOTE(review): the exact lifecycle each variant maps to (e.g. once per
// database vs. once per connection) is decided by callers outside this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DuckDBSettingScope {
    /// Settings tagged as global.
    Global,
    /// Settings tagged as local.
    Local,
}
117
118pub trait DuckDBSetting: Send + Sync + std::fmt::Debug {
120 fn as_any(&self) -> &dyn std::any::Any;
121
122 fn setting_name(&self) -> &'static str;
124
125 fn scope(&self) -> DuckDBSettingScope;
127
128 fn get_value(&self, options: &HashMap<String, String>) -> Option<String>;
130
131 fn validate(&self, _value: &str) -> Result<(), Error> {
133 Ok(())
134 }
135
136 fn format_sql_value(&self, value: &str) -> String {
138 value.to_string()
139 }
140}
141
142#[derive(Debug)]
144pub struct DuckDBSettingsRegistry {
145 settings: Vec<Box<dyn DuckDBSetting>>,
146}
147
148impl Default for DuckDBSettingsRegistry {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154impl DuckDBSettingsRegistry {
155 pub fn new() -> Self {
157 let mut registry = Self {
158 settings: Vec::new(),
159 };
160
161 registry.register(Box::new(MemoryLimitSetting));
163 registry.register(Box::new(TempDirectorySetting));
164 registry.register(Box::new(PreserveInsertionOrderSetting));
165
166 registry
167 }
168
169 pub fn empty() -> Self {
171 Self {
172 settings: Vec::new(),
173 }
174 }
175
176 pub fn with_setting(mut self, setting: Box<dyn DuckDBSetting>) -> Self {
177 self.register(setting);
178 self
179 }
180
181 pub fn register(&mut self, setting: Box<dyn DuckDBSetting>) {
183 self.settings.push(setting);
184 }
185
186 pub fn apply_settings(
188 &self,
189 conn: &dyn SyncDbConnection<
190 PooledConnection<DuckdbConnectionManager>,
191 Box<dyn DuckDBSyncParameter>,
192 >,
193 options: &HashMap<String, String>,
194 scope: DuckDBSettingScope,
195 ) -> DataFusionResult<()> {
196 for setting in &self.settings {
197 if setting.scope() != scope {
198 tracing::debug!(
199 "Skipping setting {} because it's not a {scope:?}",
200 setting.setting_name(),
201 );
202 continue;
203 }
204
205 if let Some(value) = setting.get_value(options) {
206 setting
207 .validate(&value)
208 .map_err(|e| DataFusionError::External(Box::new(e)))?;
209
210 let set_statement = self.set_statement(setting.as_ref(), &value);
211 tracing::debug!("DuckDB: {}", set_statement);
212 conn.execute(&set_statement, &[])?;
213 }
214 }
215
216 Ok(())
217 }
218
219 pub fn get_setting_statements(
221 &self,
222 options: &HashMap<String, String>,
223 scope: DuckDBSettingScope,
224 ) -> Vec<Arc<str>> {
225 self.settings
226 .iter()
227 .filter(|s| s.scope() == scope)
228 .filter_map(|s| {
229 s.get_value(options)
230 .map(|value| self.set_statement(s.as_ref(), &value))
231 })
232 .map(|s| s.into())
233 .collect()
234 }
235
236 fn set_statement(&self, setting: &dyn DuckDBSetting, value: &str) -> String {
237 format!(
238 "SET {} = {}",
239 setting.setting_name(),
240 setting.format_sql_value(value)
241 )
242 }
243}
244
245#[derive(Debug)]
247pub struct MemoryLimitSetting;
248
249impl DuckDBSetting for MemoryLimitSetting {
250 fn as_any(&self) -> &dyn std::any::Any {
251 self
252 }
253
254 fn setting_name(&self) -> &'static str {
255 "memory_limit"
256 }
257
258 fn scope(&self) -> DuckDBSettingScope {
259 DuckDBSettingScope::Global
260 }
261
262 fn get_value(&self, options: &HashMap<String, String>) -> Option<String> {
263 options.get("memory_limit").cloned()
264 }
265
266 fn validate(&self, value: &str) -> Result<(), Error> {
267 byte_unit::Byte::parse_str(value, true).context(
268 crate::duckdb::UnableToParseMemoryLimitSnafu {
269 value: value.to_string(),
270 },
271 )?;
272 Ok(())
273 }
274
275 fn format_sql_value(&self, value: &str) -> String {
276 format!("'{}'", value)
277 }
278}
279
280#[derive(Debug)]
282pub struct TempDirectorySetting;
283
284impl DuckDBSetting for TempDirectorySetting {
285 fn as_any(&self) -> &dyn std::any::Any {
286 self
287 }
288
289 fn setting_name(&self) -> &'static str {
290 "temp_directory"
291 }
292
293 fn scope(&self) -> DuckDBSettingScope {
294 DuckDBSettingScope::Global
295 }
296
297 fn get_value(&self, options: &HashMap<String, String>) -> Option<String> {
298 options.get("temp_directory").cloned()
299 }
300
301 fn format_sql_value(&self, value: &str) -> String {
302 format!("'{}'", value)
303 }
304}
305
306#[derive(Debug)]
308pub struct PreserveInsertionOrderSetting;
309
310impl DuckDBSetting for PreserveInsertionOrderSetting {
311 fn as_any(&self) -> &dyn std::any::Any {
312 self
313 }
314
315 fn setting_name(&self) -> &'static str {
316 "preserve_insertion_order"
317 }
318
319 fn scope(&self) -> DuckDBSettingScope {
320 DuckDBSettingScope::Global
321 }
322
323 fn get_value(&self, options: &HashMap<String, String>) -> Option<String> {
324 options.get("preserve_insertion_order").cloned()
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Test setting that ignores the options map and always yields a fixed value.
    #[derive(Debug)]
    struct TestUnconditionalSetting {
        name: &'static str,
        value: String,
    }

    impl TestUnconditionalSetting {
        fn new(name: &'static str, value: String) -> Self {
            Self { name, value }
        }
    }

    impl DuckDBSetting for TestUnconditionalSetting {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn setting_name(&self) -> &'static str {
            self.name
        }

        fn get_value(&self, _options: &HashMap<String, String>) -> Option<String> {
            Some(self.value.clone())
        }

        fn scope(&self) -> DuckDBSettingScope {
            DuckDBSettingScope::Global
        }

        fn format_sql_value(&self, value: &str) -> String {
            format!("'{}'", value)
        }
    }

    /// Test setting that reads `test_setting` and rejects the literal value `invalid`.
    /// It keeps the trait's default `format_sql_value`.
    #[derive(Debug)]
    struct TestValidatingSetting;

    impl DuckDBSetting for TestValidatingSetting {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn setting_name(&self) -> &'static str {
            "test_setting"
        }

        fn scope(&self) -> DuckDBSettingScope {
            DuckDBSettingScope::Global
        }

        fn get_value(&self, options: &HashMap<String, String>) -> Option<String> {
            options.get("test_setting").cloned()
        }

        fn validate(&self, value: &str) -> Result<(), Error> {
            if value == "invalid" {
                return Err(Error::DbConnectionError {
                    source: "Test validation error".into(),
                });
            }
            Ok(())
        }
    }

    #[test]
    fn test_memory_limit_setting() {
        let setting = MemoryLimitSetting;

        assert_eq!(setting.setting_name(), "memory_limit");

        let mut options = HashMap::new();
        options.insert("memory_limit".to_string(), "1GB".to_string());
        assert_eq!(setting.get_value(&options), Some("1GB".to_string()));

        let empty_options = HashMap::new();
        assert_eq!(setting.get_value(&empty_options), None);

        assert_eq!(setting.format_sql_value("1GB"), "'1GB'");

        assert!(setting.validate("1GB").is_ok());
        assert!(setting.validate("512MiB").is_ok());

        assert!(setting.validate("invalid").is_err());
    }

    #[test]
    fn test_temp_directory_setting() {
        let setting = TempDirectorySetting;

        assert_eq!(setting.setting_name(), "temp_directory");

        let mut options = HashMap::new();
        options.insert("temp_directory".to_string(), "/tmp/test".to_string());
        assert_eq!(setting.get_value(&options), Some("/tmp/test".to_string()));

        let empty_options = HashMap::new();
        assert_eq!(setting.get_value(&empty_options), None);

        assert_eq!(setting.format_sql_value("/tmp/test"), "'/tmp/test'");

        assert!(setting.validate("/tmp/test").is_ok());
        assert!(setting.validate("any_value").is_ok());
    }

    #[test]
    fn test_preserve_insertion_order_setting() {
        let setting = PreserveInsertionOrderSetting;

        assert_eq!(setting.setting_name(), "preserve_insertion_order");

        let mut options = HashMap::new();
        options.insert("preserve_insertion_order".to_string(), "true".to_string());
        assert_eq!(setting.get_value(&options), Some("true".to_string()));

        let empty_options = HashMap::new();
        assert_eq!(setting.get_value(&empty_options), None);

        assert_eq!(setting.format_sql_value("true"), "true");
        assert_eq!(setting.format_sql_value("false"), "false");

        assert!(setting.validate("true").is_ok());
        assert!(setting.validate("false").is_ok());
    }

    #[test]
    fn test_settings_registry_new() {
        let registry = DuckDBSettingsRegistry::new();

        assert_eq!(registry.settings.len(), 3);

        let setting_names: Vec<&'static str> =
            registry.settings.iter().map(|s| s.setting_name()).collect();

        assert!(setting_names.contains(&"memory_limit"));
        assert!(setting_names.contains(&"temp_directory"));
        assert!(setting_names.contains(&"preserve_insertion_order"));
    }

    #[test]
    fn test_settings_registry_empty() {
        let registry = DuckDBSettingsRegistry::empty();

        assert_eq!(registry.settings.len(), 0);
    }

    #[test]
    fn test_settings_registry_default() {
        let registry = DuckDBSettingsRegistry::default();

        assert_eq!(registry.settings.len(), 3);

        // `default()` must register the same settings, in the same order, as `new()`.
        let default_names: Vec<&'static str> =
            registry.settings.iter().map(|s| s.setting_name()).collect();
        let new_names: Vec<&'static str> = DuckDBSettingsRegistry::new()
            .settings
            .iter()
            .map(|s| s.setting_name())
            .collect();
        assert_eq!(default_names, new_names);
    }

    #[test]
    fn test_settings_registry_register() {
        let mut registry = DuckDBSettingsRegistry::empty();

        assert_eq!(registry.settings.len(), 0);

        registry.register(Box::new(TestUnconditionalSetting::new(
            "test_setting",
            "test_value".to_string(),
        )));

        assert_eq!(registry.settings.len(), 1);
        assert_eq!(registry.settings[0].setting_name(), "test_setting");

        registry.register(Box::new(MemoryLimitSetting));

        assert_eq!(registry.settings.len(), 2);
    }

    #[test]
    fn test_unconditional_setting() {
        let setting =
            TestUnconditionalSetting::new("test_unconditional", "always_this_value".to_string());

        assert_eq!(setting.setting_name(), "test_unconditional");

        let empty_options = HashMap::new();
        assert_eq!(
            setting.get_value(&empty_options),
            Some("always_this_value".to_string())
        );

        let mut options_with_other_keys = HashMap::new();
        options_with_other_keys.insert("some_other_key".to_string(), "some_value".to_string());
        assert_eq!(
            setting.get_value(&options_with_other_keys),
            Some("always_this_value".to_string())
        );

        assert_eq!(setting.format_sql_value("test"), "'test'");
    }

    #[test]
    fn test_custom_validation() {
        let setting = TestValidatingSetting;

        assert_eq!(setting.setting_name(), "test_setting");

        let mut options = HashMap::new();
        options.insert("test_setting".to_string(), "valid_value".to_string());
        assert_eq!(setting.get_value(&options), Some("valid_value".to_string()));

        assert!(setting.validate("valid_value").is_ok());

        assert!(setting.validate("invalid").is_err());
    }

    #[test]
    fn test_trait_default_implementations() {
        // The default `validate` accepts everything.
        let setting = TestUnconditionalSetting::new("test", "value".to_string());
        assert!(setting.validate("any_value").is_ok());

        // The default `format_sql_value` passes the value through unchanged.
        let setting_with_defaults = TestValidatingSetting;
        assert_eq!(setting_with_defaults.format_sql_value("test"), "test");
    }

    #[test]
    fn test_as_any_functionality() {
        let memory_setting = MemoryLimitSetting;
        let boxed_setting: Box<dyn DuckDBSetting> = Box::new(memory_setting);

        let any_ref = boxed_setting.as_any();
        assert!(any_ref.is::<MemoryLimitSetting>());

        let downcasted = any_ref.downcast_ref::<MemoryLimitSetting>();
        assert!(downcasted.is_some());

        assert!(any_ref.downcast_ref::<TempDirectorySetting>().is_none());
    }

    #[test]
    fn test_memory_limit_validation_edge_cases() {
        let setting = MemoryLimitSetting;

        for valid in [
            "1KB", "1MB", "1GB", "1TB", "1KiB", "1MiB", "1GiB", "1TiB", "512.5MB", "123",
        ] {
            assert!(
                setting.validate(valid).is_ok(),
                "expected {valid:?} to be accepted"
            );
        }

        for invalid in ["", "not_a_number", "123XB", "abc123MB"] {
            assert!(
                setting.validate(invalid).is_err(),
                "expected {invalid:?} to be rejected"
            );
        }
    }

    #[test]
    fn test_settings_ordering_in_registry() {
        let mut registry = DuckDBSettingsRegistry::empty();

        registry.register(Box::new(TestUnconditionalSetting::new(
            "first",
            "1".to_string(),
        )));
        registry.register(Box::new(TestUnconditionalSetting::new(
            "second",
            "2".to_string(),
        )));
        registry.register(Box::new(TestUnconditionalSetting::new(
            "third",
            "3".to_string(),
        )));

        assert_eq!(registry.settings[0].setting_name(), "first");
        assert_eq!(registry.settings[1].setting_name(), "second");
        assert_eq!(registry.settings[2].setting_name(), "third");
    }

    #[test]
    fn test_multiple_settings_with_same_option_key() {
        #[derive(Debug)]
        struct TestSetting1;

        #[derive(Debug)]
        struct TestSetting2;

        impl DuckDBSetting for TestSetting1 {
            fn as_any(&self) -> &dyn std::any::Any {
                self
            }
            fn setting_name(&self) -> &'static str {
                "setting1"
            }
            fn scope(&self) -> DuckDBSettingScope {
                DuckDBSettingScope::Global
            }
            fn get_value(&self, options: &HashMap<String, String>) -> Option<String> {
                options.get("shared_key").cloned()
            }
        }

        impl DuckDBSetting for TestSetting2 {
            fn as_any(&self) -> &dyn std::any::Any {
                self
            }
            fn setting_name(&self) -> &'static str {
                "setting2"
            }
            fn scope(&self) -> DuckDBSettingScope {
                DuckDBSettingScope::Global
            }
            fn get_value(&self, options: &HashMap<String, String>) -> Option<String> {
                options.get("shared_key").cloned()
            }
        }

        let setting1 = TestSetting1;
        let setting2 = TestSetting2;

        let mut options = HashMap::new();
        options.insert("shared_key".to_string(), "shared_value".to_string());

        assert_eq!(
            setting1.get_value(&options),
            Some("shared_value".to_string())
        );
        assert_eq!(
            setting2.get_value(&options),
            Some("shared_value".to_string())
        );
    }

    #[test]
    fn test_get_setting_statements_renders_in_order_and_filters_by_scope() {
        let registry = DuckDBSettingsRegistry::empty()
            .with_setting(Box::new(TestUnconditionalSetting::new(
                "first",
                "1".to_string(),
            )))
            .with_setting(Box::new(MemoryLimitSetting))
            .with_setting(Box::new(PreserveInsertionOrderSetting));

        let mut options = HashMap::new();
        options.insert("memory_limit".to_string(), "1GB".to_string());

        // `preserve_insertion_order` has no value in `options`, so it is skipped.
        let rendered: Vec<String> = registry
            .get_setting_statements(&options, DuckDBSettingScope::Global)
            .iter()
            .map(|s| s.to_string())
            .collect();
        assert_eq!(rendered, ["SET first = '1'", "SET memory_limit = '1GB'"]);

        // Every registered setting is Global, so nothing is rendered for Local.
        assert!(registry
            .get_setting_statements(&options, DuckDBSettingScope::Local)
            .is_empty());
    }
}