datafusion_execution/runtime_env.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution [`RuntimeEnv`] environment that manages access to object
19//! store, memory manager, disk manager.
20
21use crate::{
22 disk_manager::{DiskManager, DiskManagerConfig},
23 memory_pool::{
24 GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
25 },
26 object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
27};
28
29use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
30use datafusion_common::Result;
31use object_store::ObjectStore;
32use std::path::PathBuf;
33use std::sync::Arc;
34use std::{
35 fmt::{Debug, Formatter},
36 num::NonZeroUsize,
37};
38use url::Url;
39
40#[derive(Clone)]
41/// Execution runtime environment that manages system resources such
42/// as memory, disk, cache and storage.
43///
44/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
45/// following resource management functionality:
46///
47/// * [`MemoryPool`]: Manage memory
48/// * [`DiskManager`]: Manage temporary files on local disk
49/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
50/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
51///
52/// # Example: Create default `RuntimeEnv`
53/// ```
54/// # use datafusion_execution::runtime_env::RuntimeEnv;
55/// let runtime_env = RuntimeEnv::default();
56/// ```
57///
58/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
59/// ```
60/// # use std::sync::Arc;
61/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
62/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
63/// // restrict to using at most 100MB of memory
64/// let pool_size = 100 * 1024 * 1024;
65/// let runtime_env = RuntimeEnvBuilder::new()
66/// .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
67/// .build()
68/// .unwrap();
69/// ```
70pub struct RuntimeEnv {
71 /// Runtime memory management
72 pub memory_pool: Arc<dyn MemoryPool>,
73 /// Manage temporary files during query execution
74 pub disk_manager: Arc<DiskManager>,
75 /// Manage temporary cache during query execution
76 pub cache_manager: Arc<CacheManager>,
77 /// Object Store Registry
78 pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
79}
80
81impl Debug for RuntimeEnv {
82 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
83 write!(f, "RuntimeEnv")
84 }
85}
86
87impl RuntimeEnv {
88 #[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
89 #[allow(deprecated)]
90 pub fn new(config: RuntimeConfig) -> Result<Self> {
91 Self::try_new(config)
92 }
93 /// Create env based on configuration
94 #[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")]
95 #[allow(deprecated)]
96 pub fn try_new(config: RuntimeConfig) -> Result<Self> {
97 config.build()
98 }
99
100 /// Registers a custom `ObjectStore` to be used with a specific url.
101 /// This allows DataFusion to create external tables from urls that do not have
102 /// built in support such as `hdfs://namenode:port/...`.
103 ///
104 /// Returns the [`ObjectStore`] previously registered for this
105 /// scheme, if any.
106 ///
107 /// See [`ObjectStoreRegistry`] for more details
108 ///
109 /// # Example: Register local file system object store
110 /// ```
111 /// # use std::sync::Arc;
112 /// # use url::Url;
113 /// # use datafusion_execution::runtime_env::RuntimeEnv;
114 /// # let runtime_env = RuntimeEnv::default();
115 /// let url = Url::try_from("file://").unwrap();
116 /// let object_store = object_store::local::LocalFileSystem::new();
117 /// // register the object store with the runtime environment
118 /// runtime_env.register_object_store(&url, Arc::new(object_store));
119 /// ```
120 ///
121 /// # Example: Register remote URL object store like [Github](https://github.com)
122 ///
123 ///
124 /// ```
125 /// # use std::sync::Arc;
126 /// # use url::Url;
127 /// # use datafusion_execution::runtime_env::RuntimeEnv;
128 /// # let runtime_env = RuntimeEnv::default();
129 /// # // use local store for example as http feature is not enabled
130 /// # let http_store = object_store::local::LocalFileSystem::new();
131 /// // create a new object store via object_store::http::HttpBuilder;
132 /// let base_url = Url::parse("https://github.com").unwrap();
133 /// // (note this example can't depend on the http feature)
134 /// // let http_store = HttpBuilder::new()
135 /// // .with_url(base_url.clone())
136 /// // .build()
137 /// // .unwrap();
138 /// // register the object store with the runtime environment
139 /// runtime_env.register_object_store(&base_url, Arc::new(http_store));
140 /// ```
141 pub fn register_object_store(
142 &self,
143 url: &Url,
144 object_store: Arc<dyn ObjectStore>,
145 ) -> Option<Arc<dyn ObjectStore>> {
146 self.object_store_registry.register_store(url, object_store)
147 }
148
149 /// Retrieves a `ObjectStore` instance for a url by consulting the
150 /// registry. See [`ObjectStoreRegistry::get_store`] for more
151 /// details.
152 pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
153 self.object_store_registry.get_store(url.as_ref())
154 }
155}
156
157impl Default for RuntimeEnv {
158 fn default() -> Self {
159 RuntimeEnvBuilder::new().build().unwrap()
160 }
161}
162
163/// Please see: <https://github.com/apache/datafusion/issues/12156>
164/// This a type alias for backwards compatibility.
165#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
166pub type RuntimeConfig = RuntimeEnvBuilder;
167
168#[derive(Clone)]
169/// Execution runtime configuration builder.
170///
171/// See example on [`RuntimeEnv`]
172pub struct RuntimeEnvBuilder {
173 /// DiskManager to manage temporary disk file usage
174 pub disk_manager: DiskManagerConfig,
175 /// [`MemoryPool`] from which to allocate memory
176 ///
177 /// Defaults to using an [`UnboundedMemoryPool`] if `None`
178 pub memory_pool: Option<Arc<dyn MemoryPool>>,
179 /// CacheManager to manage cache data
180 pub cache_manager: CacheManagerConfig,
181 /// ObjectStoreRegistry to get object store based on url
182 pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
183}
184
185impl Default for RuntimeEnvBuilder {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191impl RuntimeEnvBuilder {
192 /// New with default values
193 pub fn new() -> Self {
194 Self {
195 disk_manager: Default::default(),
196 memory_pool: Default::default(),
197 cache_manager: Default::default(),
198 object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
199 }
200 }
201
202 /// Customize disk manager
203 pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
204 self.disk_manager = disk_manager;
205 self
206 }
207
208 /// Customize memory policy
209 pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
210 self.memory_pool = Some(memory_pool);
211 self
212 }
213
214 /// Customize cache policy
215 pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
216 self.cache_manager = cache_manager;
217 self
218 }
219
220 /// Customize object store registry
221 pub fn with_object_store_registry(
222 mut self,
223 object_store_registry: Arc<dyn ObjectStoreRegistry>,
224 ) -> Self {
225 self.object_store_registry = object_store_registry;
226 self
227 }
228
229 /// Specify the total memory to use while running the DataFusion
230 /// plan to `max_memory * memory_fraction` in bytes.
231 ///
232 /// This defaults to using [`GreedyMemoryPool`]
233 ///
234 /// Note DataFusion does not yet respect this limit in all cases.
235 pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
236 let pool_size = (max_memory as f64 * memory_fraction) as usize;
237 self.with_memory_pool(Arc::new(TrackConsumersPool::new(
238 GreedyMemoryPool::new(pool_size),
239 NonZeroUsize::new(5).unwrap(),
240 )))
241 }
242
243 /// Use the specified path to create any needed temporary files
244 pub fn with_temp_file_path(self, path: impl Into<PathBuf>) -> Self {
245 self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]))
246 }
247
248 /// Build a RuntimeEnv
249 pub fn build(self) -> Result<RuntimeEnv> {
250 let Self {
251 disk_manager,
252 memory_pool,
253 cache_manager,
254 object_store_registry,
255 } = self;
256 let memory_pool =
257 memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
258
259 Ok(RuntimeEnv {
260 memory_pool,
261 disk_manager: DiskManager::try_new(disk_manager)?,
262 cache_manager: CacheManager::try_new(&cache_manager)?,
263 object_store_registry,
264 })
265 }
266
267 /// Convenience method to create a new `Arc<RuntimeEnv>`
268 pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
269 self.build().map(Arc::new)
270 }
271}