1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use datafusion::dataframe::DataFrame;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::{SessionConfig, SessionContext};
use std::sync::Arc;
/// Execution backend used to run SQL: either an in-process DataFusion
/// [`SessionContext`] or a remote Ballista context.
pub enum Context {
    /// Queries run through a local DataFusion session.
    Local(SessionContext),
    /// Queries are forwarded to a remote Ballista context.
    Remote(BallistaContext),
}

impl Context {
    /// Connects to a remote Ballista endpoint at `host:port`.
    ///
    /// # Errors
    /// Propagates any error returned by [`BallistaContext::try_new`]; in a
    /// build without the `ballista` feature that is always an error.
    pub async fn new_remote(host: &str, port: u16) -> Result<Context> {
        let remote = BallistaContext::try_new(host, port).await?;
        Ok(Self::Remote(remote))
    }

    /// Creates a local DataFusion session configured with a clone of `config`.
    pub fn new_local(config: &SessionConfig) -> Context {
        let session = SessionContext::with_config(config.clone());
        Self::Local(session)
    }

    /// Hands `sql` to whichever backend this context wraps and returns the
    /// resulting [`DataFrame`].
    ///
    /// # Errors
    /// Propagates the backend's error unchanged.
    pub async fn sql(&mut self, sql: &str) -> Result<Arc<DataFrame>> {
        match self {
            Self::Remote(remote) => remote.sql(sql).await,
            Self::Local(session) => session.sql(sql).await,
        }
    }
}
/// Wrapper around Ballista's own context type, available when the crate is
/// built with the `ballista` feature.
#[cfg(feature = "ballista")]
pub struct BallistaContext(ballista::context::BallistaContext);

#[cfg(feature = "ballista")]
impl BallistaContext {
    /// Connects to the Ballista endpoint at `host:port`, with
    /// `ballista.with_information_schema` set to `"true"` in the configuration.
    ///
    /// # Errors
    /// Returns `DataFusionError::Execution`, carrying the `Debug` text of the
    /// underlying error, if building the configuration or connecting fails.
    pub async fn try_new(host: &str, port: u16) -> Result<Self> {
        // Convert Ballista-side failures into the DataFusion error type this
        // module's `Result` alias uses. A generic fn (not a closure) is used so
        // it can accept whatever error type each call site produces.
        fn into_execution_error<E: std::fmt::Debug>(err: E) -> DataFusionError {
            DataFusionError::Execution(format!("{:?}", err))
        }

        // Fully-qualified paths avoid shadowing this wrapper's own name.
        let ballista_config = ballista::prelude::BallistaConfig::builder()
            .set("ballista.with_information_schema", "true")
            .build()
            .map_err(into_execution_error)?;

        let inner = ballista::context::BallistaContext::remote(host, port, &ballista_config)
            .await
            .map_err(into_execution_error)?;

        Ok(Self(inner))
    }

    /// Forwards `sql` to the wrapped Ballista context.
    pub async fn sql(&mut self, sql: &str) -> Result<Arc<DataFrame>> {
        let inner = &mut self.0;
        inner.sql(sql).await
    }
}
#[cfg(not(feature = "ballista"))]
pub struct BallistaContext();
#[cfg(not(feature = "ballista"))]
impl BallistaContext {
pub async fn try_new(_host: &str, _port: u16) -> Result<Self> {
Err(DataFusionError::NotImplemented(
"Remote execution not supported. Compile with feature 'ballista' to enable"
.to_string(),
))
}
pub async fn sql(&mut self, _sql: &str) -> Result<Arc<DataFrame>> {
unreachable!()
}
}