1#[allow(deprecated)]
22use crate::disk_manager::DiskManagerConfig;
23use crate::{
24 disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
25 memory_pool::{
26 GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
27 },
28 object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
29};
30
31use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
32#[cfg(feature = "parquet_encryption")]
33use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry};
34use datafusion_common::{config::ConfigEntry, Result};
35use object_store::ObjectStore;
36use std::path::PathBuf;
37use std::sync::Arc;
38use std::{
39 fmt::{Debug, Formatter},
40 num::NonZeroUsize,
41};
42use url::Url;
43
44#[derive(Clone)]
45pub struct RuntimeEnv {
75 pub memory_pool: Arc<dyn MemoryPool>,
77 pub disk_manager: Arc<DiskManager>,
79 pub cache_manager: Arc<CacheManager>,
81 pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
83 #[cfg(feature = "parquet_encryption")]
85 pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
86}
87
88impl Debug for RuntimeEnv {
89 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
90 write!(f, "RuntimeEnv")
91 }
92}
93
94impl RuntimeEnv {
95 pub fn register_object_store(
135 &self,
136 url: &Url,
137 object_store: Arc<dyn ObjectStore>,
138 ) -> Option<Arc<dyn ObjectStore>> {
139 self.object_store_registry.register_store(url, object_store)
140 }
141
142 pub fn deregister_object_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
145 self.object_store_registry.deregister_store(url)
146 }
147
148 pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
152 self.object_store_registry.get_store(url.as_ref())
153 }
154
155 #[cfg(feature = "parquet_encryption")]
159 pub fn register_parquet_encryption_factory(
160 &self,
161 id: &str,
162 encryption_factory: Arc<dyn EncryptionFactory>,
163 ) -> Option<Arc<dyn EncryptionFactory>> {
164 self.parquet_encryption_factory_registry
165 .register_factory(id, encryption_factory)
166 }
167
168 #[cfg(feature = "parquet_encryption")]
170 pub fn parquet_encryption_factory(
171 &self,
172 id: &str,
173 ) -> Result<Arc<dyn EncryptionFactory>> {
174 self.parquet_encryption_factory_registry.get_factory(id)
175 }
176}
177
178impl Default for RuntimeEnv {
179 fn default() -> Self {
180 RuntimeEnvBuilder::new().build().unwrap()
181 }
182}
183
184#[derive(Clone)]
188pub struct RuntimeEnvBuilder {
189 #[allow(deprecated)]
190 pub disk_manager: DiskManagerConfig,
192 pub disk_manager_builder: Option<DiskManagerBuilder>,
194 pub memory_pool: Option<Arc<dyn MemoryPool>>,
198 pub cache_manager: CacheManagerConfig,
200 pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
202 #[cfg(feature = "parquet_encryption")]
204 pub parquet_encryption_factory_registry: Arc<EncryptionFactoryRegistry>,
205}
206
207impl Default for RuntimeEnvBuilder {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
213impl RuntimeEnvBuilder {
214 pub fn new() -> Self {
216 Self {
217 disk_manager: Default::default(),
218 disk_manager_builder: Default::default(),
219 memory_pool: Default::default(),
220 cache_manager: Default::default(),
221 object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
222 #[cfg(feature = "parquet_encryption")]
223 parquet_encryption_factory_registry: Default::default(),
224 }
225 }
226
227 #[allow(deprecated)]
228 #[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
229 pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
231 self.disk_manager = disk_manager;
232 self
233 }
234
235 pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
237 self.disk_manager_builder = Some(disk_manager);
238 self
239 }
240
241 pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
243 self.memory_pool = Some(memory_pool);
244 self
245 }
246
247 pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
249 self.cache_manager = cache_manager;
250 self
251 }
252
253 pub fn with_object_store_registry(
255 mut self,
256 object_store_registry: Arc<dyn ObjectStoreRegistry>,
257 ) -> Self {
258 self.object_store_registry = object_store_registry;
259 self
260 }
261
262 pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
270 let pool_size = (max_memory as f64 * memory_fraction) as usize;
271 self.with_memory_pool(Arc::new(TrackConsumersPool::new(
272 GreedyMemoryPool::new(pool_size),
273 NonZeroUsize::new(5).unwrap(),
274 )))
275 }
276
277 pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
279 let builder = self.disk_manager_builder.take().unwrap_or_default();
280 self.with_disk_manager_builder(
281 builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
282 )
283 }
284
285 pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
287 let builder = self.disk_manager_builder.take().unwrap_or_default();
288 self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
289 }
290
291 pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
293 self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
294 self
295 }
296
297 pub fn build(self) -> Result<RuntimeEnv> {
299 let Self {
300 disk_manager,
301 disk_manager_builder,
302 memory_pool,
303 cache_manager,
304 object_store_registry,
305 #[cfg(feature = "parquet_encryption")]
306 parquet_encryption_factory_registry,
307 } = self;
308 let memory_pool =
309 memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
310
311 Ok(RuntimeEnv {
312 memory_pool,
313 disk_manager: if let Some(builder) = disk_manager_builder {
314 Arc::new(builder.build()?)
315 } else {
316 #[allow(deprecated)]
317 DiskManager::try_new(disk_manager)?
318 },
319 cache_manager: CacheManager::try_new(&cache_manager)?,
320 object_store_registry,
321 #[cfg(feature = "parquet_encryption")]
322 parquet_encryption_factory_registry,
323 })
324 }
325
326 pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
328 self.build().map(Arc::new)
329 }
330
331 pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
333 let cache_config = CacheManagerConfig {
334 table_files_statistics_cache: runtime_env
335 .cache_manager
336 .get_file_statistic_cache(),
337 list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
338 file_metadata_cache: Some(
339 runtime_env.cache_manager.get_file_metadata_cache(),
340 ),
341 metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(),
342 };
343
344 Self {
345 #[allow(deprecated)]
346 disk_manager: DiskManagerConfig::Existing(Arc::clone(
347 &runtime_env.disk_manager,
348 )),
349 disk_manager_builder: None,
350 memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
351 cache_manager: cache_config,
352 object_store_registry: Arc::clone(&runtime_env.object_store_registry),
353 #[cfg(feature = "parquet_encryption")]
354 parquet_encryption_factory_registry: Arc::clone(
355 &runtime_env.parquet_encryption_factory_registry,
356 ),
357 }
358 }
359
360 pub fn entries(&self) -> Vec<ConfigEntry> {
362 vec![
363 ConfigEntry {
364 key: "datafusion.runtime.memory_limit".to_string(),
365 value: None, description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
367 },
368 ConfigEntry {
369 key: "datafusion.runtime.max_temp_directory_size".to_string(),
370 value: Some("100G".to_string()),
371 description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
372 },
373 ConfigEntry {
374 key: "datafusion.runtime.temp_directory".to_string(),
375 value: None, description: "The path to the temporary file directory.",
377 },
378 ConfigEntry {
379 key: "datafusion.runtime.metadata_cache_limit".to_string(),
380 value: Some("50M".to_owned()),
381 description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
382 }
383 ]
384 }
385
386 pub fn generate_config_markdown() -> String {
388 use std::fmt::Write as _;
389
390 let s = Self::default();
391
392 let mut docs = "| key | default | description |\n".to_string();
393 docs += "|-----|---------|-------------|\n";
394 let mut entries = s.entries();
395 entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
396
397 for entry in &entries {
398 let _ = writeln!(
399 &mut docs,
400 "| {} | {} | {} |",
401 entry.key,
402 entry.value.as_deref().unwrap_or("NULL"),
403 entry.description
404 );
405 }
406 docs
407 }
408}