alopex_server/
server.rs

1use std::net::SocketAddr;
2use std::sync::{Arc, RwLock};
3use std::time::Instant;
4
5use alopex_core::kv::any::AnyKV;
6use alopex_core::kv::async_adapter::{AsyncKVStoreAdapter, AsyncKVTransactionAdapter};
7use alopex_core::kv::storage::{StorageFactory, StorageMode};
8use alopex_core::kv::AsyncKVStore;
9use alopex_core::types::TxnMode;
10use alopex_sql::catalog::{Catalog, CatalogError, PersistentCatalog};
11use alopex_sql::storage::async_storage::AsyncTxnBridge;
12use alopex_sql::storage::erased::ErasedAsyncSqlTransaction;
13use tokio::sync::broadcast;
14
15use crate::audit::AuditLogger;
16use crate::auth::AuthMiddleware;
17use crate::config::ServerConfig;
18use crate::error::{Result, ServerError};
19use crate::metrics::Metrics;
20use crate::session::{SessionConfig, SessionManager, TransactionFactory};
21use crate::tls;
22
/// Top-level server handle.
///
/// Wraps the reference-counted [`ServerState`] that `run` clones into each
/// spawned task.
pub struct Server {
    /// State shared by every task the server spawns.
    pub state: Arc<ServerState>,
}
26
/// State shared between the HTTP, admin, gRPC and cleanup tasks.
///
/// Built once by `Server::new` and handed out behind an `Arc`.
pub struct ServerState {
    /// Server configuration; `Server::new` validates it before use.
    pub config: ServerConfig,
    /// Disk-backed key-value store opened at `config.data_dir`.
    pub store: Arc<AnyKV>,
    /// SQL catalog loaded from (or newly created on) `store`.
    pub catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
    /// Async adapter over `store`, created in read-write mode.
    pub async_store: Arc<AsyncKVStoreAdapter<AnyKV>>,
    /// Session manager configured with `config.session_ttl`.
    pub session_manager: Arc<SessionManager>,
    /// Metrics handle.
    pub metrics: Metrics,
    /// Audit logger writing to `config.audit_log_output`; flushed when `run` finishes.
    pub audit: AuditLogger,
    /// Authentication middleware built from `config.auth_mode`.
    pub auth: AuthMiddleware,
    /// Instant captured when this state was constructed.
    pub start_time: Instant,
}
38
39impl Server {
40    pub fn new(config: ServerConfig) -> Result<Self> {
41        config.validate()?;
42        let store = StorageFactory::create(StorageMode::Disk {
43            path: config.data_dir.clone(),
44            config: None,
45        })?;
46        let store = Arc::new(store);
47        let catalog = load_catalog(store.clone())?;
48        let async_store = Arc::new(AsyncKVStoreAdapter::from_arc(
49            store.clone(),
50            TxnMode::ReadWrite,
51        ));
52        let metrics = Metrics::new()?;
53        let audit = AuditLogger::new(config.audit_log_output.clone())?;
54        let auth = AuthMiddleware::new(config.auth_mode.clone());
55
56        let txn_factory = build_txn_factory(async_store.clone(), catalog.clone());
57        let session_manager = Arc::new(SessionManager::new(
58            SessionConfig {
59                ttl: config.session_ttl,
60            },
61            txn_factory,
62        ));
63
64        Ok(Self {
65            state: Arc::new(ServerState {
66                config,
67                store,
68                catalog,
69                async_store,
70                session_manager,
71                metrics,
72                audit,
73                auth,
74                start_time: Instant::now(),
75            }),
76        })
77    }
78
79    pub async fn run(self) -> Result<()> {
80        if self.state.config.tracing_enabled {
81            init_tracing();
82        }
83
84        let (shutdown_tx, _) = broadcast::channel(2);
85        let http_state = self.state.clone();
86        let admin_state = self.state.clone();
87        let grpc_state = self.state.clone();
88        let cleanup_state = self.state.clone();
89        let http_shutdown = shutdown_tx.subscribe();
90        let admin_shutdown = shutdown_tx.subscribe();
91        let grpc_shutdown = shutdown_tx.subscribe();
92        let cleanup_shutdown = shutdown_tx.subscribe();
93
94        let tls_config = if let Some(tls) = &self.state.config.tls {
95            let config = tls::build_rustls_config(tls)?;
96            Some(axum_server::tls_rustls::RustlsConfig::from_config(config))
97        } else {
98            None
99        };
100
101        let http_task = tokio::spawn(run_http(http_state, tls_config.clone(), http_shutdown));
102        let admin_task = tokio::spawn(run_admin(admin_state, tls_config, admin_shutdown));
103        let grpc_task = tokio::spawn(run_grpc(grpc_state, grpc_shutdown));
104        let cleanup_task = tokio::spawn(run_cleanup(cleanup_state, cleanup_shutdown));
105
106        wait_for_shutdown(shutdown_tx.clone()).await;
107        let _ = shutdown_tx.send(());
108
109        http_task
110            .await
111            .map_err(|err| ServerError::Internal(err.to_string()))??;
112        admin_task
113            .await
114            .map_err(|err| ServerError::Internal(err.to_string()))??;
115        grpc_task
116            .await
117            .map_err(|err| ServerError::Internal(err.to_string()))??;
118        cleanup_task
119            .await
120            .map_err(|err| ServerError::Internal(err.to_string()))??;
121
122        self.state.audit.flush()?;
123        Ok(())
124    }
125}
126
127impl ServerState {
128    pub async fn begin_sql_txn(
129        &self,
130    ) -> Result<AsyncTxnBridge<'static, AsyncKVTransactionAdapter>> {
131        let txn = self.async_store.begin_async().await?;
132        Ok(AsyncTxnBridge::with_catalog(
133            txn,
134            TxnMode::ReadWrite,
135            self.catalog.clone(),
136        ))
137    }
138}
139
140fn build_txn_factory(
141    store: Arc<AsyncKVStoreAdapter<AnyKV>>,
142    catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
143) -> TransactionFactory {
144    Arc::new(move || {
145        let store = store.clone();
146        let catalog = catalog.clone();
147        Box::pin(async move {
148            let txn = store.begin_async().await?;
149            let bridge: AsyncTxnBridge<'static, AsyncKVTransactionAdapter> =
150                AsyncTxnBridge::with_catalog(txn, TxnMode::ReadWrite, catalog);
151            Ok(Box::new(bridge) as Box<dyn ErasedAsyncSqlTransaction>)
152        })
153    })
154}
155
156fn load_catalog(store: Arc<AnyKV>) -> Result<Arc<RwLock<dyn Catalog + Send + Sync>>> {
157    let catalog = match PersistentCatalog::load(store.clone()) {
158        Ok(catalog) => catalog,
159        Err(CatalogError::Kv(alopex_core::Error::NotFound)) => PersistentCatalog::new(store),
160        Err(err) => return Err(ServerError::Catalog(err)),
161    };
162    let catalog: Arc<RwLock<dyn Catalog + Send + Sync>> = Arc::new(RwLock::new(catalog));
163    Ok(catalog)
164}
165
166async fn run_http(
167    state: Arc<ServerState>,
168    tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
169    mut shutdown: broadcast::Receiver<()>,
170) -> Result<()> {
171    let app = crate::http::router(state.clone());
172    let addr = state.config.http_bind;
173
174    if let Some(tls) = tls_config {
175        let handle = axum_server::Handle::new();
176        let shutdown_handle = handle.clone();
177        tokio::spawn(async move {
178            let _ = shutdown.recv().await;
179            shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
180        });
181        axum_server::bind_rustls(addr, tls)
182            .handle(handle)
183            .serve(app.into_make_service())
184            .await
185            .map_err(|err| ServerError::Internal(err.to_string()))?;
186    } else {
187        let shutdown_signal = async move {
188            let _ = shutdown.recv().await;
189        };
190        axum::Server::bind(&addr)
191            .serve(app.into_make_service())
192            .with_graceful_shutdown(shutdown_signal)
193            .await
194            .map_err(|err| ServerError::Internal(err.to_string()))?;
195    }
196    Ok(())
197}
198
199async fn run_admin(
200    state: Arc<ServerState>,
201    tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
202    mut shutdown: broadcast::Receiver<()>,
203) -> Result<()> {
204    let app = crate::http::admin_router(state.clone());
205    let addr = state.config.admin_bind;
206
207    if let Some(tls) = tls_config {
208        let handle = axum_server::Handle::new();
209        let shutdown_handle = handle.clone();
210        tokio::spawn(async move {
211            let _ = shutdown.recv().await;
212            shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
213        });
214        axum_server::bind_rustls(addr, tls)
215            .handle(handle)
216            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
217            .await
218            .map_err(|err| ServerError::Internal(err.to_string()))?;
219    } else {
220        let shutdown_signal = async move {
221            let _ = shutdown.recv().await;
222        };
223        axum::Server::bind(&addr)
224            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
225            .with_graceful_shutdown(shutdown_signal)
226            .await
227            .map_err(|err| ServerError::Internal(err.to_string()))?;
228    }
229    Ok(())
230}
231
232async fn run_grpc(state: Arc<ServerState>, shutdown: broadcast::Receiver<()>) -> Result<()> {
233    let addr = state.config.grpc_bind;
234    crate::grpc::serve(state, addr, shutdown).await
235}
236
237async fn run_cleanup(state: Arc<ServerState>, mut shutdown: broadcast::Receiver<()>) -> Result<()> {
238    let mut interval = tokio::time::interval(state.config.session_ttl);
239    loop {
240        tokio::select! {
241            _ = interval.tick() => {
242                state.session_manager.cleanup_expired();
243            }
244            _ = shutdown.recv() => break,
245        }
246    }
247    Ok(())
248}
249
250async fn wait_for_shutdown(signal: broadcast::Sender<()>) {
251    #[cfg(unix)]
252    let mut term = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
253        Ok(signal) => signal,
254        Err(_) => {
255            let _ = tokio::signal::ctrl_c().await;
256            let _ = signal.send(());
257            return;
258        }
259    };
260
261    #[cfg(unix)]
262    tokio::select! {
263        _ = tokio::signal::ctrl_c() => {}
264        _ = term.recv() => {}
265    }
266
267    #[cfg(not(unix))]
268    {
269        let _ = tokio::signal::ctrl_c().await;
270    }
271
272    let _ = signal.send(());
273}
274
275fn init_tracing() {
276    let _ = tracing_subscriber::fmt()
277        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
278        .try_init();
279}