datafusion_distributed/networking/
worker_resolver.rs1use crate::DistributedConfig;
2use crate::config_extension_ext::set_distributed_option_extension;
3use datafusion::common::{DataFusionError, exec_err, not_impl_err};
4use datafusion::prelude::SessionConfig;
5use std::sync::Arc;
6use url::Url;
7
8pub trait WorkerResolver {
10 fn get_urls(&self) -> Result<Vec<Url>, DataFusionError>;
21}
22
23pub(crate) fn set_distributed_worker_resolver(
24 cfg: &mut SessionConfig,
25 worker_resolver: impl WorkerResolver + Send + Sync + 'static,
26) {
27 let opts = cfg.options_mut();
28 let worker_resolver = WorkerResolverExtension(Arc::new(worker_resolver));
29 if let Some(distributed_cfg) = opts.extensions.get_mut::<DistributedConfig>() {
30 distributed_cfg.__private_worker_resolver = worker_resolver;
31 } else {
32 set_distributed_option_extension(
33 cfg,
34 DistributedConfig {
35 __private_worker_resolver: worker_resolver,
36 ..Default::default()
37 },
38 )
39 }
40}
41
42pub fn get_distributed_worker_resolver(
43 cfg: &SessionConfig,
44) -> Result<Arc<dyn WorkerResolver + Send + Sync>, DataFusionError> {
45 let opts = cfg.options();
46 let Some(distributed_cfg) = opts.extensions.get::<DistributedConfig>() else {
47 return exec_err!("WorkerResolver not present in the session config");
48 };
49 Ok(Arc::clone(&distributed_cfg.__private_worker_resolver.0))
50}
51
52#[derive(Clone)]
53pub(crate) struct WorkerResolverExtension(
54 pub(crate) Arc<dyn WorkerResolver + Send + Sync + 'static>,
55);
56
57impl WorkerResolverExtension {
58 pub(crate) fn not_implemented() -> Self {
59 struct NotImplementedWorkerResolver;
60 impl WorkerResolver for NotImplementedWorkerResolver {
61 fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
62 not_impl_err!("WorkerResolver::get_urls() not implemented")
63 }
64 }
65 Self(Arc::new(NotImplementedWorkerResolver))
66 }
67}
68
69impl WorkerResolver for Arc<dyn WorkerResolver + Send + Sync> {
70 fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
71 self.as_ref().get_urls()
72 }
73}