datafusion_execution/
runtime_env.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution [`RuntimeEnv`] environment that manages access to the object
//! store registry, memory pool, disk manager and cache manager.
20
21#[allow(deprecated)]
22use crate::disk_manager::DiskManagerConfig;
23use crate::{
24    disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25    memory_pool::{
26        GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27    },
28    object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32use datafusion_common::{config::ConfigEntry, Result};
33use object_store::ObjectStore;
34use std::path::PathBuf;
35use std::sync::Arc;
36use std::{
37    fmt::{Debug, Formatter},
38    num::NonZeroUsize,
39};
40use url::Url;
41
42#[derive(Clone)]
43/// Execution runtime environment that manages system resources such
44/// as memory, disk, cache and storage.
45///
46/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
47/// following resource management functionality:
48///
49/// * [`MemoryPool`]: Manage memory
50/// * [`DiskManager`]: Manage temporary files on local disk
51/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
52/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
53///
54/// # Example: Create default `RuntimeEnv`
55/// ```
56/// # use datafusion_execution::runtime_env::RuntimeEnv;
57/// let runtime_env = RuntimeEnv::default();
58/// ```
59///
60/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
61/// ```
62/// # use std::sync::Arc;
63/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
64/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
65/// // restrict to using at most 100MB of memory
66/// let pool_size = 100 * 1024 * 1024;
67/// let runtime_env = RuntimeEnvBuilder::new()
68///   .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
69///   .build()
70///   .unwrap();
71/// ```
72pub struct RuntimeEnv {
73    /// Runtime memory management
74    pub memory_pool: Arc<dyn MemoryPool>,
75    /// Manage temporary files during query execution
76    pub disk_manager: Arc<DiskManager>,
77    /// Manage temporary cache during query execution
78    pub cache_manager: Arc<CacheManager>,
79    /// Object Store Registry
80    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
81}
82
83impl Debug for RuntimeEnv {
84    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
85        write!(f, "RuntimeEnv")
86    }
87}
88
89impl RuntimeEnv {
90    #[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
91    #[allow(deprecated)]
92    pub fn new(config: RuntimeConfig) -> Result<Self> {
93        Self::try_new(config)
94    }
95    /// Create env based on configuration
96    #[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")]
97    #[allow(deprecated)]
98    pub fn try_new(config: RuntimeConfig) -> Result<Self> {
99        config.build()
100    }
101
102    /// Registers a custom `ObjectStore` to be used with a specific url.
103    /// This allows DataFusion to create external tables from urls that do not have
104    /// built in support such as `hdfs://namenode:port/...`.
105    ///
106    /// Returns the [`ObjectStore`] previously registered for this
107    /// scheme, if any.
108    ///
109    /// See [`ObjectStoreRegistry`] for more details
110    ///
111    /// # Example: Register local file system object store
112    /// ```
113    /// # use std::sync::Arc;
114    /// # use url::Url;
115    /// # use datafusion_execution::runtime_env::RuntimeEnv;
116    /// # let runtime_env = RuntimeEnv::default();
117    /// let url = Url::try_from("file://").unwrap();
118    /// let object_store = object_store::local::LocalFileSystem::new();
119    /// // register the object store with the runtime environment
120    /// runtime_env.register_object_store(&url, Arc::new(object_store));
121    /// ```
122    ///
123    /// # Example: Register remote URL object store like [Github](https://github.com)
124    ///
125    ///
126    /// ```
127    /// # use std::sync::Arc;
128    /// # use url::Url;
129    /// # use datafusion_execution::runtime_env::RuntimeEnv;
130    /// # let runtime_env = RuntimeEnv::default();
131    /// # // use local store for example as http feature is not enabled
132    /// # let http_store = object_store::local::LocalFileSystem::new();
133    /// // create a new object store via object_store::http::HttpBuilder;
134    /// let base_url = Url::parse("https://github.com").unwrap();
135    /// // (note this example can't depend on the http feature)
136    /// // let http_store = HttpBuilder::new()
137    /// //    .with_url(base_url.clone())
138    /// //    .build()
139    /// //    .unwrap();
140    /// // register the object store with the runtime environment
141    /// runtime_env.register_object_store(&base_url, Arc::new(http_store));
142    /// ```
143    pub fn register_object_store(
144        &self,
145        url: &Url,
146        object_store: Arc<dyn ObjectStore>,
147    ) -> Option<Arc<dyn ObjectStore>> {
148        self.object_store_registry.register_store(url, object_store)
149    }
150
151    /// Retrieves a `ObjectStore` instance for a url by consulting the
152    /// registry. See [`ObjectStoreRegistry::get_store`] for more
153    /// details.
154    pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
155        self.object_store_registry.get_store(url.as_ref())
156    }
157}
158
159impl Default for RuntimeEnv {
160    fn default() -> Self {
161        RuntimeEnvBuilder::new().build().unwrap()
162    }
163}
164
165/// Please see: <https://github.com/apache/datafusion/issues/12156>
166/// This a type alias for backwards compatibility.
167#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
168pub type RuntimeConfig = RuntimeEnvBuilder;
169
170#[derive(Clone)]
171/// Execution runtime configuration builder.
172///
173/// See example on [`RuntimeEnv`]
174pub struct RuntimeEnvBuilder {
175    #[allow(deprecated)]
176    /// DiskManager to manage temporary disk file usage
177    pub disk_manager: DiskManagerConfig,
178    /// DiskManager builder to manager temporary disk file usage
179    pub disk_manager_builder: Option<DiskManagerBuilder>,
180    /// [`MemoryPool`] from which to allocate memory
181    ///
182    /// Defaults to using an [`UnboundedMemoryPool`] if `None`
183    pub memory_pool: Option<Arc<dyn MemoryPool>>,
184    /// CacheManager to manage cache data
185    pub cache_manager: CacheManagerConfig,
186    /// ObjectStoreRegistry to get object store based on url
187    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
188}
189
190impl Default for RuntimeEnvBuilder {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196impl RuntimeEnvBuilder {
197    /// New with default values
198    pub fn new() -> Self {
199        Self {
200            disk_manager: Default::default(),
201            disk_manager_builder: Default::default(),
202            memory_pool: Default::default(),
203            cache_manager: Default::default(),
204            object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
205        }
206    }
207
208    #[allow(deprecated)]
209    #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
210    /// Customize disk manager
211    pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
212        self.disk_manager = disk_manager;
213        self
214    }
215
216    /// Customize the disk manager builder
217    pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
218        self.disk_manager_builder = Some(disk_manager);
219        self
220    }
221
222    /// Customize memory policy
223    pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
224        self.memory_pool = Some(memory_pool);
225        self
226    }
227
228    /// Customize cache policy
229    pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
230        self.cache_manager = cache_manager;
231        self
232    }
233
234    /// Customize object store registry
235    pub fn with_object_store_registry(
236        mut self,
237        object_store_registry: Arc<dyn ObjectStoreRegistry>,
238    ) -> Self {
239        self.object_store_registry = object_store_registry;
240        self
241    }
242
243    /// Specify the total memory to use while running the DataFusion
244    /// plan to `max_memory * memory_fraction` in bytes.
245    ///
246    /// This defaults to using [`GreedyMemoryPool`]
247    ///
248    /// Note DataFusion does not yet respect this limit in all cases.
249    pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
250        let pool_size = (max_memory as f64 * memory_fraction) as usize;
251        self.with_memory_pool(Arc::new(TrackConsumersPool::new(
252            GreedyMemoryPool::new(pool_size),
253            NonZeroUsize::new(5).unwrap(),
254        )))
255    }
256
257    /// Use the specified path to create any needed temporary files
258    pub fn with_temp_file_path(self, path: impl Into<PathBuf>) -> Self {
259        self.with_disk_manager_builder(
260            DiskManagerBuilder::default()
261                .with_mode(DiskManagerMode::Directories(vec![path.into()])),
262        )
263    }
264
265    /// Build a RuntimeEnv
266    pub fn build(self) -> Result<RuntimeEnv> {
267        let Self {
268            disk_manager,
269            disk_manager_builder,
270            memory_pool,
271            cache_manager,
272            object_store_registry,
273        } = self;
274        let memory_pool =
275            memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
276
277        Ok(RuntimeEnv {
278            memory_pool,
279            disk_manager: if let Some(builder) = disk_manager_builder {
280                Arc::new(builder.build()?)
281            } else {
282                #[allow(deprecated)]
283                DiskManager::try_new(disk_manager)?
284            },
285            cache_manager: CacheManager::try_new(&cache_manager)?,
286            object_store_registry,
287        })
288    }
289
290    /// Convenience method to create a new `Arc<RuntimeEnv>`
291    pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
292        self.build().map(Arc::new)
293    }
294
295    /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv
296    pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
297        let cache_config = CacheManagerConfig {
298            table_files_statistics_cache: runtime_env
299                .cache_manager
300                .get_file_statistic_cache(),
301            list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
302        };
303
304        Self {
305            #[allow(deprecated)]
306            disk_manager: DiskManagerConfig::Existing(Arc::clone(
307                &runtime_env.disk_manager,
308            )),
309            disk_manager_builder: None,
310            memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
311            cache_manager: cache_config,
312            object_store_registry: Arc::clone(&runtime_env.object_store_registry),
313        }
314    }
315
316    /// Returns a list of all available runtime configurations with their current values and descriptions
317    pub fn entries(&self) -> Vec<ConfigEntry> {
318        // Memory pool configuration
319        vec![ConfigEntry {
320            key: "datafusion.runtime.memory_limit".to_string(),
321            value: None, // Default is system-dependent
322            description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
323        }]
324    }
325
326    /// Generate documentation that can be included in the user guide
327    pub fn generate_config_markdown() -> String {
328        use std::fmt::Write as _;
329
330        let s = Self::default();
331
332        let mut docs = "| key | default | description |\n".to_string();
333        docs += "|-----|---------|-------------|\n";
334        let mut entries = s.entries();
335        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
336
337        for entry in &entries {
338            let _ = writeln!(
339                &mut docs,
340                "| {} | {} | {} |",
341                entry.key,
342                entry.value.as_deref().unwrap_or("NULL"),
343                entry.description
344            );
345        }
346        docs
347    }
348}