Skip to main content

datafusion_postgres/hooks/
mod.rs

1pub mod cursor;
2pub mod permissions;
3pub mod set_show;
4pub mod transactions;
5
6use async_trait::async_trait;
7
8use datafusion::common::ParamValues;
9use datafusion::logical_expr::LogicalPlan;
10use datafusion::prelude::SessionContext;
11use datafusion::sql::sqlparser::ast::Statement;
12use futures::Sink;
13use pgwire::api::ClientInfo;
14use pgwire::api::ClientPortalStore;
15use pgwire::api::results::Response;
16use pgwire::api::store::{MemPortalStore, PortalStore};
17use pgwire::error::{PgWireError, PgWireResult};
18use pgwire::messages::PgWireBackendMessage;
19
20use crate::hooks::cursor::DfStatement;
21
22#[async_trait]
23pub trait HookClient: ClientInfo + Send + Sync {
24    fn portal_store(&self) -> &MemPortalStore<DfStatement>;
25
26    async fn send_message(&mut self, item: PgWireBackendMessage) -> PgWireResult<()>;
27}
28
29#[async_trait]
30impl<S> HookClient for S
31where
32    S: ClientInfo + ClientPortalStore + Sink<PgWireBackendMessage> + Send + Sync + Unpin,
33    PgWireError: From<<S as Sink<PgWireBackendMessage>>::Error>,
34    S::PortalStore: PortalStore,
35{
36    fn portal_store(&self) -> &MemPortalStore<DfStatement> {
37        self.portal_store()
38            .as_any()
39            .downcast_ref::<MemPortalStore<DfStatement>>()
40            .expect("portal store is not MemPortalStore<DfStatement>")
41    }
42
43    async fn send_message(&mut self, item: PgWireBackendMessage) -> PgWireResult<()> {
44        use futures::SinkExt;
45        self.send(item).await.map_err(PgWireError::from)
46    }
47}
48
49#[async_trait]
50pub trait QueryHook: Send + Sync {
51    /// called in simple query handler to return response directly
52    async fn handle_simple_query(
53        &self,
54        statement: &Statement,
55        session_context: &SessionContext,
56        client: &mut dyn HookClient,
57    ) -> Option<PgWireResult<Response>>;
58
59    /// called at extended query parse phase, for generating `LogicalPlan`from statement
60    async fn handle_extended_parse_query(
61        &self,
62        sql: &Statement,
63        session_context: &SessionContext,
64        client: &(dyn ClientInfo + Send + Sync),
65    ) -> Option<PgWireResult<LogicalPlan>>;
66
67    /// called at extended query execute phase, for query execution
68    async fn handle_extended_query(
69        &self,
70        statement: &Statement,
71        logical_plan: &LogicalPlan,
72        params: &ParamValues,
73        session_context: &SessionContext,
74        client: &mut dyn HookClient,
75    ) -> Option<PgWireResult<Response>>;
76}