datafusion_execution/runtime_env.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Execution [`RuntimeEnv`] environment that manages access to the object
//! store registry, memory pool, disk manager and cache manager.
20
21#[allow(deprecated)]
22use crate::disk_manager::DiskManagerConfig;
23use crate::{
24 disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25 memory_pool::{
26 GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27 },
28 object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32use datafusion_common::{config::ConfigEntry, Result};
33use object_store::ObjectStore;
34use std::path::PathBuf;
35use std::sync::Arc;
36use std::{
37 fmt::{Debug, Formatter},
38 num::NonZeroUsize,
39};
40use url::Url;
41
42#[derive(Clone)]
43/// Execution runtime environment that manages system resources such
44/// as memory, disk, cache and storage.
45///
46/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
47/// following resource management functionality:
48///
49/// * [`MemoryPool`]: Manage memory
50/// * [`DiskManager`]: Manage temporary files on local disk
51/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
52/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
53///
54/// # Example: Create default `RuntimeEnv`
55/// ```
56/// # use datafusion_execution::runtime_env::RuntimeEnv;
57/// let runtime_env = RuntimeEnv::default();
58/// ```
59///
60/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
61/// ```
62/// # use std::sync::Arc;
63/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
64/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
65/// // restrict to using at most 100MB of memory
66/// let pool_size = 100 * 1024 * 1024;
67/// let runtime_env = RuntimeEnvBuilder::new()
68/// .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
69/// .build()
70/// .unwrap();
71/// ```
72pub struct RuntimeEnv {
73 /// Runtime memory management
74 pub memory_pool: Arc<dyn MemoryPool>,
75 /// Manage temporary files during query execution
76 pub disk_manager: Arc<DiskManager>,
77 /// Manage temporary cache during query execution
78 pub cache_manager: Arc<CacheManager>,
79 /// Object Store Registry
80 pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
81}
82
83impl Debug for RuntimeEnv {
84 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
85 write!(f, "RuntimeEnv")
86 }
87}
88
89impl RuntimeEnv {
90 #[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
91 #[allow(deprecated)]
92 pub fn new(config: RuntimeConfig) -> Result<Self> {
93 Self::try_new(config)
94 }
95 /// Create env based on configuration
96 #[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")]
97 #[allow(deprecated)]
98 pub fn try_new(config: RuntimeConfig) -> Result<Self> {
99 config.build()
100 }
101
102 /// Registers a custom `ObjectStore` to be used with a specific url.
103 /// This allows DataFusion to create external tables from urls that do not have
104 /// built in support such as `hdfs://namenode:port/...`.
105 ///
106 /// Returns the [`ObjectStore`] previously registered for this
107 /// scheme, if any.
108 ///
109 /// See [`ObjectStoreRegistry`] for more details
110 ///
111 /// # Example: Register local file system object store
112 /// ```
113 /// # use std::sync::Arc;
114 /// # use url::Url;
115 /// # use datafusion_execution::runtime_env::RuntimeEnv;
116 /// # let runtime_env = RuntimeEnv::default();
117 /// let url = Url::try_from("file://").unwrap();
118 /// let object_store = object_store::local::LocalFileSystem::new();
119 /// // register the object store with the runtime environment
120 /// runtime_env.register_object_store(&url, Arc::new(object_store));
121 /// ```
122 ///
123 /// # Example: Register remote URL object store like [Github](https://github.com)
124 ///
125 ///
126 /// ```
127 /// # use std::sync::Arc;
128 /// # use url::Url;
129 /// # use datafusion_execution::runtime_env::RuntimeEnv;
130 /// # let runtime_env = RuntimeEnv::default();
131 /// # // use local store for example as http feature is not enabled
132 /// # let http_store = object_store::local::LocalFileSystem::new();
133 /// // create a new object store via object_store::http::HttpBuilder;
134 /// let base_url = Url::parse("https://github.com").unwrap();
135 /// // (note this example can't depend on the http feature)
136 /// // let http_store = HttpBuilder::new()
137 /// // .with_url(base_url.clone())
138 /// // .build()
139 /// // .unwrap();
140 /// // register the object store with the runtime environment
141 /// runtime_env.register_object_store(&base_url, Arc::new(http_store));
142 /// ```
143 pub fn register_object_store(
144 &self,
145 url: &Url,
146 object_store: Arc<dyn ObjectStore>,
147 ) -> Option<Arc<dyn ObjectStore>> {
148 self.object_store_registry.register_store(url, object_store)
149 }
150
151 /// Retrieves a `ObjectStore` instance for a url by consulting the
152 /// registry. See [`ObjectStoreRegistry::get_store`] for more
153 /// details.
154 pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
155 self.object_store_registry.get_store(url.as_ref())
156 }
157}
158
159impl Default for RuntimeEnv {
160 fn default() -> Self {
161 RuntimeEnvBuilder::new().build().unwrap()
162 }
163}
164
/// Deprecated alias for [`RuntimeEnvBuilder`].
///
/// This is a type alias kept for backwards compatibility.
/// Please see: <https://github.com/apache/datafusion/issues/12156>
#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
pub type RuntimeConfig = RuntimeEnvBuilder;
169
170#[derive(Clone)]
171/// Execution runtime configuration builder.
172///
173/// See example on [`RuntimeEnv`]
174pub struct RuntimeEnvBuilder {
175 #[allow(deprecated)]
176 /// DiskManager to manage temporary disk file usage
177 pub disk_manager: DiskManagerConfig,
178 /// DiskManager builder to manager temporary disk file usage
179 pub disk_manager_builder: Option<DiskManagerBuilder>,
180 /// [`MemoryPool`] from which to allocate memory
181 ///
182 /// Defaults to using an [`UnboundedMemoryPool`] if `None`
183 pub memory_pool: Option<Arc<dyn MemoryPool>>,
184 /// CacheManager to manage cache data
185 pub cache_manager: CacheManagerConfig,
186 /// ObjectStoreRegistry to get object store based on url
187 pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
188}
189
190impl Default for RuntimeEnvBuilder {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl RuntimeEnvBuilder {
197 /// New with default values
198 pub fn new() -> Self {
199 Self {
200 disk_manager: Default::default(),
201 disk_manager_builder: Default::default(),
202 memory_pool: Default::default(),
203 cache_manager: Default::default(),
204 object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
205 }
206 }
207
208 #[allow(deprecated)]
209 #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
210 /// Customize disk manager
211 pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
212 self.disk_manager = disk_manager;
213 self
214 }
215
216 /// Customize the disk manager builder
217 pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
218 self.disk_manager_builder = Some(disk_manager);
219 self
220 }
221
222 /// Customize memory policy
223 pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
224 self.memory_pool = Some(memory_pool);
225 self
226 }
227
228 /// Customize cache policy
229 pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
230 self.cache_manager = cache_manager;
231 self
232 }
233
234 /// Customize object store registry
235 pub fn with_object_store_registry(
236 mut self,
237 object_store_registry: Arc<dyn ObjectStoreRegistry>,
238 ) -> Self {
239 self.object_store_registry = object_store_registry;
240 self
241 }
242
243 /// Specify the total memory to use while running the DataFusion
244 /// plan to `max_memory * memory_fraction` in bytes.
245 ///
246 /// This defaults to using [`GreedyMemoryPool`]
247 ///
248 /// Note DataFusion does not yet respect this limit in all cases.
249 pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
250 let pool_size = (max_memory as f64 * memory_fraction) as usize;
251 self.with_memory_pool(Arc::new(TrackConsumersPool::new(
252 GreedyMemoryPool::new(pool_size),
253 NonZeroUsize::new(5).unwrap(),
254 )))
255 }
256
257 /// Use the specified path to create any needed temporary files
258 pub fn with_temp_file_path(self, path: impl Into<PathBuf>) -> Self {
259 self.with_disk_manager_builder(
260 DiskManagerBuilder::default()
261 .with_mode(DiskManagerMode::Directories(vec![path.into()])),
262 )
263 }
264
265 /// Build a RuntimeEnv
266 pub fn build(self) -> Result<RuntimeEnv> {
267 let Self {
268 disk_manager,
269 disk_manager_builder,
270 memory_pool,
271 cache_manager,
272 object_store_registry,
273 } = self;
274 let memory_pool =
275 memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
276
277 Ok(RuntimeEnv {
278 memory_pool,
279 disk_manager: if let Some(builder) = disk_manager_builder {
280 Arc::new(builder.build()?)
281 } else {
282 #[allow(deprecated)]
283 DiskManager::try_new(disk_manager)?
284 },
285 cache_manager: CacheManager::try_new(&cache_manager)?,
286 object_store_registry,
287 })
288 }
289
290 /// Convenience method to create a new `Arc<RuntimeEnv>`
291 pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
292 self.build().map(Arc::new)
293 }
294
295 /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv
296 pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
297 let cache_config = CacheManagerConfig {
298 table_files_statistics_cache: runtime_env
299 .cache_manager
300 .get_file_statistic_cache(),
301 list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
302 };
303
304 Self {
305 #[allow(deprecated)]
306 disk_manager: DiskManagerConfig::Existing(Arc::clone(
307 &runtime_env.disk_manager,
308 )),
309 disk_manager_builder: None,
310 memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
311 cache_manager: cache_config,
312 object_store_registry: Arc::clone(&runtime_env.object_store_registry),
313 }
314 }
315
316 /// Returns a list of all available runtime configurations with their current values and descriptions
317 pub fn entries(&self) -> Vec<ConfigEntry> {
318 // Memory pool configuration
319 vec![ConfigEntry {
320 key: "datafusion.runtime.memory_limit".to_string(),
321 value: None, // Default is system-dependent
322 description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
323 }]
324 }
325
326 /// Generate documentation that can be included in the user guide
327 pub fn generate_config_markdown() -> String {
328 use std::fmt::Write as _;
329
330 let s = Self::default();
331
332 let mut docs = "| key | default | description |\n".to_string();
333 docs += "|-----|---------|-------------|\n";
334 let mut entries = s.entries();
335 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
336
337 for entry in &entries {
338 let _ = writeln!(
339 &mut docs,
340 "| {} | {} | {} |",
341 entry.key,
342 entry.value.as_deref().unwrap_or("NULL"),
343 entry.description
344 );
345 }
346 docs
347 }
348}