datafusion_cli/cli_context.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::{
21 dataframe::DataFrame,
22 error::DataFusionError,
23 execution::{context::SessionState, TaskContext},
24 logical_expr::LogicalPlan,
25 prelude::SessionContext,
26};
27use object_store::ObjectStore;
28
29use crate::object_storage::{AwsOptions, GcpOptions};
30
31#[async_trait::async_trait]
32/// The CLI session context trait provides a way to have a session context that can be used with datafusion's CLI code.
33pub trait CliSessionContext {
34 /// Get an atomic reference counted task context.
35 fn task_ctx(&self) -> Arc<TaskContext>;
36
37 /// Get the session state.
38 fn session_state(&self) -> SessionState;
39
40 /// Register an object store with the session context.
41 fn register_object_store(
42 &self,
43 url: &url::Url,
44 object_store: Arc<dyn ObjectStore>,
45 ) -> Option<Arc<dyn ObjectStore + 'static>>;
46
47 /// Register table options extension from scheme.
48 fn register_table_options_extension_from_scheme(&self, scheme: &str);
49
50 /// Execute a logical plan and return a DataFrame.
51 async fn execute_logical_plan(
52 &self,
53 plan: LogicalPlan,
54 ) -> Result<DataFrame, DataFusionError>;
55}
56
57#[async_trait::async_trait]
58impl CliSessionContext for SessionContext {
59 fn task_ctx(&self) -> Arc<TaskContext> {
60 self.task_ctx()
61 }
62
63 fn session_state(&self) -> SessionState {
64 self.state()
65 }
66
67 fn register_object_store(
68 &self,
69 url: &url::Url,
70 object_store: Arc<dyn ObjectStore>,
71 ) -> Option<Arc<dyn ObjectStore + 'static>> {
72 self.register_object_store(url, object_store)
73 }
74
75 fn register_table_options_extension_from_scheme(&self, scheme: &str) {
76 match scheme {
77 // For Amazon S3 or Alibaba Cloud OSS
78 "s3" | "oss" | "cos" => {
79 // Register AWS specific table options in the session context:
80 self.register_table_options_extension(AwsOptions::default())
81 }
82 // For Google Cloud Storage
83 "gs" | "gcs" => {
84 // Register GCP specific table options in the session context:
85 self.register_table_options_extension(GcpOptions::default())
86 }
87 // For unsupported schemes, do nothing:
88 _ => {}
89 }
90 }
91
92 async fn execute_logical_plan(
93 &self,
94 plan: LogicalPlan,
95 ) -> Result<DataFrame, DataFusionError> {
96 self.execute_logical_plan(plan).await
97 }
98}