datafusion_execution/
runtime_env.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution [`RuntimeEnv`] environment that manages access to object
19//! store, memory manager, disk manager.
20
21use crate::{
22    disk_manager::{DiskManager, DiskManagerConfig},
23    memory_pool::{
24        GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
25    },
26    object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
27};
28
29use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
30use datafusion_common::Result;
31use object_store::ObjectStore;
32use std::path::PathBuf;
33use std::sync::Arc;
34use std::{
35    fmt::{Debug, Formatter},
36    num::NonZeroUsize,
37};
38use url::Url;
39
40#[derive(Clone)]
41/// Execution runtime environment that manages system resources such
42/// as memory, disk, cache and storage.
43///
44/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
45/// following resource management functionality:
46///
47/// * [`MemoryPool`]: Manage memory
48/// * [`DiskManager`]: Manage temporary files on local disk
49/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
50/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
51///
52/// # Example: Create default `RuntimeEnv`
53/// ```
54/// # use datafusion_execution::runtime_env::RuntimeEnv;
55/// let runtime_env = RuntimeEnv::default();
56/// ```
57///
58/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
59/// ```
60/// # use std::sync::Arc;
61/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
62/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
63/// // restrict to using at most 100MB of memory
64/// let pool_size = 100 * 1024 * 1024;
65/// let runtime_env = RuntimeEnvBuilder::new()
66///   .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
67///   .build()
68///   .unwrap();
69/// ```
70pub struct RuntimeEnv {
71    /// Runtime memory management
72    pub memory_pool: Arc<dyn MemoryPool>,
73    /// Manage temporary files during query execution
74    pub disk_manager: Arc<DiskManager>,
75    /// Manage temporary cache during query execution
76    pub cache_manager: Arc<CacheManager>,
77    /// Object Store Registry
78    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
79}
80
81impl Debug for RuntimeEnv {
82    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
83        write!(f, "RuntimeEnv")
84    }
85}
86
87impl RuntimeEnv {
88    #[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
89    #[allow(deprecated)]
90    pub fn new(config: RuntimeConfig) -> Result<Self> {
91        Self::try_new(config)
92    }
93    /// Create env based on configuration
94    #[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")]
95    #[allow(deprecated)]
96    pub fn try_new(config: RuntimeConfig) -> Result<Self> {
97        config.build()
98    }
99
100    /// Registers a custom `ObjectStore` to be used with a specific url.
101    /// This allows DataFusion to create external tables from urls that do not have
102    /// built in support such as `hdfs://namenode:port/...`.
103    ///
104    /// Returns the [`ObjectStore`] previously registered for this
105    /// scheme, if any.
106    ///
107    /// See [`ObjectStoreRegistry`] for more details
108    ///
109    /// # Example: Register local file system object store
110    /// ```
111    /// # use std::sync::Arc;
112    /// # use url::Url;
113    /// # use datafusion_execution::runtime_env::RuntimeEnv;
114    /// # let runtime_env = RuntimeEnv::default();
115    /// let url = Url::try_from("file://").unwrap();
116    /// let object_store = object_store::local::LocalFileSystem::new();
117    /// // register the object store with the runtime environment
118    /// runtime_env.register_object_store(&url, Arc::new(object_store));
119    /// ```
120    ///
121    /// # Example: Register remote URL object store like [Github](https://github.com)
122    ///
123    ///
124    /// ```
125    /// # use std::sync::Arc;
126    /// # use url::Url;
127    /// # use datafusion_execution::runtime_env::RuntimeEnv;
128    /// # let runtime_env = RuntimeEnv::default();
129    /// # // use local store for example as http feature is not enabled
130    /// # let http_store = object_store::local::LocalFileSystem::new();
131    /// // create a new object store via object_store::http::HttpBuilder;
132    /// let base_url = Url::parse("https://github.com").unwrap();
133    /// // (note this example can't depend on the http feature)
134    /// // let http_store = HttpBuilder::new()
135    /// //    .with_url(base_url.clone())
136    /// //    .build()
137    /// //    .unwrap();
138    /// // register the object store with the runtime environment
139    /// runtime_env.register_object_store(&base_url, Arc::new(http_store));
140    /// ```
141    pub fn register_object_store(
142        &self,
143        url: &Url,
144        object_store: Arc<dyn ObjectStore>,
145    ) -> Option<Arc<dyn ObjectStore>> {
146        self.object_store_registry.register_store(url, object_store)
147    }
148
149    /// Retrieves a `ObjectStore` instance for a url by consulting the
150    /// registry. See [`ObjectStoreRegistry::get_store`] for more
151    /// details.
152    pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
153        self.object_store_registry.get_store(url.as_ref())
154    }
155}
156
157impl Default for RuntimeEnv {
158    fn default() -> Self {
159        RuntimeEnvBuilder::new().build().unwrap()
160    }
161}
162
/// Deprecated name for [`RuntimeEnvBuilder`].
///
/// This is a type alias kept for backwards compatibility.
/// Please see: <https://github.com/apache/datafusion/issues/12156>
#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
pub type RuntimeConfig = RuntimeEnvBuilder;
167
168#[derive(Clone)]
169/// Execution runtime configuration builder.
170///
171/// See example on [`RuntimeEnv`]
172pub struct RuntimeEnvBuilder {
173    /// DiskManager to manage temporary disk file usage
174    pub disk_manager: DiskManagerConfig,
175    /// [`MemoryPool`] from which to allocate memory
176    ///
177    /// Defaults to using an [`UnboundedMemoryPool`] if `None`
178    pub memory_pool: Option<Arc<dyn MemoryPool>>,
179    /// CacheManager to manage cache data
180    pub cache_manager: CacheManagerConfig,
181    /// ObjectStoreRegistry to get object store based on url
182    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
183}
184
185impl Default for RuntimeEnvBuilder {
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
191impl RuntimeEnvBuilder {
192    /// New with default values
193    pub fn new() -> Self {
194        Self {
195            disk_manager: Default::default(),
196            memory_pool: Default::default(),
197            cache_manager: Default::default(),
198            object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
199        }
200    }
201
202    /// Customize disk manager
203    pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
204        self.disk_manager = disk_manager;
205        self
206    }
207
208    /// Customize memory policy
209    pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
210        self.memory_pool = Some(memory_pool);
211        self
212    }
213
214    /// Customize cache policy
215    pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
216        self.cache_manager = cache_manager;
217        self
218    }
219
220    /// Customize object store registry
221    pub fn with_object_store_registry(
222        mut self,
223        object_store_registry: Arc<dyn ObjectStoreRegistry>,
224    ) -> Self {
225        self.object_store_registry = object_store_registry;
226        self
227    }
228
229    /// Specify the total memory to use while running the DataFusion
230    /// plan to `max_memory * memory_fraction` in bytes.
231    ///
232    /// This defaults to using [`GreedyMemoryPool`]
233    ///
234    /// Note DataFusion does not yet respect this limit in all cases.
235    pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
236        let pool_size = (max_memory as f64 * memory_fraction) as usize;
237        self.with_memory_pool(Arc::new(TrackConsumersPool::new(
238            GreedyMemoryPool::new(pool_size),
239            NonZeroUsize::new(5).unwrap(),
240        )))
241    }
242
243    /// Use the specified path to create any needed temporary files
244    pub fn with_temp_file_path(self, path: impl Into<PathBuf>) -> Self {
245        self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]))
246    }
247
248    /// Build a RuntimeEnv
249    pub fn build(self) -> Result<RuntimeEnv> {
250        let Self {
251            disk_manager,
252            memory_pool,
253            cache_manager,
254            object_store_registry,
255        } = self;
256        let memory_pool =
257            memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
258
259        Ok(RuntimeEnv {
260            memory_pool,
261            disk_manager: DiskManager::try_new(disk_manager)?,
262            cache_manager: CacheManager::try_new(&cache_manager)?,
263            object_store_registry,
264        })
265    }
266
267    /// Convenience method to create a new `Arc<RuntimeEnv>`
268    pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
269        self.build().map(Arc::new)
270    }
271}