datafusion_postgres/
lib.rs

1mod datatypes;
2mod handlers;
3pub mod pg_catalog;
4
5use std::sync::Arc;
6
7use datafusion::prelude::SessionContext;
8use getset::{Getters, Setters, WithSetters};
9use pgwire::tokio::process_socket;
10use tokio::net::TcpListener;
11
12use handlers::HandlerFactory;
13pub use handlers::{DfSessionService, Parser};
14
15#[derive(Getters, Setters, WithSetters)]
16#[getset(get = "pub", set = "pub", set_with = "pub")]
17pub struct ServerOptions {
18    host: String,
19    port: u16,
20}
21
22impl ServerOptions {
23    pub fn new() -> ServerOptions {
24        ServerOptions::default()
25    }
26}
27
28impl Default for ServerOptions {
29    fn default() -> Self {
30        ServerOptions {
31            host: "127.0.0.1".to_string(),
32            port: 5432,
33        }
34    }
35}
36
37/// Serve the Datafusion `SessionContext` with Postgres protocol.
38pub async fn serve(
39    session_context: Arc<SessionContext>,
40    opts: &ServerOptions,
41) -> Result<(), std::io::Error> {
42    // Create the handler factory with the session context and catalog name
43    let factory = Arc::new(HandlerFactory(Arc::new(DfSessionService::new(
44        session_context,
45    ))));
46
47    // Bind to the specified host and port
48    let server_addr = format!("{}:{}", opts.host, opts.port);
49    let listener = TcpListener::bind(&server_addr).await?;
50    println!("Listening on {}", server_addr);
51
52    // Accept incoming connections
53    loop {
54        match listener.accept().await {
55            Ok((socket, addr)) => {
56                let factory_ref = factory.clone();
57                println!("Accepted connection from {}", addr);
58
59                tokio::spawn(async move {
60                    if let Err(e) = process_socket(socket, None, factory_ref).await {
61                        eprintln!("Error processing socket: {}", e);
62                    }
63                });
64            }
65            Err(e) => {
66                eprintln!("Error accept socket: {}", e);
67            }
68        }
69    }
70}