datafusion_postgres/
lib.rs1mod datatypes;
2mod encoder;
3mod handlers;
4mod information_schema;
5
6use std::sync::Arc;
7
8use datafusion::prelude::SessionContext;
9use getset::{Getters, Setters, WithSetters};
10use pgwire::tokio::process_socket;
11use tokio::net::TcpListener;
12
13use handlers::HandlerFactory;
14pub use handlers::{DfSessionService, Parser};
15
16#[derive(Getters, Setters, WithSetters)]
17#[getset(get = "pub", set = "pub", set_with = "pub")]
18pub struct ServerOptions {
19 host: String,
20 port: u16,
21}
22
23impl ServerOptions {
24 pub fn new() -> ServerOptions {
25 ServerOptions::default()
26 }
27}
28
29impl Default for ServerOptions {
30 fn default() -> Self {
31 ServerOptions {
32 host: "127.0.0.1".to_string(),
33 port: 5432,
34 }
35 }
36}
37
38pub async fn serve(
40 session_context: Arc<SessionContext>,
41 opts: &ServerOptions,
42) -> Result<(), std::io::Error> {
43 let factory = Arc::new(HandlerFactory(Arc::new(DfSessionService::new(
45 session_context,
46 ))));
47
48 let server_addr = format!("{}:{}", opts.host, opts.port);
50 let listener = TcpListener::bind(&server_addr).await?;
51 println!("Listening on {}", server_addr);
52
53 loop {
55 match listener.accept().await {
56 Ok((socket, addr)) => {
57 let factory_ref = factory.clone();
58 println!("Accepted connection from {}", addr);
59
60 tokio::spawn(async move {
61 if let Err(e) = process_socket(socket, None, factory_ref).await {
62 eprintln!("Error processing socket: {}", e);
63 }
64 });
65 }
66 Err(e) => {
67 eprintln!("Error accept socket: {}", e);
68 }
69 }
70 }
71}