Skip to main content

datapress_datafusion/
lib.rs

1//! `datapress-datafusion` — DataFusion backend for the DataPress HTTP server.
2
3pub mod store;
4
5#[cfg(feature = "pgwire")]
6pub mod pgwire;
7
8use std::sync::Arc;
9
10use crate::store::Store;
11use datapress_core::backend::Backend;
12use datapress_core::config::AppConfig;
13
14/// Build the dataset store, start the actix server, and run until the
15/// process receives SIGINT.
16pub async fn serve(cfg: AppConfig) -> std::io::Result<()> {
17    datapress_core::banner::print();
18    let store = Arc::new(Store::load(&cfg).await.expect("failed to load datasets"));
19
20    #[cfg(feature = "pgwire")]
21    let _pgwire = if cfg.server.pgwire.enabled {
22        let ctx = store.session_context().clone();
23        Some(pgwire::spawn_pgwire(ctx, cfg.server.pgwire.clone())?)
24    } else {
25        None
26    };
27    #[cfg(not(feature = "pgwire"))]
28    if cfg.server.pgwire.enabled {
29        log::warn!(
30            "server.pgwire.enabled = true but this binary was built without the \
31             `pgwire` feature; the PostgreSQL wire protocol server will not start"
32        );
33    }
34
35    let store: Arc<dyn Backend> = store;
36    datapress_core::server::serve(cfg, store, "DataFusion").await
37}
38
39/// Like [`serve`], but driven to a graceful stop by `shutdown` instead of
40/// OS signals. Used when DataPress is embedded in another runtime (the
41/// Python extension) so it doesn't install signal handlers that fight the
42/// host's.
43pub async fn serve_with_shutdown(
44    cfg: AppConfig,
45    shutdown: impl std::future::Future<Output = ()> + Send + 'static,
46) -> std::io::Result<()> {
47    datapress_core::banner::print();
48    let store = Arc::new(Store::load(&cfg).await.expect("failed to load datasets"));
49
50    #[cfg(feature = "pgwire")]
51    let pgwire_server = if cfg.server.pgwire.enabled {
52        let ctx = store.session_context().clone();
53        Some(pgwire::spawn_pgwire(ctx, cfg.server.pgwire.clone())?)
54    } else {
55        None
56    };
57    #[cfg(not(feature = "pgwire"))]
58    if cfg.server.pgwire.enabled {
59        log::warn!(
60            "server.pgwire.enabled = true but this binary was built without the \
61             `pgwire` feature; the PostgreSQL wire protocol server will not start"
62        );
63    }
64
65    let store: Arc<dyn Backend> = store;
66    let result =
67        datapress_core::server::serve_with_shutdown(cfg, store, "DataFusion", shutdown).await;
68
69    // The core server has returned (the `shutdown` future fired), so tear the
70    // pgwire listener down with it rather than leaving it orphaned. Dropping the
71    // handle signals its runtime to stop and joins the thread.
72    #[cfg(feature = "pgwire")]
73    drop(pgwire_server);
74
75    result
76}