alopex_server/
server.rs

1use std::net::SocketAddr;
2use std::sync::{Arc, RwLock};
3
4use alopex_core::kv::any::AnyKV;
5use alopex_core::kv::async_adapter::{AsyncKVStoreAdapter, AsyncKVTransactionAdapter};
6use alopex_core::kv::storage::{StorageFactory, StorageMode};
7use alopex_core::kv::AsyncKVStore;
8use alopex_core::types::TxnMode;
9use alopex_sql::catalog::{Catalog, CatalogError, PersistentCatalog};
10use alopex_sql::storage::async_storage::AsyncTxnBridge;
11use alopex_sql::storage::erased::ErasedAsyncSqlTransaction;
12use tokio::sync::broadcast;
13
14use crate::audit::AuditLogger;
15use crate::auth::AuthMiddleware;
16use crate::config::ServerConfig;
17use crate::error::{Result, ServerError};
18use crate::metrics::Metrics;
19use crate::session::{SessionConfig, SessionManager, TransactionFactory};
20use crate::tls;
21
22pub struct Server {
23    pub state: Arc<ServerState>,
24}
25
26pub struct ServerState {
27    pub config: ServerConfig,
28    pub store: Arc<AnyKV>,
29    pub catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
30    pub async_store: Arc<AsyncKVStoreAdapter<AnyKV>>,
31    pub session_manager: Arc<SessionManager>,
32    pub metrics: Metrics,
33    pub audit: AuditLogger,
34    pub auth: AuthMiddleware,
35}
36
37impl Server {
38    pub fn new(config: ServerConfig) -> Result<Self> {
39        config.validate()?;
40        let store = StorageFactory::create(StorageMode::Disk {
41            path: config.data_dir.clone(),
42            config: None,
43        })?;
44        let store = Arc::new(store);
45        let catalog = load_catalog(store.clone())?;
46        let async_store = Arc::new(AsyncKVStoreAdapter::from_arc(
47            store.clone(),
48            TxnMode::ReadWrite,
49        ));
50        let metrics = Metrics::new()?;
51        let audit = AuditLogger::new(config.audit_log_output.clone())?;
52        let auth = AuthMiddleware::new(config.auth_mode.clone());
53
54        let txn_factory = build_txn_factory(async_store.clone(), catalog.clone());
55        let session_manager = Arc::new(SessionManager::new(
56            SessionConfig {
57                ttl: config.session_ttl,
58            },
59            txn_factory,
60        ));
61
62        Ok(Self {
63            state: Arc::new(ServerState {
64                config,
65                store,
66                catalog,
67                async_store,
68                session_manager,
69                metrics,
70                audit,
71                auth,
72            }),
73        })
74    }
75
76    pub async fn run(self) -> Result<()> {
77        if self.state.config.tracing_enabled {
78            init_tracing();
79        }
80
81        let (shutdown_tx, _) = broadcast::channel(2);
82        let http_state = self.state.clone();
83        let admin_state = self.state.clone();
84        let grpc_state = self.state.clone();
85        let cleanup_state = self.state.clone();
86        let http_shutdown = shutdown_tx.subscribe();
87        let admin_shutdown = shutdown_tx.subscribe();
88        let grpc_shutdown = shutdown_tx.subscribe();
89        let cleanup_shutdown = shutdown_tx.subscribe();
90
91        let tls_config = if let Some(tls) = &self.state.config.tls {
92            let config = tls::build_rustls_config(tls)?;
93            Some(axum_server::tls_rustls::RustlsConfig::from_config(config))
94        } else {
95            None
96        };
97
98        let http_task = tokio::spawn(run_http(http_state, tls_config.clone(), http_shutdown));
99        let admin_task = tokio::spawn(run_admin(admin_state, tls_config, admin_shutdown));
100        let grpc_task = tokio::spawn(run_grpc(grpc_state, grpc_shutdown));
101        let cleanup_task = tokio::spawn(run_cleanup(cleanup_state, cleanup_shutdown));
102
103        wait_for_shutdown(shutdown_tx.clone()).await;
104        let _ = shutdown_tx.send(());
105
106        http_task
107            .await
108            .map_err(|err| ServerError::Internal(err.to_string()))??;
109        admin_task
110            .await
111            .map_err(|err| ServerError::Internal(err.to_string()))??;
112        grpc_task
113            .await
114            .map_err(|err| ServerError::Internal(err.to_string()))??;
115        cleanup_task
116            .await
117            .map_err(|err| ServerError::Internal(err.to_string()))??;
118
119        self.state.audit.flush()?;
120        Ok(())
121    }
122}
123
124impl ServerState {
125    pub async fn begin_sql_txn(
126        &self,
127    ) -> Result<AsyncTxnBridge<'static, AsyncKVTransactionAdapter>> {
128        let txn = self.async_store.begin_async().await?;
129        Ok(AsyncTxnBridge::with_catalog(
130            txn,
131            TxnMode::ReadWrite,
132            self.catalog.clone(),
133        ))
134    }
135}
136
137fn build_txn_factory(
138    store: Arc<AsyncKVStoreAdapter<AnyKV>>,
139    catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
140) -> TransactionFactory {
141    Arc::new(move || {
142        let store = store.clone();
143        let catalog = catalog.clone();
144        Box::pin(async move {
145            let txn = store.begin_async().await?;
146            let bridge: AsyncTxnBridge<'static, AsyncKVTransactionAdapter> =
147                AsyncTxnBridge::with_catalog(txn, TxnMode::ReadWrite, catalog);
148            Ok(Box::new(bridge) as Box<dyn ErasedAsyncSqlTransaction>)
149        })
150    })
151}
152
153fn load_catalog(store: Arc<AnyKV>) -> Result<Arc<RwLock<dyn Catalog + Send + Sync>>> {
154    let catalog = match PersistentCatalog::load(store.clone()) {
155        Ok(catalog) => catalog,
156        Err(CatalogError::Kv(alopex_core::Error::NotFound)) => PersistentCatalog::new(store),
157        Err(err) => return Err(ServerError::Catalog(err)),
158    };
159    let catalog: Arc<RwLock<dyn Catalog + Send + Sync>> = Arc::new(RwLock::new(catalog));
160    Ok(catalog)
161}
162
163async fn run_http(
164    state: Arc<ServerState>,
165    tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
166    mut shutdown: broadcast::Receiver<()>,
167) -> Result<()> {
168    let app = crate::http::router(state.clone());
169    let addr = state.config.http_bind;
170
171    if let Some(tls) = tls_config {
172        let handle = axum_server::Handle::new();
173        let shutdown_handle = handle.clone();
174        tokio::spawn(async move {
175            let _ = shutdown.recv().await;
176            shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
177        });
178        axum_server::bind_rustls(addr, tls)
179            .handle(handle)
180            .serve(app.into_make_service())
181            .await
182            .map_err(|err| ServerError::Internal(err.to_string()))?;
183    } else {
184        let shutdown_signal = async move {
185            let _ = shutdown.recv().await;
186        };
187        axum::Server::bind(&addr)
188            .serve(app.into_make_service())
189            .with_graceful_shutdown(shutdown_signal)
190            .await
191            .map_err(|err| ServerError::Internal(err.to_string()))?;
192    }
193    Ok(())
194}
195
196async fn run_admin(
197    state: Arc<ServerState>,
198    tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
199    mut shutdown: broadcast::Receiver<()>,
200) -> Result<()> {
201    let app = crate::http::admin_router(state.clone());
202    let addr = state.config.admin_bind;
203
204    if let Some(tls) = tls_config {
205        let handle = axum_server::Handle::new();
206        let shutdown_handle = handle.clone();
207        tokio::spawn(async move {
208            let _ = shutdown.recv().await;
209            shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
210        });
211        axum_server::bind_rustls(addr, tls)
212            .handle(handle)
213            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
214            .await
215            .map_err(|err| ServerError::Internal(err.to_string()))?;
216    } else {
217        let shutdown_signal = async move {
218            let _ = shutdown.recv().await;
219        };
220        axum::Server::bind(&addr)
221            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
222            .with_graceful_shutdown(shutdown_signal)
223            .await
224            .map_err(|err| ServerError::Internal(err.to_string()))?;
225    }
226    Ok(())
227}
228
229async fn run_grpc(state: Arc<ServerState>, shutdown: broadcast::Receiver<()>) -> Result<()> {
230    let addr = state.config.grpc_bind;
231    crate::grpc::serve(state, addr, shutdown).await
232}
233
234async fn run_cleanup(state: Arc<ServerState>, mut shutdown: broadcast::Receiver<()>) -> Result<()> {
235    let mut interval = tokio::time::interval(state.config.session_ttl);
236    loop {
237        tokio::select! {
238            _ = interval.tick() => {
239                state.session_manager.cleanup_expired();
240            }
241            _ = shutdown.recv() => break,
242        }
243    }
244    Ok(())
245}
246
247async fn wait_for_shutdown(signal: broadcast::Sender<()>) {
248    #[cfg(unix)]
249    let mut term = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
250        Ok(signal) => signal,
251        Err(_) => {
252            let _ = tokio::signal::ctrl_c().await;
253            let _ = signal.send(());
254            return;
255        }
256    };
257
258    #[cfg(unix)]
259    tokio::select! {
260        _ = tokio::signal::ctrl_c() => {}
261        _ = term.recv() => {}
262    }
263
264    #[cfg(not(unix))]
265    {
266        let _ = tokio::signal::ctrl_c().await;
267    }
268
269    let _ = signal.send(());
270}
271
272fn init_tracing() {
273    let _ = tracing_subscriber::fmt()
274        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
275        .try_init();
276}