1#[expect(deprecated)]
22use crate::disk_manager::DiskManagerConfig;
23use crate::{
24 disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25 memory_pool::{
26 GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27 },
28 object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32#[cfg(feature = "parquet_encryption")]
33use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
34use datafusion_common::{Result, config::ConfigEntry};
35use object_store::ObjectStore;
36use std::sync::Arc;
37use std::{
38 fmt::{Debug, Formatter},
39 num::NonZeroUsize,
40};
41use std::{path::PathBuf, time::Duration};
42use url::Url;
43
/// Execution runtime environment.
///
/// Bundles the components shared by query execution: the memory pool, the
/// disk manager for temporary files, the cache manager, the object store
/// registry and (with the `parquet_encryption` feature) the Parquet
/// encryption factory registry.
///
/// Every component is held behind an [`Arc`], so cloning a `RuntimeEnv` is
/// cheap and the clones share the same underlying components. Instances are
/// created with [`RuntimeEnvBuilder`]; `RuntimeEnv::default()` builds one from
/// a default builder.
#[derive(Clone)]
pub struct RuntimeEnv {
    /// Memory pool used for query execution; its limit is reported as
    /// `datafusion.runtime.memory_limit` by `config_entries`.
    pub memory_pool: Arc<dyn MemoryPool>,
    /// Disk manager for temporary files; its maximum size and directories are
    /// reported by `config_entries`.
    pub disk_manager: Arc<DiskManager>,
    /// Holds the file statistics, list files and file metadata caches.
    pub cache_manager: Arc<CacheManager>,
    /// Registry that maps URLs to [`ObjectStore`] instances.
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Registry of Parquet encryption factories, looked up by string id.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
87
88impl Debug for RuntimeEnv {
89 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
90 write!(f, "RuntimeEnv")
91 }
92}
93
94fn create_runtime_config_entries(
100 memory_limit: Option<String>,
101 max_temp_directory_size: Option<String>,
102 temp_directory: Option<String>,
103 metadata_cache_limit: Option<String>,
104 list_files_cache_limit: Option<String>,
105 list_files_cache_ttl: Option<String>,
106) -> Vec<ConfigEntry> {
107 vec![
108 ConfigEntry {
109 key: "datafusion.runtime.memory_limit".to_string(),
110 value: memory_limit,
111 description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
112 },
113 ConfigEntry {
114 key: "datafusion.runtime.max_temp_directory_size".to_string(),
115 value: max_temp_directory_size,
116 description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
117 },
118 ConfigEntry {
119 key: "datafusion.runtime.temp_directory".to_string(),
120 value: temp_directory,
121 description: "The path to the temporary file directory.",
122 },
123 ConfigEntry {
124 key: "datafusion.runtime.metadata_cache_limit".to_string(),
125 value: metadata_cache_limit,
126 description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
127 },
128 ConfigEntry {
129 key: "datafusion.runtime.list_files_cache_limit".to_string(),
130 value: list_files_cache_limit,
131 description: "Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
132 },
133 ConfigEntry {
134 key: "datafusion.runtime.list_files_cache_ttl".to_string(),
135 value: list_files_cache_ttl,
136 description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.",
137 },
138 ]
139}
140
141impl RuntimeEnv {
142 pub fn register_object_store(
182 &self,
183 url: &Url,
184 object_store: Arc<dyn ObjectStore>,
185 ) -> Option<Arc<dyn ObjectStore>> {
186 self.object_store_registry.register_store(url, object_store)
187 }
188
189 pub fn deregister_object_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
192 self.object_store_registry.deregister_store(url)
193 }
194
195 pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
199 self.object_store_registry.get_store(url.as_ref())
200 }
201
202 #[cfg(feature = "parquet_encryption")]
206 pub fn register_parquet_encryption_factory(
207 &self,
208 id: &str,
209 encryption_factory: Arc<dyn EncryptionFactory>,
210 ) -> Option<Arc<dyn EncryptionFactory>> {
211 self.parquet_encryption_factory_registry
212 .register_factory(id, encryption_factory)
213 }
214
215 #[cfg(feature = "parquet_encryption")]
217 pub fn parquet_encryption_factory(
218 &self,
219 id: &str,
220 ) -> Result<Arc<dyn EncryptionFactory>> {
221 self.parquet_encryption_factory_registry.get_factory(id)
222 }
223
224 pub fn config_entries(&self) -> Vec<ConfigEntry> {
226 use crate::memory_pool::MemoryLimit;
227
228 fn format_byte_size(size: u64) -> String {
230 const GB: u64 = 1024 * 1024 * 1024;
231 const MB: u64 = 1024 * 1024;
232 const KB: u64 = 1024;
233
234 match size {
235 s if s >= GB => format!("{}G", s / GB),
236 s if s >= MB => format!("{}M", s / MB),
237 s if s >= KB => format!("{}K", s / KB),
238 s => format!("{s}"),
239 }
240 }
241
242 fn format_duration(duration: Duration) -> String {
243 let total = duration.as_secs();
244 let mins = total / 60;
245 let secs = total % 60;
246
247 format!("{mins}m{secs}s")
248 }
249
250 let memory_limit_value = match self.memory_pool.memory_limit() {
251 MemoryLimit::Finite(size) => Some(format_byte_size(
252 size.try_into()
253 .expect("Memory limit size conversion failed"),
254 )),
255 MemoryLimit::Infinite => Some("unlimited".to_string()),
256 MemoryLimit::Unknown => None,
257 };
258
259 let max_temp_dir_size = self.disk_manager.max_temp_directory_size();
260 let max_temp_dir_value = format_byte_size(max_temp_dir_size);
261
262 let temp_paths = self.disk_manager.temp_dir_paths();
263 let temp_dir_value = if temp_paths.is_empty() {
264 None
265 } else {
266 Some(
267 temp_paths
268 .iter()
269 .map(|p| p.display().to_string())
270 .collect::<Vec<_>>()
271 .join(","),
272 )
273 };
274
275 let metadata_cache_limit = self.cache_manager.get_metadata_cache_limit();
276 let metadata_cache_value = format_byte_size(
277 metadata_cache_limit
278 .try_into()
279 .expect("Metadata cache size conversion failed"),
280 );
281
282 let list_files_cache_limit = self.cache_manager.get_list_files_cache_limit();
283 let list_files_cache_value = format_byte_size(
284 list_files_cache_limit
285 .try_into()
286 .expect("List files cache size conversion failed"),
287 );
288
289 let list_files_cache_ttl = self
290 .cache_manager
291 .get_list_files_cache_ttl()
292 .map(format_duration);
293
294 create_runtime_config_entries(
295 memory_limit_value,
296 Some(max_temp_dir_value),
297 temp_dir_value,
298 Some(metadata_cache_value),
299 Some(list_files_cache_value),
300 list_files_cache_ttl,
301 )
302 }
303}
304
305impl Default for RuntimeEnv {
306 fn default() -> Self {
307 RuntimeEnvBuilder::new().build().unwrap()
308 }
309}
310
/// Builder for [`RuntimeEnv`].
///
/// All fields are public. Optional components that are left unset fall back
/// to defaults in [`RuntimeEnvBuilder::build`].
#[derive(Clone)]
pub struct RuntimeEnvBuilder {
    /// Deprecated disk manager configuration. `build` uses it only when
    /// `disk_manager_builder` is `None`.
    #[expect(deprecated)]
    pub disk_manager: DiskManagerConfig,
    /// Disk manager builder. When set, it takes precedence over `disk_manager`.
    pub disk_manager_builder: Option<DiskManagerBuilder>,
    /// Memory pool to use. `None` makes `build` install an
    /// [`UnboundedMemoryPool`].
    pub memory_pool: Option<Arc<dyn MemoryPool>>,
    /// Configuration passed to `CacheManager::try_new` during `build`.
    pub cache_manager: CacheManagerConfig,
    /// Object store registry moved into the built [`RuntimeEnv`]. `new()`
    /// uses a [`DefaultObjectStoreRegistry`].
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Parquet encryption factory registry moved into the built
    /// [`RuntimeEnv`].
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
333
334impl Default for RuntimeEnvBuilder {
335 fn default() -> Self {
336 Self::new()
337 }
338}
339
340impl RuntimeEnvBuilder {
341 pub fn new() -> Self {
343 Self {
344 disk_manager: Default::default(),
345 disk_manager_builder: Default::default(),
346 memory_pool: Default::default(),
347 cache_manager: Default::default(),
348 object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
349 #[cfg(feature = "parquet_encryption")]
350 parquet_encryption_factory_registry: Default::default(),
351 }
352 }
353
354 #[expect(deprecated)]
355 #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
356 pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
358 self.disk_manager = disk_manager;
359 self
360 }
361
362 pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
364 self.disk_manager_builder = Some(disk_manager);
365 self
366 }
367
368 pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
370 self.memory_pool = Some(memory_pool);
371 self
372 }
373
374 pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
376 self.cache_manager = cache_manager;
377 self
378 }
379
380 pub fn with_object_store_registry(
382 mut self,
383 object_store_registry: Arc<dyn ObjectStoreRegistry>,
384 ) -> Self {
385 self.object_store_registry = object_store_registry;
386 self
387 }
388
389 pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
397 let pool_size = (max_memory as f64 * memory_fraction) as usize;
398 self.with_memory_pool(Arc::new(TrackConsumersPool::new(
399 GreedyMemoryPool::new(pool_size),
400 NonZeroUsize::new(5).unwrap(),
401 )))
402 }
403
404 pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
406 let builder = self.disk_manager_builder.take().unwrap_or_default();
407 self.with_disk_manager_builder(
408 builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
409 )
410 }
411
412 pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
414 let builder = self.disk_manager_builder.take().unwrap_or_default();
415 self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
416 }
417
418 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
420 self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
421 self
422 }
423
424 pub fn with_object_list_cache_limit(mut self, limit: usize) -> Self {
426 self.cache_manager = self.cache_manager.with_list_files_cache_limit(limit);
427 self
428 }
429
430 pub fn with_object_list_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
432 self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl);
433 self
434 }
435
436 pub fn build(self) -> Result<RuntimeEnv> {
438 let Self {
439 disk_manager,
440 disk_manager_builder,
441 memory_pool,
442 cache_manager,
443 object_store_registry,
444 #[cfg(feature = "parquet_encryption")]
445 parquet_encryption_factory_registry,
446 } = self;
447 let memory_pool =
448 memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
449
450 Ok(RuntimeEnv {
451 memory_pool,
452 disk_manager: if let Some(builder) = disk_manager_builder {
453 Arc::new(builder.build()?)
454 } else {
455 #[expect(deprecated)]
456 DiskManager::try_new(disk_manager)?
457 },
458 cache_manager: CacheManager::try_new(&cache_manager)?,
459 object_store_registry,
460 #[cfg(feature = "parquet_encryption")]
461 parquet_encryption_factory_registry,
462 })
463 }
464
465 pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
467 self.build().map(Arc::new)
468 }
469
470 pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
472 let cache_config = CacheManagerConfig {
473 table_files_statistics_cache: runtime_env
474 .cache_manager
475 .get_file_statistic_cache(),
476 list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
477 list_files_cache_limit: runtime_env
478 .cache_manager
479 .get_list_files_cache_limit(),
480 list_files_cache_ttl: runtime_env.cache_manager.get_list_files_cache_ttl(),
481 file_metadata_cache: Some(
482 runtime_env.cache_manager.get_file_metadata_cache(),
483 ),
484 metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
485 };
486
487 Self {
488 #[expect(deprecated)]
489 disk_manager: DiskManagerConfig::Existing(Arc::clone(
490 &runtime_env.disk_manager,
491 )),
492 disk_manager_builder: None,
493 memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
494 cache_manager: cache_config,
495 object_store_registry: Arc::clone(&runtime_env.object_store_registry),
496 #[cfg(feature = "parquet_encryption")]
497 parquet_encryption_factory_registry: Arc::clone(
498 &runtime_env.parquet_encryption_factory_registry,
499 ),
500 }
501 }
502
503 pub fn entries(&self) -> Vec<ConfigEntry> {
505 create_runtime_config_entries(
506 None,
507 Some("100G".to_string()),
508 None,
509 Some("50M".to_owned()),
510 Some("1M".to_owned()),
511 None,
512 )
513 }
514
515 pub fn generate_config_markdown() -> String {
517 use std::fmt::Write as _;
518
519 let s = Self::default();
520
521 let mut docs = "| key | default | description |\n".to_string();
522 docs += "|-----|---------|-------------|\n";
523 let mut entries = s.entries();
524 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
525
526 for entry in &entries {
527 let _ = writeln!(
528 &mut docs,
529 "| {} | {} | {} |",
530 entry.key,
531 entry.value.as_deref().unwrap_or("NULL"),
532 entry.description
533 );
534 }
535 docs
536 }
537}