1#[expect(deprecated)]
22use crate::disk_manager::{DiskManagerConfig, SpillingProgress};
23use crate::{
24 disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25 memory_pool::{
26 GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27 },
28 object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32#[cfg(feature = "parquet_encryption")]
33use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
34use datafusion_common::{Result, config::ConfigEntry};
35use object_store::ObjectStore;
36use std::sync::Arc;
37use std::{
38 fmt::{Debug, Formatter},
39 num::NonZeroUsize,
40};
41use std::{path::PathBuf, time::Duration};
42use url::Url;
43
/// Execution runtime environment: a bundle of the shared resources that the
/// rest of this file reads from or hands out.
///
/// Every field is reference counted (`Arc`), so the derived `Clone` copies
/// handles rather than the underlying resources.
#[derive(Clone)]
pub struct RuntimeEnv {
    /// Memory pool; its `memory_limit()` is what `config_entries` reports as
    /// `datafusion.runtime.memory_limit`.
    pub memory_pool: Arc<dyn MemoryPool>,
    /// Disk manager; source of the spilling progress, the temporary directory
    /// paths and the maximum temporary directory size.
    pub disk_manager: Arc<DiskManager>,
    /// Cache manager; source of the file metadata, list-files and file
    /// statistics cache limits (and the list-files cache TTL).
    pub cache_manager: Arc<CacheManager>,
    /// Registry used to register, deregister and look up [`ObjectStore`]
    /// instances by URL.
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Registry of [`EncryptionFactory`] instances addressed by a string id.
    /// Only present when the `parquet_encryption` feature is enabled.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
87
88impl Debug for RuntimeEnv {
89 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
90 write!(f, "RuntimeEnv")
91 }
92}
93
94fn create_runtime_config_entries(
100 memory_limit: Option<String>,
101 max_temp_directory_size: Option<String>,
102 temp_directory: Option<String>,
103 metadata_cache_limit: Option<String>,
104 list_files_cache_limit: Option<String>,
105 list_files_cache_ttl: Option<String>,
106 file_statistics_cache_limit: Option<String>,
107) -> Vec<ConfigEntry> {
108 vec![
109 ConfigEntry {
110 key: "datafusion.runtime.memory_limit".to_string(),
111 value: memory_limit,
112 description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
113 },
114 ConfigEntry {
115 key: "datafusion.runtime.max_temp_directory_size".to_string(),
116 value: max_temp_directory_size,
117 description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
118 },
119 ConfigEntry {
120 key: "datafusion.runtime.temp_directory".to_string(),
121 value: temp_directory,
122 description: "The path to the temporary file directory.",
123 },
124 ConfigEntry {
125 key: "datafusion.runtime.metadata_cache_limit".to_string(),
126 value: metadata_cache_limit,
127 description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
128 },
129 ConfigEntry {
130 key: "datafusion.runtime.list_files_cache_limit".to_string(),
131 value: list_files_cache_limit,
132 description: "Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
133 },
134 ConfigEntry {
135 key: "datafusion.runtime.list_files_cache_ttl".to_string(),
136 value: list_files_cache_ttl,
137 description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.",
138 },
139 ConfigEntry {
140 key: "datafusion.runtime.file_statistics_cache_limit".to_string(),
141 value: file_statistics_cache_limit,
142 description: "Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
143 },
144 ]
145}
146
147impl RuntimeEnv {
148 pub fn register_object_store(
188 &self,
189 url: &Url,
190 object_store: Arc<dyn ObjectStore>,
191 ) -> Option<Arc<dyn ObjectStore>> {
192 self.object_store_registry.register_store(url, object_store)
193 }
194
195 pub fn deregister_object_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
198 self.object_store_registry.deregister_store(url)
199 }
200
201 pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
205 self.object_store_registry.get_store(url.as_ref())
206 }
207
208 pub fn spilling_progress(&self) -> SpillingProgress {
210 self.disk_manager.spilling_progress()
211 }
212
213 #[cfg(feature = "parquet_encryption")]
217 pub fn register_parquet_encryption_factory(
218 &self,
219 id: &str,
220 encryption_factory: Arc<dyn EncryptionFactory>,
221 ) -> Option<Arc<dyn EncryptionFactory>> {
222 self.parquet_encryption_factory_registry
223 .register_factory(id, encryption_factory)
224 }
225
226 #[cfg(feature = "parquet_encryption")]
228 pub fn parquet_encryption_factory(
229 &self,
230 id: &str,
231 ) -> Result<Arc<dyn EncryptionFactory>> {
232 self.parquet_encryption_factory_registry.get_factory(id)
233 }
234
235 pub fn config_entries(&self) -> Vec<ConfigEntry> {
237 use crate::memory_pool::MemoryLimit;
238
239 fn format_byte_size(size: u64) -> String {
241 const GB: u64 = 1024 * 1024 * 1024;
242 const MB: u64 = 1024 * 1024;
243 const KB: u64 = 1024;
244
245 match size {
246 s if s >= GB => format!("{}G", s / GB),
247 s if s >= MB => format!("{}M", s / MB),
248 s if s >= KB => format!("{}K", s / KB),
249 s => format!("{s}"),
250 }
251 }
252
253 fn format_duration(duration: Duration) -> String {
254 let total = duration.as_secs();
255 let mins = total / 60;
256 let secs = total % 60;
257
258 format!("{mins}m{secs}s")
259 }
260
261 let memory_limit_value = match self.memory_pool.memory_limit() {
262 MemoryLimit::Finite(size) => Some(format_byte_size(
263 size.try_into()
264 .expect("Memory limit size conversion failed"),
265 )),
266 MemoryLimit::Infinite => Some("unlimited".to_string()),
267 MemoryLimit::Unknown => None,
268 };
269
270 let max_temp_dir_size = self.disk_manager.max_temp_directory_size();
271 let max_temp_dir_value = format_byte_size(max_temp_dir_size);
272
273 let temp_paths = self.disk_manager.temp_dir_paths();
274 let temp_dir_value = if temp_paths.is_empty() {
275 None
276 } else {
277 Some(
278 temp_paths
279 .iter()
280 .map(|p| p.display().to_string())
281 .collect::<Vec<_>>()
282 .join(","),
283 )
284 };
285
286 let metadata_cache_limit = self.cache_manager.get_metadata_cache_limit();
287 let metadata_cache_value = format_byte_size(
288 metadata_cache_limit
289 .try_into()
290 .expect("Metadata cache size conversion failed"),
291 );
292
293 let list_files_cache_limit = self.cache_manager.get_list_files_cache_limit();
294 let list_files_cache_value = format_byte_size(
295 list_files_cache_limit
296 .try_into()
297 .expect("List files cache size conversion failed"),
298 );
299
300 let list_files_cache_ttl = self
301 .cache_manager
302 .get_list_files_cache_ttl()
303 .map(format_duration);
304
305 let file_statistics_cache_limit =
306 self.cache_manager.get_file_statistic_cache_limit();
307 let file_statistics_cache_value = format_byte_size(
308 file_statistics_cache_limit
309 .try_into()
310 .expect("File statistics cache size conversion failed"),
311 );
312
313 create_runtime_config_entries(
314 memory_limit_value,
315 Some(max_temp_dir_value),
316 temp_dir_value,
317 Some(metadata_cache_value),
318 Some(list_files_cache_value),
319 list_files_cache_ttl,
320 Some(file_statistics_cache_value),
321 )
322 }
323}
324
325impl Default for RuntimeEnv {
326 fn default() -> Self {
327 RuntimeEnvBuilder::new().build().unwrap()
328 }
329}
330
/// Builder for [`RuntimeEnv`]; collects the components that `build` turns
/// into a runtime environment.
#[derive(Clone)]
pub struct RuntimeEnvBuilder {
    /// Disk manager configuration using the deprecated config type. `build`
    /// consults it only when `disk_manager_builder` is `None`.
    #[expect(deprecated)]
    pub disk_manager: DiskManagerConfig,
    /// Disk manager builder; when `Some`, `build` uses it instead of
    /// `disk_manager`.
    pub disk_manager_builder: Option<DiskManagerBuilder>,
    /// Memory pool to install; when `None`, `build` falls back to
    /// `UnboundedMemoryPool::default()`.
    pub memory_pool: Option<Arc<dyn MemoryPool>>,
    /// Cache configuration handed to `CacheManager::try_new` by `build`.
    pub cache_manager: CacheManagerConfig,
    /// Object store registry moved into the built [`RuntimeEnv`].
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Parquet encryption factory registry moved into the built
    /// [`RuntimeEnv`]. Only present with the `parquet_encryption` feature.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
353
354impl Default for RuntimeEnvBuilder {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360impl RuntimeEnvBuilder {
361 pub fn new() -> Self {
363 Self {
364 disk_manager: Default::default(),
365 disk_manager_builder: Default::default(),
366 memory_pool: Default::default(),
367 cache_manager: Default::default(),
368 object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
369 #[cfg(feature = "parquet_encryption")]
370 parquet_encryption_factory_registry: Default::default(),
371 }
372 }
373
374 #[expect(deprecated)]
375 #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
376 pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
378 self.disk_manager = disk_manager;
379 self
380 }
381
382 pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
384 self.disk_manager_builder = Some(disk_manager);
385 self
386 }
387
388 pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
390 self.memory_pool = Some(memory_pool);
391 self
392 }
393
394 pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
396 self.cache_manager = cache_manager;
397 self
398 }
399
400 pub fn with_object_store_registry(
402 mut self,
403 object_store_registry: Arc<dyn ObjectStoreRegistry>,
404 ) -> Self {
405 self.object_store_registry = object_store_registry;
406 self
407 }
408
409 pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
417 let pool_size = (max_memory as f64 * memory_fraction) as usize;
418 self.with_memory_pool(Arc::new(TrackConsumersPool::new(
419 GreedyMemoryPool::new(pool_size),
420 NonZeroUsize::new(5).unwrap(),
421 )))
422 }
423
424 pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
426 let builder = self.disk_manager_builder.take().unwrap_or_default();
427 self.with_disk_manager_builder(
428 builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
429 )
430 }
431
432 pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
434 let builder = self.disk_manager_builder.take().unwrap_or_default();
435 self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
436 }
437
438 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
440 self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
441 self
442 }
443
444 pub fn with_object_list_cache_limit(mut self, limit: usize) -> Self {
446 self.cache_manager = self.cache_manager.with_list_files_cache_limit(limit);
447 self
448 }
449
450 pub fn with_object_list_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
452 self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl);
453 self
454 }
455
456 pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self {
457 self.cache_manager = self.cache_manager.with_file_statistics_cache_limit(limit);
458 self
459 }
460
461 pub fn build(self) -> Result<RuntimeEnv> {
463 let Self {
464 disk_manager,
465 disk_manager_builder,
466 memory_pool,
467 cache_manager,
468 object_store_registry,
469 #[cfg(feature = "parquet_encryption")]
470 parquet_encryption_factory_registry,
471 } = self;
472 let memory_pool =
473 memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
474
475 Ok(RuntimeEnv {
476 memory_pool,
477 disk_manager: if let Some(builder) = disk_manager_builder {
478 Arc::new(builder.build()?)
479 } else {
480 #[expect(deprecated)]
481 DiskManager::try_new(disk_manager)?
482 },
483 cache_manager: CacheManager::try_new(&cache_manager)?,
484 object_store_registry,
485 #[cfg(feature = "parquet_encryption")]
486 parquet_encryption_factory_registry,
487 })
488 }
489
490 pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
492 self.build().map(Arc::new)
493 }
494
495 pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
497 let cache_config = CacheManagerConfig {
498 file_statistics_cache: runtime_env.cache_manager.get_file_statistic_cache(),
499 file_statistics_cache_limit: runtime_env
500 .cache_manager
501 .get_file_statistic_cache_limit(),
502 list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
503 list_files_cache_limit: runtime_env
504 .cache_manager
505 .get_list_files_cache_limit(),
506 list_files_cache_ttl: runtime_env.cache_manager.get_list_files_cache_ttl(),
507 file_metadata_cache: Some(
508 runtime_env.cache_manager.get_file_metadata_cache(),
509 ),
510 metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
511 };
512
513 Self {
514 #[expect(deprecated)]
515 disk_manager: DiskManagerConfig::Existing(Arc::clone(
516 &runtime_env.disk_manager,
517 )),
518 disk_manager_builder: None,
519 memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
520 cache_manager: cache_config,
521 object_store_registry: Arc::clone(&runtime_env.object_store_registry),
522 #[cfg(feature = "parquet_encryption")]
523 parquet_encryption_factory_registry: Arc::clone(
524 &runtime_env.parquet_encryption_factory_registry,
525 ),
526 }
527 }
528
529 pub fn entries(&self) -> Vec<ConfigEntry> {
531 create_runtime_config_entries(
532 None,
533 Some("100G".to_string()),
534 None,
535 Some("50M".to_owned()),
536 Some("1M".to_owned()),
537 None,
538 Some("20M".to_owned()),
539 )
540 }
541
542 pub fn generate_config_markdown() -> String {
544 use std::fmt::Write as _;
545
546 let s = Self::default();
547
548 let mut docs = "| key | default | description |\n".to_string();
549 docs += "|-----|---------|-------------|\n";
550 let mut entries = s.entries();
551 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
552
553 for entry in &entries {
554 let _ = writeln!(
555 &mut docs,
556 "| {} | {} | {} |",
557 entry.key,
558 entry.value.as_deref().unwrap_or("NULL"),
559 entry.description
560 );
561 }
562 docs
563 }
564}