Skip to main content

datafusion_execution/
runtime_env.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Execution [`RuntimeEnv`] environment that manages access to the object
//! store registry, memory pool, disk manager, and cache manager.
20
21#[expect(deprecated)]
22use crate::disk_manager::{DiskManagerConfig, SpillingProgress};
23use crate::{
24    disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25    memory_pool::{
26        GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27    },
28    object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32#[cfg(feature = "parquet_encryption")]
33use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
34use datafusion_common::{Result, config::ConfigEntry};
35use object_store::ObjectStore;
36use std::sync::Arc;
37use std::{
38    fmt::{Debug, Formatter},
39    num::NonZeroUsize,
40};
41use std::{path::PathBuf, time::Duration};
42use url::Url;
43
44#[derive(Clone)]
45/// Execution runtime environment that manages system resources such
46/// as memory, disk, cache and storage.
47///
48/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
49/// following resource management functionality:
50///
51/// * [`MemoryPool`]: Manage memory
52/// * [`DiskManager`]: Manage temporary files on local disk
53/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
54/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
55///
56/// # Example: Create default `RuntimeEnv`
57/// ```
58/// # use datafusion_execution::runtime_env::RuntimeEnv;
59/// let runtime_env = RuntimeEnv::default();
60/// ```
61///
62/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
63/// ```
64/// # use std::sync::Arc;
65/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
66/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
67/// // restrict to using at most 100MB of memory
68/// let pool_size = 100 * 1024 * 1024;
69/// let runtime_env = RuntimeEnvBuilder::new()
70///     .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
71///     .build()
72///     .unwrap();
73/// ```
74pub struct RuntimeEnv {
75    /// Runtime memory management
76    pub memory_pool: Arc<dyn MemoryPool>,
77    /// Manage temporary files during query execution
78    pub disk_manager: Arc<DiskManager>,
79    /// Manage temporary cache during query execution
80    pub cache_manager: Arc<CacheManager>,
81    /// Object Store Registry
82    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
83    /// Parquet encryption factory registry
84    #[cfg(feature = "parquet_encryption")]
85    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
86}
87
88impl Debug for RuntimeEnv {
89    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
90        write!(f, "RuntimeEnv")
91    }
92}
93
94/// Creates runtime configuration entries with the provided values
95///
96/// This helper function defines the structure and metadata for all runtime configuration
97/// entries to avoid duplication between `RuntimeEnv::config_entries()` and
98/// `RuntimeEnvBuilder::entries()`.
99fn create_runtime_config_entries(
100    memory_limit: Option<String>,
101    max_temp_directory_size: Option<String>,
102    temp_directory: Option<String>,
103    metadata_cache_limit: Option<String>,
104    list_files_cache_limit: Option<String>,
105    list_files_cache_ttl: Option<String>,
106    file_statistics_cache_limit: Option<String>,
107) -> Vec<ConfigEntry> {
108    vec![
109        ConfigEntry {
110            key: "datafusion.runtime.memory_limit".to_string(),
111            value: memory_limit,
112            description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
113        },
114        ConfigEntry {
115            key: "datafusion.runtime.max_temp_directory_size".to_string(),
116            value: max_temp_directory_size,
117            description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
118        },
119        ConfigEntry {
120            key: "datafusion.runtime.temp_directory".to_string(),
121            value: temp_directory,
122            description: "The path to the temporary file directory.",
123        },
124        ConfigEntry {
125            key: "datafusion.runtime.metadata_cache_limit".to_string(),
126            value: metadata_cache_limit,
127            description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
128        },
129        ConfigEntry {
130            key: "datafusion.runtime.list_files_cache_limit".to_string(),
131            value: list_files_cache_limit,
132            description: "Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
133        },
134        ConfigEntry {
135            key: "datafusion.runtime.list_files_cache_ttl".to_string(),
136            value: list_files_cache_ttl,
137            description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.",
138        },
139        ConfigEntry {
140            key: "datafusion.runtime.file_statistics_cache_limit".to_string(),
141            value: file_statistics_cache_limit,
142            description: "Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
143        },
144    ]
145}
146
147impl RuntimeEnv {
148    /// Registers a custom `ObjectStore` to be used with a specific url.
149    /// This allows DataFusion to create external tables from urls that do not have
150    /// built in support such as `hdfs://namenode:port/...`.
151    ///
152    /// Returns the [`ObjectStore`] previously registered for this
153    /// scheme, if any.
154    ///
155    /// See [`ObjectStoreRegistry`] for more details
156    ///
157    /// # Example: Register local file system object store
158    /// ```
159    /// # use std::sync::Arc;
160    /// # use url::Url;
161    /// # use datafusion_execution::runtime_env::RuntimeEnv;
162    /// # let runtime_env = RuntimeEnv::default();
163    /// let url = Url::try_from("file://").unwrap();
164    /// let object_store = object_store::local::LocalFileSystem::new();
165    /// // register the object store with the runtime environment
166    /// runtime_env.register_object_store(&url, Arc::new(object_store));
167    /// ```
168    ///
169    /// # Example: Register remote URL object store like [Github](https://github.com)
170    /// ```
171    /// # use std::sync::Arc;
172    /// # use url::Url;
173    /// # use datafusion_execution::runtime_env::RuntimeEnv;
174    /// # let runtime_env = RuntimeEnv::default();
175    /// # // use local store for example as http feature is not enabled
176    /// # let http_store = object_store::local::LocalFileSystem::new();
177    /// // create a new object store via object_store::http::HttpBuilder;
178    /// let base_url = Url::parse("https://github.com").unwrap();
179    /// // (note this example can't depend on the http feature)
180    /// // let http_store = HttpBuilder::new()
181    /// //    .with_url(base_url.clone())
182    /// //    .build()
183    /// //    .unwrap();
184    /// // register the object store with the runtime environment
185    /// runtime_env.register_object_store(&base_url, Arc::new(http_store));
186    /// ```
187    pub fn register_object_store(
188        &self,
189        url: &Url,
190        object_store: Arc<dyn ObjectStore>,
191    ) -> Option<Arc<dyn ObjectStore>> {
192        self.object_store_registry.register_store(url, object_store)
193    }
194
195    /// Deregisters a custom `ObjectStore` previously registered for a specific url.
196    /// See [`ObjectStoreRegistry::deregister_store`] for more details.
197    pub fn deregister_object_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
198        self.object_store_registry.deregister_store(url)
199    }
200
201    /// Retrieves a `ObjectStore` instance for a url by consulting the
202    /// registry. See [`ObjectStoreRegistry::get_store`] for more
203    /// details.
204    pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
205        self.object_store_registry.get_store(url.as_ref())
206    }
207
208    /// Returns the current spilling progress
209    pub fn spilling_progress(&self) -> SpillingProgress {
210        self.disk_manager.spilling_progress()
211    }
212
213    /// Register an [`EncryptionFactory`] with an associated identifier that can be later
214    /// used to configure encryption when reading or writing Parquet.
215    /// If an encryption factory with the same identifier was already registered, it is replaced and returned.
216    #[cfg(feature = "parquet_encryption")]
217    pub fn register_parquet_encryption_factory(
218        &self,
219        id: &str,
220        encryption_factory: Arc<dyn EncryptionFactory>,
221    ) -> Option<Arc<dyn EncryptionFactory>> {
222        self.parquet_encryption_factory_registry
223            .register_factory(id, encryption_factory)
224    }
225
226    /// Retrieve an [`EncryptionFactory`] by its identifier
227    #[cfg(feature = "parquet_encryption")]
228    pub fn parquet_encryption_factory(
229        &self,
230        id: &str,
231    ) -> Result<Arc<dyn EncryptionFactory>> {
232        self.parquet_encryption_factory_registry.get_factory(id)
233    }
234
235    /// Returns the current runtime configuration entries
236    pub fn config_entries(&self) -> Vec<ConfigEntry> {
237        use crate::memory_pool::MemoryLimit;
238
239        /// Convert bytes to a human-readable format
240        fn format_byte_size(size: u64) -> String {
241            const GB: u64 = 1024 * 1024 * 1024;
242            const MB: u64 = 1024 * 1024;
243            const KB: u64 = 1024;
244
245            match size {
246                s if s >= GB => format!("{}G", s / GB),
247                s if s >= MB => format!("{}M", s / MB),
248                s if s >= KB => format!("{}K", s / KB),
249                s => format!("{s}"),
250            }
251        }
252
253        fn format_duration(duration: Duration) -> String {
254            let total = duration.as_secs();
255            let mins = total / 60;
256            let secs = total % 60;
257
258            format!("{mins}m{secs}s")
259        }
260
261        let memory_limit_value = match self.memory_pool.memory_limit() {
262            MemoryLimit::Finite(size) => Some(format_byte_size(
263                size.try_into()
264                    .expect("Memory limit size conversion failed"),
265            )),
266            MemoryLimit::Infinite => Some("unlimited".to_string()),
267            MemoryLimit::Unknown => None,
268        };
269
270        let max_temp_dir_size = self.disk_manager.max_temp_directory_size();
271        let max_temp_dir_value = format_byte_size(max_temp_dir_size);
272
273        let temp_paths = self.disk_manager.temp_dir_paths();
274        let temp_dir_value = if temp_paths.is_empty() {
275            None
276        } else {
277            Some(
278                temp_paths
279                    .iter()
280                    .map(|p| p.display().to_string())
281                    .collect::<Vec<_>>()
282                    .join(","),
283            )
284        };
285
286        let metadata_cache_limit = self.cache_manager.get_metadata_cache_limit();
287        let metadata_cache_value = format_byte_size(
288            metadata_cache_limit
289                .try_into()
290                .expect("Metadata cache size conversion failed"),
291        );
292
293        let list_files_cache_limit = self.cache_manager.get_list_files_cache_limit();
294        let list_files_cache_value = format_byte_size(
295            list_files_cache_limit
296                .try_into()
297                .expect("List files cache size conversion failed"),
298        );
299
300        let list_files_cache_ttl = self
301            .cache_manager
302            .get_list_files_cache_ttl()
303            .map(format_duration);
304
305        let file_statistics_cache_limit =
306            self.cache_manager.get_file_statistic_cache_limit();
307        let file_statistics_cache_value = format_byte_size(
308            file_statistics_cache_limit
309                .try_into()
310                .expect("File statistics cache size conversion failed"),
311        );
312
313        create_runtime_config_entries(
314            memory_limit_value,
315            Some(max_temp_dir_value),
316            temp_dir_value,
317            Some(metadata_cache_value),
318            Some(list_files_cache_value),
319            list_files_cache_ttl,
320            Some(file_statistics_cache_value),
321        )
322    }
323}
324
325impl Default for RuntimeEnv {
326    fn default() -> Self {
327        RuntimeEnvBuilder::new().build().unwrap()
328    }
329}
330
331/// Execution runtime configuration builder.
332///
333/// See example on [`RuntimeEnv`]
334#[derive(Clone)]
335pub struct RuntimeEnvBuilder {
336    #[expect(deprecated)]
337    /// DiskManager to manage temporary disk file usage
338    pub disk_manager: DiskManagerConfig,
339    /// DiskManager builder to manager temporary disk file usage
340    pub disk_manager_builder: Option<DiskManagerBuilder>,
341    /// [`MemoryPool`] from which to allocate memory
342    ///
343    /// Defaults to using an [`UnboundedMemoryPool`] if `None`
344    pub memory_pool: Option<Arc<dyn MemoryPool>>,
345    /// CacheManager to manage cache data
346    pub cache_manager: CacheManagerConfig,
347    /// ObjectStoreRegistry to get object store based on url
348    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
349    /// Parquet encryption factory registry
350    #[cfg(feature = "parquet_encryption")]
351    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
352}
353
354impl Default for RuntimeEnvBuilder {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360impl RuntimeEnvBuilder {
361    /// New with default values
362    pub fn new() -> Self {
363        Self {
364            disk_manager: Default::default(),
365            disk_manager_builder: Default::default(),
366            memory_pool: Default::default(),
367            cache_manager: Default::default(),
368            object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
369            #[cfg(feature = "parquet_encryption")]
370            parquet_encryption_factory_registry: Default::default(),
371        }
372    }
373
374    #[expect(deprecated)]
375    #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
376    /// Customize disk manager
377    pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
378        self.disk_manager = disk_manager;
379        self
380    }
381
382    /// Customize the disk manager builder
383    pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
384        self.disk_manager_builder = Some(disk_manager);
385        self
386    }
387
388    /// Customize memory policy
389    pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
390        self.memory_pool = Some(memory_pool);
391        self
392    }
393
394    /// Customize cache policy
395    pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
396        self.cache_manager = cache_manager;
397        self
398    }
399
400    /// Customize object store registry
401    pub fn with_object_store_registry(
402        mut self,
403        object_store_registry: Arc<dyn ObjectStoreRegistry>,
404    ) -> Self {
405        self.object_store_registry = object_store_registry;
406        self
407    }
408
409    /// Specify the total memory to use while running the DataFusion
410    /// plan to `max_memory * memory_fraction` in bytes.
411    ///
412    /// This defaults to using [`GreedyMemoryPool`] wrapped in the
413    /// [`TrackConsumersPool`] with a maximum of 5 consumers.
414    ///
415    /// Note DataFusion does not yet respect this limit in all cases.
416    pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
417        let pool_size = (max_memory as f64 * memory_fraction) as usize;
418        self.with_memory_pool(Arc::new(TrackConsumersPool::new(
419            GreedyMemoryPool::new(pool_size),
420            NonZeroUsize::new(5).unwrap(),
421        )))
422    }
423
424    /// Use the specified path to create any needed temporary files
425    pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
426        let builder = self.disk_manager_builder.take().unwrap_or_default();
427        self.with_disk_manager_builder(
428            builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
429        )
430    }
431
432    /// Specify a limit on the size of the temporary file directory in bytes
433    pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
434        let builder = self.disk_manager_builder.take().unwrap_or_default();
435        self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
436    }
437
438    /// Specify the limit of the file-embedded metadata cache, in bytes.
439    pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
440        self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
441        self
442    }
443
444    /// Specifies the memory limit for the object list cache, in bytes.
445    pub fn with_object_list_cache_limit(mut self, limit: usize) -> Self {
446        self.cache_manager = self.cache_manager.with_list_files_cache_limit(limit);
447        self
448    }
449
450    /// Specifies the duration entries in the object list cache will be considered valid.
451    pub fn with_object_list_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
452        self.cache_manager = self.cache_manager.with_list_files_cache_ttl(ttl);
453        self
454    }
455
456    pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self {
457        self.cache_manager = self.cache_manager.with_file_statistics_cache_limit(limit);
458        self
459    }
460
461    /// Build a RuntimeEnv
462    pub fn build(self) -> Result<RuntimeEnv> {
463        let Self {
464            disk_manager,
465            disk_manager_builder,
466            memory_pool,
467            cache_manager,
468            object_store_registry,
469            #[cfg(feature = "parquet_encryption")]
470            parquet_encryption_factory_registry,
471        } = self;
472        let memory_pool =
473            memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
474
475        Ok(RuntimeEnv {
476            memory_pool,
477            disk_manager: if let Some(builder) = disk_manager_builder {
478                Arc::new(builder.build()?)
479            } else {
480                #[expect(deprecated)]
481                DiskManager::try_new(disk_manager)?
482            },
483            cache_manager: CacheManager::try_new(&cache_manager)?,
484            object_store_registry,
485            #[cfg(feature = "parquet_encryption")]
486            parquet_encryption_factory_registry,
487        })
488    }
489
490    /// Convenience method to create a new `Arc<RuntimeEnv>`
491    pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
492        self.build().map(Arc::new)
493    }
494
495    /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv
496    pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
497        let cache_config = CacheManagerConfig {
498            file_statistics_cache: runtime_env.cache_manager.get_file_statistic_cache(),
499            file_statistics_cache_limit: runtime_env
500                .cache_manager
501                .get_file_statistic_cache_limit(),
502            list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
503            list_files_cache_limit: runtime_env
504                .cache_manager
505                .get_list_files_cache_limit(),
506            list_files_cache_ttl: runtime_env.cache_manager.get_list_files_cache_ttl(),
507            file_metadata_cache: Some(
508                runtime_env.cache_manager.get_file_metadata_cache(),
509            ),
510            metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
511        };
512
513        Self {
514            #[expect(deprecated)]
515            disk_manager: DiskManagerConfig::Existing(Arc::clone(
516                &runtime_env.disk_manager,
517            )),
518            disk_manager_builder: None,
519            memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
520            cache_manager: cache_config,
521            object_store_registry: Arc::clone(&runtime_env.object_store_registry),
522            #[cfg(feature = "parquet_encryption")]
523            parquet_encryption_factory_registry: Arc::clone(
524                &runtime_env.parquet_encryption_factory_registry,
525            ),
526        }
527    }
528
529    /// Returns a list of all available runtime configurations with their current values and descriptions
530    pub fn entries(&self) -> Vec<ConfigEntry> {
531        create_runtime_config_entries(
532            None,
533            Some("100G".to_string()),
534            None,
535            Some("50M".to_owned()),
536            Some("1M".to_owned()),
537            None,
538            Some("20M".to_owned()),
539        )
540    }
541
542    /// Generate documentation that can be included in the user guide
543    pub fn generate_config_markdown() -> String {
544        use std::fmt::Write as _;
545
546        let s = Self::default();
547
548        let mut docs = "| key | default | description |\n".to_string();
549        docs += "|-----|---------|-------------|\n";
550        let mut entries = s.entries();
551        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
552
553        for entry in &entries {
554            let _ = writeln!(
555                &mut docs,
556                "| {} | {} | {} |",
557                entry.key,
558                entry.value.as_deref().unwrap_or("NULL"),
559                entry.description
560            );
561        }
562        docs
563    }
564}