datafusion_cli/
cli_context.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::{
21    dataframe::DataFrame,
22    error::DataFusionError,
23    execution::{context::SessionState, TaskContext},
24    logical_expr::LogicalPlan,
25    prelude::SessionContext,
26};
27use object_store::ObjectStore;
28
29use crate::object_storage::{AwsOptions, GcpOptions};
30
31#[async_trait::async_trait]
32/// The CLI session context trait provides a way to have a session context that can be used with datafusion's CLI code.
33pub trait CliSessionContext {
34    /// Get an atomic reference counted task context.
35    fn task_ctx(&self) -> Arc<TaskContext>;
36
37    /// Get the session state.
38    fn session_state(&self) -> SessionState;
39
40    /// Register an object store with the session context.
41    fn register_object_store(
42        &self,
43        url: &url::Url,
44        object_store: Arc<dyn ObjectStore>,
45    ) -> Option<Arc<dyn ObjectStore + 'static>>;
46
47    /// Register table options extension from scheme.
48    fn register_table_options_extension_from_scheme(&self, scheme: &str);
49
50    /// Execute a logical plan and return a DataFrame.
51    async fn execute_logical_plan(
52        &self,
53        plan: LogicalPlan,
54    ) -> Result<DataFrame, DataFusionError>;
55}
56
57#[async_trait::async_trait]
58impl CliSessionContext for SessionContext {
59    fn task_ctx(&self) -> Arc<TaskContext> {
60        self.task_ctx()
61    }
62
63    fn session_state(&self) -> SessionState {
64        self.state()
65    }
66
67    fn register_object_store(
68        &self,
69        url: &url::Url,
70        object_store: Arc<dyn ObjectStore>,
71    ) -> Option<Arc<dyn ObjectStore + 'static>> {
72        self.register_object_store(url, object_store)
73    }
74
75    fn register_table_options_extension_from_scheme(&self, scheme: &str) {
76        match scheme {
77            // For Amazon S3 or Alibaba Cloud OSS
78            "s3" | "oss" | "cos" => {
79                // Register AWS specific table options in the session context:
80                self.register_table_options_extension(AwsOptions::default())
81            }
82            // For Google Cloud Storage
83            "gs" | "gcs" => {
84                // Register GCP specific table options in the session context:
85                self.register_table_options_extension(GcpOptions::default())
86            }
87            // For unsupported schemes, do nothing:
88            _ => {}
89        }
90    }
91
92    async fn execute_logical_plan(
93        &self,
94        plan: LogicalPlan,
95    ) -> Result<DataFrame, DataFusionError> {
96        self.execute_logical_plan(plan).await
97    }
98}