datafusion_postgres/
testing.rs1use std::{collections::HashMap, sync::Arc};
2
3use datafusion::prelude::{SessionConfig, SessionContext};
4use datafusion_pg_catalog::pg_catalog::setup_pg_catalog;
5use futures::Sink;
6use pgwire::{
7 api::{ClientInfo, ClientPortalStore, PgWireConnectionState, METADATA_USER},
8 messages::{
9 response::TransactionStatus, startup::SecretKey, PgWireBackendMessage, ProtocolVersion,
10 },
11};
12
13use crate::{auth::AuthManager, DfSessionService};
14
15pub fn setup_handlers() -> DfSessionService {
16 let session_config = SessionConfig::new().with_information_schema(true);
17 let session_context = SessionContext::new_with_config(session_config);
18
19 setup_pg_catalog(
20 &session_context,
21 "datafusion",
22 Arc::new(AuthManager::default()),
23 )
24 .expect("Failed to setup sesession context");
25
26 DfSessionService::new(Arc::new(session_context))
27}
28
29#[derive(Debug, Default)]
30pub struct MockClient {
31 metadata: HashMap<String, String>,
32 portal_store: HashMap<String, String>,
33}
34
35impl MockClient {
36 pub fn new() -> MockClient {
37 let mut metadata = HashMap::new();
38 metadata.insert(METADATA_USER.to_string(), "postgres".to_string());
39
40 MockClient {
41 metadata,
42 portal_store: HashMap::default(),
43 }
44 }
45}
46
47impl ClientInfo for MockClient {
48 fn socket_addr(&self) -> std::net::SocketAddr {
49 "127.0.0.1".parse().unwrap()
50 }
51
52 fn is_secure(&self) -> bool {
53 false
54 }
55
56 fn protocol_version(&self) -> ProtocolVersion {
57 ProtocolVersion::PROTOCOL3_0
58 }
59
60 fn set_protocol_version(&mut self, _version: ProtocolVersion) {}
61
62 fn pid_and_secret_key(&self) -> (i32, SecretKey) {
63 (0, SecretKey::I32(0))
64 }
65
66 fn set_pid_and_secret_key(&mut self, _pid: i32, _secret_key: SecretKey) {}
67
68 fn state(&self) -> PgWireConnectionState {
69 PgWireConnectionState::ReadyForQuery
70 }
71
72 fn set_state(&mut self, _new_state: PgWireConnectionState) {}
73
74 fn transaction_status(&self) -> TransactionStatus {
75 TransactionStatus::Idle
76 }
77
78 fn set_transaction_status(&mut self, _new_status: TransactionStatus) {}
79
80 fn metadata(&self) -> &HashMap<String, String> {
81 &self.metadata
82 }
83
84 fn metadata_mut(&mut self) -> &mut HashMap<String, String> {
85 &mut self.metadata
86 }
87
88 fn client_certificates<'a>(&self) -> Option<&[rustls_pki_types::CertificateDer<'a>]> {
89 None
90 }
91
92 fn sni_server_name(&self) -> Option<&str> {
93 None
94 }
95}
96
97impl ClientPortalStore for MockClient {
98 type PortalStore = HashMap<String, String>;
99 fn portal_store(&self) -> &Self::PortalStore {
100 &self.portal_store
101 }
102}
103
104impl Sink<PgWireBackendMessage> for MockClient {
105 type Error = std::io::Error;
106
107 fn poll_ready(
108 self: std::pin::Pin<&mut Self>,
109 _cx: &mut std::task::Context<'_>,
110 ) -> std::task::Poll<Result<(), Self::Error>> {
111 std::task::Poll::Ready(Ok(()))
112 }
113
114 fn start_send(
115 self: std::pin::Pin<&mut Self>,
116 _item: PgWireBackendMessage,
117 ) -> Result<(), Self::Error> {
118 Ok(())
119 }
120
121 fn poll_flush(
122 self: std::pin::Pin<&mut Self>,
123 _cx: &mut std::task::Context<'_>,
124 ) -> std::task::Poll<Result<(), Self::Error>> {
125 std::task::Poll::Ready(Ok(()))
126 }
127
128 fn poll_close(
129 self: std::pin::Pin<&mut Self>,
130 _cx: &mut std::task::Context<'_>,
131 ) -> std::task::Poll<Result<(), Self::Error>> {
132 std::task::Poll::Ready(Ok(()))
133 }
134}