datafusion_postgres/
lib.rs

1mod datatypes;
2mod encoder;
3mod handlers;
4mod information_schema;
5
6use std::sync::Arc;
7
8use datafusion::prelude::SessionContext;
9use getset::{Getters, Setters, WithSetters};
10use pgwire::tokio::process_socket;
11use tokio::net::TcpListener;
12
13use handlers::HandlerFactory;
14pub use handlers::{DfSessionService, Parser};
15
16#[derive(Getters, Setters, WithSetters)]
17#[getset(get = "pub", set = "pub", set_with = "pub")]
18pub struct ServerOptions {
19    host: String,
20    port: u16,
21}
22
23impl ServerOptions {
24    pub fn new() -> ServerOptions {
25        ServerOptions::default()
26    }
27}
28
29impl Default for ServerOptions {
30    fn default() -> Self {
31        ServerOptions {
32            host: "127.0.0.1".to_string(),
33            port: 5432,
34        }
35    }
36}
37
38/// Serve the Datafusion `SessionContext` with Postgres protocol.
39pub async fn serve(
40    session_context: Arc<SessionContext>,
41    opts: &ServerOptions,
42) -> Result<(), std::io::Error> {
43    // Create the handler factory with the session context and catalog name
44    let factory = Arc::new(HandlerFactory(Arc::new(DfSessionService::new(
45        session_context,
46    ))));
47
48    // Bind to the specified host and port
49    let server_addr = format!("{}:{}", opts.host, opts.port);
50    let listener = TcpListener::bind(&server_addr).await?;
51    println!("Listening on {}", server_addr);
52
53    // Accept incoming connections
54    loop {
55        match listener.accept().await {
56            Ok((socket, addr)) => {
57                let factory_ref = factory.clone();
58                println!("Accepted connection from {}", addr);
59
60                tokio::spawn(async move {
61                    if let Err(e) = process_socket(socket, None, factory_ref).await {
62                        eprintln!("Error processing socket: {}", e);
63                    }
64                });
65            }
66            Err(e) => {
67                eprintln!("Error accept socket: {}", e);
68            }
69        }
70    }
71}