datafusion_postgres/
lib.rs1mod datatypes;
2mod handlers;
3pub mod pg_catalog;
4
5use std::sync::Arc;
6
7use datafusion::prelude::SessionContext;
8use getset::{Getters, Setters, WithSetters};
9use pgwire::tokio::process_socket;
10use tokio::net::TcpListener;
11
12use handlers::HandlerFactory;
13pub use handlers::{DfSessionService, Parser};
14
15#[derive(Getters, Setters, WithSetters)]
16#[getset(get = "pub", set = "pub", set_with = "pub")]
17pub struct ServerOptions {
18 host: String,
19 port: u16,
20}
21
22impl ServerOptions {
23 pub fn new() -> ServerOptions {
24 ServerOptions::default()
25 }
26}
27
28impl Default for ServerOptions {
29 fn default() -> Self {
30 ServerOptions {
31 host: "127.0.0.1".to_string(),
32 port: 5432,
33 }
34 }
35}
36
37pub async fn serve(
39 session_context: Arc<SessionContext>,
40 opts: &ServerOptions,
41) -> Result<(), std::io::Error> {
42 let factory = Arc::new(HandlerFactory(Arc::new(DfSessionService::new(
44 session_context,
45 ))));
46
47 let server_addr = format!("{}:{}", opts.host, opts.port);
49 let listener = TcpListener::bind(&server_addr).await?;
50 println!("Listening on {}", server_addr);
51
52 loop {
54 match listener.accept().await {
55 Ok((socket, addr)) => {
56 let factory_ref = factory.clone();
57 println!("Accepted connection from {}", addr);
58
59 tokio::spawn(async move {
60 if let Err(e) = process_socket(socket, None, factory_ref).await {
61 eprintln!("Error processing socket: {}", e);
62 }
63 });
64 }
65 Err(e) => {
66 eprintln!("Error accept socket: {}", e);
67 }
68 }
69 }
70}