1#[allow(deprecated)]
22use crate::disk_manager::DiskManagerConfig;
23use crate::{
24 disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25 memory_pool::{
26 GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27 },
28 object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32#[cfg(feature = "parquet_encryption")]
33use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
34use datafusion_common::{config::ConfigEntry, Result};
35use object_store::ObjectStore;
36use std::path::PathBuf;
37use std::sync::Arc;
38use std::{
39 fmt::{Debug, Formatter},
40 num::NonZeroUsize,
41};
42use url::Url;
43
/// Shared, execution-time resources used while running queries: memory,
/// spill-to-disk management, caches, and object-store access.
///
/// Cloning is cheap — every member is reference-counted (`Arc`), so clones
/// share the same underlying resources.
#[derive(Clone)]
pub struct RuntimeEnv {
    /// Runtime memory management ([`MemoryPool`]) shared by query operators.
    pub memory_pool: Arc<dyn MemoryPool>,
    /// Manages temporary files (e.g. spill files) on local disk.
    pub disk_manager: Arc<DiskManager>,
    /// Holds runtime caches (file statistics, file listings, file metadata —
    /// see [`RuntimeEnvBuilder::from_runtime_env`] for the accessors used).
    pub cache_manager: Arc<CacheManager>,
    /// Maps URLs to [`ObjectStore`] instances (see `register_object_store`).
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Registry of Parquet [`EncryptionFactory`] implementations, keyed by id.
    /// Only present when the `parquet_encryption` feature is enabled.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
87
88impl Debug for RuntimeEnv {
89 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
90 write!(f, "RuntimeEnv")
91 }
92}
93
94impl RuntimeEnv {
95 pub fn register_object_store(
137 &self,
138 url: &Url,
139 object_store: Arc<dyn ObjectStore>,
140 ) -> Option<Arc<dyn ObjectStore>> {
141 self.object_store_registry.register_store(url, object_store)
142 }
143
144 pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
148 self.object_store_registry.get_store(url.as_ref())
149 }
150
151 #[cfg(feature = "parquet_encryption")]
155 pub fn register_parquet_encryption_factory(
156 &self,
157 id: &str,
158 encryption_factory: Arc<dyn EncryptionFactory>,
159 ) -> Option<Arc<dyn EncryptionFactory>> {
160 self.parquet_encryption_factory_registry
161 .register_factory(id, encryption_factory)
162 }
163
164 #[cfg(feature = "parquet_encryption")]
166 pub fn parquet_encryption_factory(
167 &self,
168 id: &str,
169 ) -> Result<Arc<dyn EncryptionFactory>> {
170 self.parquet_encryption_factory_registry.get_factory(id)
171 }
172}
173
174impl Default for RuntimeEnv {
175 fn default() -> Self {
176 RuntimeEnvBuilder::new().build().unwrap()
177 }
178}
179
/// Builder for [`RuntimeEnv`]: configure individual components with the
/// `with_*` methods, then call `build` (or `build_arc`).
#[derive(Clone)]
pub struct RuntimeEnvBuilder {
    /// Legacy disk-manager configuration; consulted by `build` only when
    /// `disk_manager_builder` is `None`.
    #[allow(deprecated)]
    pub disk_manager: DiskManagerConfig,
    /// Preferred disk-manager configuration; when `Some`, it takes
    /// precedence over `disk_manager` in `build`.
    pub disk_manager_builder: Option<DiskManagerBuilder>,
    /// Memory pool to install; `build` falls back to an
    /// [`UnboundedMemoryPool`] when `None`.
    pub memory_pool: Option<Arc<dyn MemoryPool>>,
    /// Configuration handed to [`CacheManager::try_new`] during `build`.
    pub cache_manager: CacheManagerConfig,
    /// Registry mapping URLs to [`ObjectStore`] instances.
    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
    /// Registry of Parquet encryption factories. Only present when the
    /// `parquet_encryption` feature is enabled.
    #[cfg(feature = "parquet_encryption")]
    pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
}
202
203impl Default for RuntimeEnvBuilder {
204 fn default() -> Self {
205 Self::new()
206 }
207}
208
209impl RuntimeEnvBuilder {
210 pub fn new() -> Self {
212 Self {
213 disk_manager: Default::default(),
214 disk_manager_builder: Default::default(),
215 memory_pool: Default::default(),
216 cache_manager: Default::default(),
217 object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
218 #[cfg(feature = "parquet_encryption")]
219 parquet_encryption_factory_registry: Default::default(),
220 }
221 }
222
223 #[allow(deprecated)]
224 #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
225 pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
227 self.disk_manager = disk_manager;
228 self
229 }
230
231 pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
233 self.disk_manager_builder = Some(disk_manager);
234 self
235 }
236
237 pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
239 self.memory_pool = Some(memory_pool);
240 self
241 }
242
243 pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
245 self.cache_manager = cache_manager;
246 self
247 }
248
249 pub fn with_object_store_registry(
251 mut self,
252 object_store_registry: Arc<dyn ObjectStoreRegistry>,
253 ) -> Self {
254 self.object_store_registry = object_store_registry;
255 self
256 }
257
258 pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
266 let pool_size = (max_memory as f64 * memory_fraction) as usize;
267 self.with_memory_pool(Arc::new(TrackConsumersPool::new(
268 GreedyMemoryPool::new(pool_size),
269 NonZeroUsize::new(5).unwrap(),
270 )))
271 }
272
273 pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
275 let builder = self.disk_manager_builder.take().unwrap_or_default();
276 self.with_disk_manager_builder(
277 builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
278 )
279 }
280
281 pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
283 let builder = self.disk_manager_builder.take().unwrap_or_default();
284 self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
285 }
286
287 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
289 self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
290 self
291 }
292
293 pub fn build(self) -> Result<RuntimeEnv> {
295 let Self {
296 disk_manager,
297 disk_manager_builder,
298 memory_pool,
299 cache_manager,
300 object_store_registry,
301 #[cfg(feature = "parquet_encryption")]
302 parquet_encryption_factory_registry,
303 } = self;
304 let memory_pool =
305 memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
306
307 Ok(RuntimeEnv {
308 memory_pool,
309 disk_manager: if let Some(builder) = disk_manager_builder {
310 Arc::new(builder.build()?)
311 } else {
312 #[allow(deprecated)]
313 DiskManager::try_new(disk_manager)?
314 },
315 cache_manager: CacheManager::try_new(&cache_manager)?,
316 object_store_registry,
317 #[cfg(feature = "parquet_encryption")]
318 parquet_encryption_factory_registry,
319 })
320 }
321
322 pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
324 self.build().map(Arc::new)
325 }
326
327 pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
329 let cache_config = CacheManagerConfig {
330 table_files_statistics_cache: runtime_env
331 .cache_manager
332 .get_file_statistic_cache(),
333 list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
334 file_metadata_cache: Some(
335 runtime_env.cache_manager.get_file_metadata_cache(),
336 ),
337 metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
338 };
339
340 Self {
341 #[allow(deprecated)]
342 disk_manager: DiskManagerConfig::Existing(Arc::clone(
343 &runtime_env.disk_manager,
344 )),
345 disk_manager_builder: None,
346 memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
347 cache_manager: cache_config,
348 object_store_registry: Arc::clone(&runtime_env.object_store_registry),
349 #[cfg(feature = "parquet_encryption")]
350 parquet_encryption_factory_registry: Arc::clone(
351 &runtime_env.parquet_encryption_factory_registry,
352 ),
353 }
354 }
355
356 pub fn entries(&self) -> Vec<ConfigEntry> {
358 vec![
359 ConfigEntry {
360 key: "datafusion.runtime.memory_limit".to_string(),
361 value: None, description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
363 },
364 ConfigEntry {
365 key: "datafusion.runtime.max_temp_directory_size".to_string(),
366 value: Some("100G".to_string()),
367 description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
368 },
369 ConfigEntry {
370 key: "datafusion.runtime.temp_directory".to_string(),
371 value: None, description: "The path to the temporary file directory.",
373 },
374 ConfigEntry {
375 key: "datafusion.runtime.metadata_cache_limit".to_string(),
376 value: Some("50M".to_owned()),
377 description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
378 }
379 ]
380 }
381
382 pub fn generate_config_markdown() -> String {
384 use std::fmt::Write as _;
385
386 let s = Self::default();
387
388 let mut docs = "| key | default | description |\n".to_string();
389 docs += "|-----|---------|-------------|\n";
390 let mut entries = s.entries();
391 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
392
393 for entry in &entries {
394 let _ = writeln!(
395 &mut docs,
396 "| {} | {} | {} |",
397 entry.key,
398 entry.value.as_deref().unwrap_or("NULL"),
399 entry.description
400 );
401 }
402 docs
403 }
404}