datafusion_execution/
runtime_env.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Execution [`RuntimeEnv`] environment that manages access to the object
//! store registry, memory pool, disk manager and cache manager.
20
21#[allow(deprecated)]
22use crate::disk_manager::DiskManagerConfig;
23use crate::{
24    disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25    memory_pool::{
26        GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27    },
28    object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32#[cfg(feature = "parquet_encryption")]
33use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
34use datafusion_common::{config::ConfigEntry, Result};
35use object_store::ObjectStore;
36use std::path::PathBuf;
37use std::sync::Arc;
38use std::{
39    fmt::{Debug, Formatter},
40    num::NonZeroUsize,
41};
42use url::Url;
43
#[derive(Clone)]
/// Execution runtime environment that manages system resources such
/// as memory, disk, cache and storage.
///
/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
/// following resource management functionality:
///
/// * [`MemoryPool`]: Manage memory
/// * [`DiskManager`]: Manage temporary files on local disk
/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
///
/// Every resource is stored behind an [`Arc`], so cloning a `RuntimeEnv`
/// produces a handle to the same underlying resources rather than a copy
/// of them.
///
/// # Example: Create default `RuntimeEnv`
/// ```
/// # use datafusion_execution::runtime_env::RuntimeEnv;
/// let runtime_env = RuntimeEnv::default();
/// ```
///
/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
/// ```
/// # use std::sync::Arc;
/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
/// // restrict to using at most 100MB of memory
/// let pool_size = 100 * 1024 * 1024;
/// let runtime_env = RuntimeEnvBuilder::new()
///     .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
///     .build()
///     .unwrap();
/// ```
pub struct RuntimeEnv {
    /// Runtime memory management
    pub memory_pool: Arc<dyn MemoryPool>,
    /// Manage temporary files during query execution
    pub disk_manager: Arc<DiskManager>,
    /// Manage temporary cache during query execution
    pub cache_manager: Arc<CacheManager>,
    /// Object Store Registry, used to resolve URLs to object store instances
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Parquet encryption factory registry
    ///
    /// Only present when the `parquet_encryption` feature is enabled.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
87
88impl Debug for RuntimeEnv {
89    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
90        write!(f, "RuntimeEnv")
91    }
92}
93
94impl RuntimeEnv {
95    /// Registers a custom `ObjectStore` to be used with a specific url.
96    /// This allows DataFusion to create external tables from urls that do not have
97    /// built in support such as `hdfs://namenode:port/...`.
98    ///
99    /// Returns the [`ObjectStore`] previously registered for this
100    /// scheme, if any.
101    ///
102    /// See [`ObjectStoreRegistry`] for more details
103    ///
104    /// # Example: Register local file system object store
105    /// ```
106    /// # use std::sync::Arc;
107    /// # use url::Url;
108    /// # use datafusion_execution::runtime_env::RuntimeEnv;
109    /// # let runtime_env = RuntimeEnv::default();
110    /// let url = Url::try_from("file://").unwrap();
111    /// let object_store = object_store::local::LocalFileSystem::new();
112    /// // register the object store with the runtime environment
113    /// runtime_env.register_object_store(&url, Arc::new(object_store));
114    /// ```
115    ///
116    /// # Example: Register remote URL object store like [Github](https://github.com)
117    /// ```
118    /// # use std::sync::Arc;
119    /// # use url::Url;
120    /// # use datafusion_execution::runtime_env::RuntimeEnv;
121    /// # let runtime_env = RuntimeEnv::default();
122    /// # // use local store for example as http feature is not enabled
123    /// # let http_store = object_store::local::LocalFileSystem::new();
124    /// // create a new object store via object_store::http::HttpBuilder;
125    /// let base_url = Url::parse("https://github.com").unwrap();
126    /// // (note this example can't depend on the http feature)
127    /// // let http_store = HttpBuilder::new()
128    /// //    .with_url(base_url.clone())
129    /// //    .build()
130    /// //    .unwrap();
131    /// // register the object store with the runtime environment
132    /// runtime_env.register_object_store(&base_url, Arc::new(http_store));
133    /// ```
134    pub fn register_object_store(
135        &self,
136        url: &Url,
137        object_store: Arc<dyn ObjectStore>,
138    ) -> Option<Arc<dyn ObjectStore>> {
139        self.object_store_registry.register_store(url, object_store)
140    }
141
142    /// Deregisters a custom `ObjectStore` previously registered for a specific url.
143    /// See [`ObjectStoreRegistry::deregister_store`] for more details.
144    pub fn deregister_object_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
145        self.object_store_registry.deregister_store(url)
146    }
147
148    /// Retrieves a `ObjectStore` instance for a url by consulting the
149    /// registry. See [`ObjectStoreRegistry::get_store`] for more
150    /// details.
151    pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
152        self.object_store_registry.get_store(url.as_ref())
153    }
154
155    /// Register an [`EncryptionFactory`] with an associated identifier that can be later
156    /// used to configure encryption when reading or writing Parquet.
157    /// If an encryption factory with the same identifier was already registered, it is replaced and returned.
158    #[cfg(feature = "parquet_encryption")]
159    pub fn register_parquet_encryption_factory(
160        &self,
161        id: &str,
162        encryption_factory: Arc<dyn EncryptionFactory>,
163    ) -> Option<Arc<dyn EncryptionFactory>> {
164        self.parquet_encryption_factory_registry
165            .register_factory(id, encryption_factory)
166    }
167
168    /// Retrieve an [`EncryptionFactory`] by its identifier
169    #[cfg(feature = "parquet_encryption")]
170    pub fn parquet_encryption_factory(
171        &self,
172        id: &str,
173    ) -> Result<Arc<dyn EncryptionFactory>> {
174        self.parquet_encryption_factory_registry.get_factory(id)
175    }
176}
177
178impl Default for RuntimeEnv {
179    fn default() -> Self {
180        RuntimeEnvBuilder::new().build().unwrap()
181    }
182}
183
/// Execution runtime configuration builder.
///
/// Collects the resource configuration for a [`RuntimeEnv`]; call
/// [`RuntimeEnvBuilder::build`] or [`RuntimeEnvBuilder::build_arc`] to
/// create the environment.
///
/// See example on [`RuntimeEnv`]
#[derive(Clone)]
pub struct RuntimeEnvBuilder {
    #[allow(deprecated)]
    /// Legacy disk manager configuration, set through the deprecated
    /// `with_disk_manager` method.
    ///
    /// Only consulted by [`RuntimeEnvBuilder::build`] when
    /// `disk_manager_builder` is `None`.
    pub disk_manager: DiskManagerConfig,
    /// DiskManager builder to manage temporary disk file usage
    ///
    /// When `Some`, [`RuntimeEnvBuilder::build`] uses it instead of
    /// `disk_manager`.
    pub disk_manager_builder: Option<DiskManagerBuilder>,
    /// [`MemoryPool`] from which to allocate memory
    ///
    /// Defaults to using an [`UnboundedMemoryPool`] if `None`
    pub memory_pool: Option<Arc<dyn MemoryPool>>,
    /// CacheManager to manage cache data
    pub cache_manager: CacheManagerConfig,
    /// ObjectStoreRegistry to get object store based on url
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Parquet encryption factory registry
    ///
    /// Only present when the `parquet_encryption` feature is enabled.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
206
207impl Default for RuntimeEnvBuilder {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
213impl RuntimeEnvBuilder {
214    /// New with default values
215    pub fn new() -> Self {
216        Self {
217            disk_manager: Default::default(),
218            disk_manager_builder: Default::default(),
219            memory_pool: Default::default(),
220            cache_manager: Default::default(),
221            object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
222            #[cfg(feature = "parquet_encryption")]
223            parquet_encryption_factory_registry: Default::default(),
224        }
225    }
226
227    #[allow(deprecated)]
228    #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
229    /// Customize disk manager
230    pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
231        self.disk_manager = disk_manager;
232        self
233    }
234
235    /// Customize the disk manager builder
236    pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
237        self.disk_manager_builder = Some(disk_manager);
238        self
239    }
240
241    /// Customize memory policy
242    pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
243        self.memory_pool = Some(memory_pool);
244        self
245    }
246
247    /// Customize cache policy
248    pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
249        self.cache_manager = cache_manager;
250        self
251    }
252
253    /// Customize object store registry
254    pub fn with_object_store_registry(
255        mut self,
256        object_store_registry: Arc<dyn ObjectStoreRegistry>,
257    ) -> Self {
258        self.object_store_registry = object_store_registry;
259        self
260    }
261
262    /// Specify the total memory to use while running the DataFusion
263    /// plan to `max_memory * memory_fraction` in bytes.
264    ///
265    /// This defaults to using [`GreedyMemoryPool`] wrapped in the
266    /// [`TrackConsumersPool`] with a maximum of 5 consumers.
267    ///
268    /// Note DataFusion does not yet respect this limit in all cases.
269    pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
270        let pool_size = (max_memory as f64 * memory_fraction) as usize;
271        self.with_memory_pool(Arc::new(TrackConsumersPool::new(
272            GreedyMemoryPool::new(pool_size),
273            NonZeroUsize::new(5).unwrap(),
274        )))
275    }
276
277    /// Use the specified path to create any needed temporary files
278    pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
279        let builder = self.disk_manager_builder.take().unwrap_or_default();
280        self.with_disk_manager_builder(
281            builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
282        )
283    }
284
285    /// Specify a limit on the size of the temporary file directory in bytes
286    pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
287        let builder = self.disk_manager_builder.take().unwrap_or_default();
288        self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
289    }
290
291    /// Specify the limit of the file-embedded metadata cache, in bytes.
292    pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
293        self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
294        self
295    }
296
297    /// Build a RuntimeEnv
298    pub fn build(self) -> Result<RuntimeEnv> {
299        let Self {
300            disk_manager,
301            disk_manager_builder,
302            memory_pool,
303            cache_manager,
304            object_store_registry,
305            #[cfg(feature = "parquet_encryption")]
306            parquet_encryption_factory_registry,
307        } = self;
308        let memory_pool =
309            memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
310
311        Ok(RuntimeEnv {
312            memory_pool,
313            disk_manager: if let Some(builder) = disk_manager_builder {
314                Arc::new(builder.build()?)
315            } else {
316                #[allow(deprecated)]
317                DiskManager::try_new(disk_manager)?
318            },
319            cache_manager: CacheManager::try_new(&cache_manager)?,
320            object_store_registry,
321            #[cfg(feature = "parquet_encryption")]
322            parquet_encryption_factory_registry,
323        })
324    }
325
326    /// Convenience method to create a new `Arc<RuntimeEnv>`
327    pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
328        self.build().map(Arc::new)
329    }
330
331    /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv
332    pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
333        let cache_config = CacheManagerConfig {
334            table_files_statistics_cache: runtime_env
335                .cache_manager
336                .get_file_statistic_cache(),
337            list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
338            file_metadata_cache: Some(
339                runtime_env.cache_manager.get_file_metadata_cache(),
340            ),
341            metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
342        };
343
344        Self {
345            #[allow(deprecated)]
346            disk_manager: DiskManagerConfig::Existing(Arc::clone(
347                &runtime_env.disk_manager,
348            )),
349            disk_manager_builder: None,
350            memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
351            cache_manager: cache_config,
352            object_store_registry: Arc::clone(&runtime_env.object_store_registry),
353            #[cfg(feature = "parquet_encryption")]
354            parquet_encryption_factory_registry: Arc::clone(
355                &runtime_env.parquet_encryption_factory_registry,
356            ),
357        }
358    }
359
360    /// Returns a list of all available runtime configurations with their current values and descriptions
361    pub fn entries(&self) -> Vec<ConfigEntry> {
362        vec![
363            ConfigEntry {
364                key: "datafusion.runtime.memory_limit".to_string(),
365                value: None, // Default is system-dependent
366                description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
367            },
368            ConfigEntry {
369                key: "datafusion.runtime.max_temp_directory_size".to_string(),
370                value: Some("100G".to_string()),
371                description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
372            },
373            ConfigEntry {
374                key: "datafusion.runtime.temp_directory".to_string(),
375                value: None, // Default is system-dependent
376                description: "The path to the temporary file directory.",
377            },
378            ConfigEntry {
379                key: "datafusion.runtime.metadata_cache_limit".to_string(),
380                value: Some("50M".to_owned()),
381                description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
382            }
383        ]
384    }
385
386    /// Generate documentation that can be included in the user guide
387    pub fn generate_config_markdown() -> String {
388        use std::fmt::Write as _;
389
390        let s = Self::default();
391
392        let mut docs = "| key | default | description |\n".to_string();
393        docs += "|-----|---------|-------------|\n";
394        let mut entries = s.entries();
395        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
396
397        for entry in &entries {
398            let _ = writeln!(
399                &mut docs,
400                "| {} | {} | {} |",
401                entry.key,
402                entry.value.as_deref().unwrap_or("NULL"),
403                entry.description
404            );
405        }
406        docs
407    }
408}