1#[expect(deprecated)]
22use crate::disk_manager::{DiskManagerConfig, SpillingProgress};
23use crate::{
24 disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25 memory_pool::{
26 GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27 },
28 object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32#[cfg(feature = "parquet_encryption")]
33use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
34use datafusion_common::{Result, config::ConfigEntry};
35use object_store::ObjectStore;
36use std::sync::Arc;
37use std::{
38 fmt::{Debug, Formatter},
39 num::NonZeroUsize,
40};
41use std::{path::PathBuf, time::Duration};
42use url::Url;
43
/// Execution runtime environment bundling the shared resources used by this
/// module: a memory pool, a disk manager, a cache manager and an object store
/// registry (plus a Parquet encryption factory registry when the
/// `parquet_encryption` feature is enabled).
///
/// Every field is held behind an [`Arc`], so cloning a `RuntimeEnv` shares the
/// same underlying resources instead of copying them.
///
/// Instances are created with [`RuntimeEnvBuilder`]; `RuntimeEnv::default()`
/// builds one from an unmodified builder.
#[derive(Clone)]
pub struct RuntimeEnv {
    /// Memory pool; its `memory_limit()` is reported by
    /// [`RuntimeEnv::config_entries`].
    pub memory_pool: Arc<dyn MemoryPool>,
    /// Disk manager; source of the spilling progress, the temporary directory
    /// paths and the maximum temporary directory size.
    pub disk_manager: Arc<DiskManager>,
    /// Cache manager; source of the metadata cache and list-files cache
    /// settings.
    pub cache_manager: Arc<CacheManager>,
    /// Registry used to register, deregister and look up object stores by URL.
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Registry used to register and look up Parquet encryption factories by
    /// id. Only present with the `parquet_encryption` feature.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
87
88impl Debug for RuntimeEnv {
89 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
90 write!(f, "RuntimeEnv")
91 }
92}
93
94fn create_runtime_config_entries(
100 memory_limit: Option<String>,
101 max_temp_directory_size: Option<String>,
102 temp_directory: Option<String>,
103 metadata_cache_limit: Option<String>,
104 list_files_cache_limit: Option<String>,
105 list_files_cache_ttl: Option<String>,
106) -> Vec<ConfigEntry> {
107 vec![
108 ConfigEntry {
109 key: "datafusion.runtime.memory_limit".to_string(),
110 value: memory_limit,
111 description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
112 },
113 ConfigEntry {
114 key: "datafusion.runtime.max_temp_directory_size".to_string(),
115 value: max_temp_directory_size,
116 description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
117 },
118 ConfigEntry {
119 key: "datafusion.runtime.temp_directory".to_string(),
120 value: temp_directory,
121 description: "The path to the temporary file directory.",
122 },
123 ConfigEntry {
124 key: "datafusion.runtime.metadata_cache_limit".to_string(),
125 value: metadata_cache_limit,
126 description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
127 },
128 ConfigEntry {
129 key: "datafusion.runtime.list_files_cache_limit".to_string(),
130 value: list_files_cache_limit,
131 description: "Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
132 },
133 ConfigEntry {
134 key: "datafusion.runtime.list_files_cache_ttl".to_string(),
135 value: list_files_cache_ttl,
136 description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.",
137 },
138 ]
139}
140
141impl RuntimeEnv {
142 pub fn register_object_store(
182 &self,
183 url: &Url,
184 object_store: Arc<dyn ObjectStore>,
185 ) -> Option<Arc<dyn ObjectStore>> {
186 self.object_store_registry.register_store(url, object_store)
187 }
188
189 pub fn deregister_object_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
192 self.object_store_registry.deregister_store(url)
193 }
194
195 pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
199 self.object_store_registry.get_store(url.as_ref())
200 }
201
202 pub fn spilling_progress(&self) -> SpillingProgress {
204 self.disk_manager.spilling_progress()
205 }
206
207 #[cfg(feature = "parquet_encryption")]
211 pub fn register_parquet_encryption_factory(
212 &self,
213 id: &str,
214 encryption_factory: Arc<dyn EncryptionFactory>,
215 ) -> Option<Arc<dyn EncryptionFactory>> {
216 self.parquet_encryption_factory_registry
217 .register_factory(id, encryption_factory)
218 }
219
220 #[cfg(feature = "parquet_encryption")]
222 pub fn parquet_encryption_factory(
223 &self,
224 id: &str,
225 ) -> Result<Arc<dyn EncryptionFactory>> {
226 self.parquet_encryption_factory_registry.get_factory(id)
227 }
228
229 pub fn config_entries(&self) -> Vec<ConfigEntry> {
231 use crate::memory_pool::MemoryLimit;
232
233 fn format_byte_size(size: u64) -> String {
235 const GB: u64 = 1024 * 1024 * 1024;
236 const MB: u64 = 1024 * 1024;
237 const KB: u64 = 1024;
238
239 match size {
240 s if s >= GB => format!("{}G", s / GB),
241 s if s >= MB => format!("{}M", s / MB),
242 s if s >= KB => format!("{}K", s / KB),
243 s => format!("{s}"),
244 }
245 }
246
247 fn format_duration(duration: Duration) -> String {
248 let total = duration.as_secs();
249 let mins = total / 60;
250 let secs = total % 60;
251
252 format!("{mins}m{secs}s")
253 }
254
255 let memory_limit_value = match self.memory_pool.memory_limit() {
256 MemoryLimit::Finite(size) => Some(format_byte_size(
257 size.try_into()
258 .expect("Memory limit size conversion failed"),
259 )),
260 MemoryLimit::Infinite => Some("unlimited".to_string()),
261 MemoryLimit::Unknown => None,
262 };
263
264 let max_temp_dir_size = self.disk_manager.max_temp_directory_size();
265 let max_temp_dir_value = format_byte_size(max_temp_dir_size);
266
267 let temp_paths = self.disk_manager.temp_dir_paths();
268 let temp_dir_value = if temp_paths.is_empty() {
269 None
270 } else {
271 Some(
272 temp_paths
273 .iter()
274 .map(|p| p.display().to_string())
275 .collect::<Vec<_>>()
276 .join(","),
277 )
278 };
279
280 let metadata_cache_limit = self.cache_manager.get_metadata_cache_limit();
281 let metadata_cache_value = format_byte_size(
282 metadata_cache_limit
283 .try_into()
284 .expect("Metadata cache size conversion failed"),
285 );
286
287 let list_files_cache_limit = self.cache_manager.get_list_files_cache_limit();
288 let list_files_cache_value = format_byte_size(
289 list_files_cache_limit
290 .try_into()
291 .expect("List files cache size conversion failed"),
292 );
293
294 let list_files_cache_ttl = self
295 .cache_manager
296 .get_list_files_cache_ttl()
297 .map(format_duration);
298
299 create_runtime_config_entries(
300 memory_limit_value,
301 Some(max_temp_dir_value),
302 temp_dir_value,
303 Some(metadata_cache_value),
304 Some(list_files_cache_value),
305 list_files_cache_ttl,
306 )
307 }
308}
309
310impl Default for RuntimeEnv {
311 fn default() -> Self {
312 RuntimeEnvBuilder::new().build().unwrap()
313 }
314}
315
/// Builder for [`RuntimeEnv`].
///
/// Collects the configuration for each runtime component and turns it into a
/// [`RuntimeEnv`] with [`RuntimeEnvBuilder::build`] or
/// [`RuntimeEnvBuilder::build_arc`].
#[derive(Clone)]
pub struct RuntimeEnvBuilder {
    /// Deprecated-style disk manager configuration. `build` only consults it
    /// when `disk_manager_builder` is `None`.
    #[expect(deprecated)]
    pub disk_manager: DiskManagerConfig,
    /// Disk manager builder; when set, `build` uses it instead of
    /// `disk_manager`.
    pub disk_manager_builder: Option<DiskManagerBuilder>,
    /// Memory pool to use. When `None`, `build` falls back to a default
    /// `UnboundedMemoryPool`.
    pub memory_pool: Option<Arc<dyn MemoryPool>>,
    /// Configuration from which `build` creates the `CacheManager`.
    pub cache_manager: CacheManagerConfig,
    /// Object store registry handed to the built `RuntimeEnv`.
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Parquet encryption factory registry handed to the built `RuntimeEnv`.
    /// Only present with the `parquet_encryption` feature.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
338
339impl Default for RuntimeEnvBuilder {
340 fn default() -> Self {
341 Self::new()
342 }
343}
344
345impl RuntimeEnvBuilder {
346 pub fn new() -> Self {
348 Self {
349 disk_manager: Default::default(),
350 disk_manager_builder: Default::default(),
351 memory_pool: Default::default(),
352 cache_manager: Default::default(),
353 object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
354 #[cfg(feature = "parquet_encryption")]
355 parquet_encryption_factory_registry: Default::default(),
356 }
357 }
358
359 #[expect(deprecated)]
360 #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
361 pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
363 self.disk_manager = disk_manager;
364 self
365 }
366
367 pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
369 self.disk_manager_builder = Some(disk_manager);
370 self
371 }
372
373 pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
375 self.memory_pool = Some(memory_pool);
376 self
377 }
378
379 pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
381 self.cache_manager = cache_manager;
382 self
383 }
384
385 pub fn with_object_store_registry(
387 mut self,
388 object_store_registry: Arc<dyn ObjectStoreRegistry>,
389 ) -> Self {
390 self.object_store_registry = object_store_registry;
391 self
392 }
393
394 pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
402 let pool_size = (max_memory as f64 * memory_fraction) as usize;
403 self.with_memory_pool(Arc::new(TrackConsumersPool::new(
404 GreedyMemoryPool::new(pool_size),
405 NonZeroUsize::new(5).unwrap(),
406 )))
407 }
408
409 pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
411 let builder = self.disk_manager_builder.take().unwrap_or_default();
412 self.with_disk_manager_builder(
413 builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
414 )
415 }
416
417 pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
419 let builder = self.disk_manager_builder.take().unwrap_or_default();
420 self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
421 }
422
423 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
425 self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
426 self
427 }
428
429 pub fn with_object_list_cache_limit(mut self, limit: usize) -> Self {
431 self.cache_manager = self.cache_manager.with_list_files_cache_limit(limit);
432 self
433 }
434
435 pub fn with_object_list_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
437 self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl);
438 self
439 }
440
441 pub fn build(self) -> Result<RuntimeEnv> {
443 let Self {
444 disk_manager,
445 disk_manager_builder,
446 memory_pool,
447 cache_manager,
448 object_store_registry,
449 #[cfg(feature = "parquet_encryption")]
450 parquet_encryption_factory_registry,
451 } = self;
452 let memory_pool =
453 memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
454
455 Ok(RuntimeEnv {
456 memory_pool,
457 disk_manager: if let Some(builder) = disk_manager_builder {
458 Arc::new(builder.build()?)
459 } else {
460 #[expect(deprecated)]
461 DiskManager::try_new(disk_manager)?
462 },
463 cache_manager: CacheManager::try_new(&cache_manager)?,
464 object_store_registry,
465 #[cfg(feature = "parquet_encryption")]
466 parquet_encryption_factory_registry,
467 })
468 }
469
470 pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
472 self.build().map(Arc::new)
473 }
474
475 pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
477 let cache_config = CacheManagerConfig {
478 table_files_statistics_cache: runtime_env
479 .cache_manager
480 .get_file_statistic_cache(),
481 list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
482 list_files_cache_limit: runtime_env
483 .cache_manager
484 .get_list_files_cache_limit(),
485 list_files_cache_ttl: runtime_env.cache_manager.get_list_files_cache_ttl(),
486 file_metadata_cache: Some(
487 runtime_env.cache_manager.get_file_metadata_cache(),
488 ),
489 metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
490 };
491
492 Self {
493 #[expect(deprecated)]
494 disk_manager: DiskManagerConfig::Existing(Arc::clone(
495 &runtime_env.disk_manager,
496 )),
497 disk_manager_builder: None,
498 memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
499 cache_manager: cache_config,
500 object_store_registry: Arc::clone(&runtime_env.object_store_registry),
501 #[cfg(feature = "parquet_encryption")]
502 parquet_encryption_factory_registry: Arc::clone(
503 &runtime_env.parquet_encryption_factory_registry,
504 ),
505 }
506 }
507
508 pub fn entries(&self) -> Vec<ConfigEntry> {
510 create_runtime_config_entries(
511 None,
512 Some("100G".to_string()),
513 None,
514 Some("50M".to_owned()),
515 Some("1M".to_owned()),
516 None,
517 )
518 }
519
520 pub fn generate_config_markdown() -> String {
522 use std::fmt::Write as _;
523
524 let s = Self::default();
525
526 let mut docs = "| key | default | description |\n".to_string();
527 docs += "|-----|---------|-------------|\n";
528 let mut entries = s.entries();
529 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
530
531 for entry in &entries {
532 let _ = writeln!(
533 &mut docs,
534 "| {} | {} | {} |",
535 entry.key,
536 entry.value.as_deref().unwrap_or("NULL"),
537 entry.description
538 );
539 }
540 docs
541 }
542}