// datafusion_distributed/networking/worker_resolver.rs

1use crate::DistributedConfig;
2use crate::config_extension_ext::set_distributed_option_extension;
3use datafusion::common::{DataFusionError, exec_err, not_impl_err};
4use datafusion::prelude::SessionConfig;
5use std::sync::Arc;
6use url::Url;
7
8/// Resolves a list of worker URLs in the cluster available for executing parts of the plan.
9pub trait WorkerResolver {
10    /// Gets all available worker URLs in the cluster. Note how this method is not async, which
11    /// means that any async operation involved in discovering worker URLs must happen on a
12    /// background thread and be retrieved by this method synchronously.
13    ///
14    /// This method will be called in several places during distributed planning:
15    /// - During task count assignation for the different stages, for determining the size of
16    ///   the cluster and limiting the amount of tasks per stage to Vec<Url>.length().
17    /// - Right before execution, for lazily assigning worker URLs to the different tasks in the
18    ///   plan. This is done as close to execution in order to have fresh worker URLs as updated
19    ///   as possible.
20    fn get_urls(&self) -> Result<Vec<Url>, DataFusionError>;
21}
22
23pub(crate) fn set_distributed_worker_resolver(
24    cfg: &mut SessionConfig,
25    worker_resolver: impl WorkerResolver + Send + Sync + 'static,
26) {
27    let opts = cfg.options_mut();
28    let worker_resolver = WorkerResolverExtension(Arc::new(worker_resolver));
29    if let Some(distributed_cfg) = opts.extensions.get_mut::<DistributedConfig>() {
30        distributed_cfg.__private_worker_resolver = worker_resolver;
31    } else {
32        set_distributed_option_extension(
33            cfg,
34            DistributedConfig {
35                __private_worker_resolver: worker_resolver,
36                ..Default::default()
37            },
38        )
39    }
40}
41
42pub fn get_distributed_worker_resolver(
43    cfg: &SessionConfig,
44) -> Result<Arc<dyn WorkerResolver + Send + Sync>, DataFusionError> {
45    let opts = cfg.options();
46    let Some(distributed_cfg) = opts.extensions.get::<DistributedConfig>() else {
47        return exec_err!("WorkerResolver not present in the session config");
48    };
49    Ok(Arc::clone(&distributed_cfg.__private_worker_resolver.0))
50}
51
52#[derive(Clone)]
53pub(crate) struct WorkerResolverExtension(
54    pub(crate) Arc<dyn WorkerResolver + Send + Sync + 'static>,
55);
56
57impl WorkerResolverExtension {
58    pub(crate) fn not_implemented() -> Self {
59        struct NotImplementedWorkerResolver;
60        impl WorkerResolver for NotImplementedWorkerResolver {
61            fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
62                not_impl_err!("WorkerResolver::get_urls() not implemented")
63            }
64        }
65        Self(Arc::new(NotImplementedWorkerResolver))
66    }
67}
68
69impl WorkerResolver for Arc<dyn WorkerResolver + Send + Sync> {
70    fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
71        self.as_ref().get_urls()
72    }
73}