Skip to main content

alopex_server/
server.rs

1use std::net::SocketAddr;
2use std::sync::{Arc, RwLock};
3use std::time::Instant;
4
5use alopex_core::kv::any::AnyKV;
6use alopex_core::kv::async_adapter::{AsyncKVStoreAdapter, AsyncKVTransactionAdapter};
7use alopex_core::kv::AsyncKVStore;
8use alopex_core::types::TxnMode;
9use alopex_sql::catalog::{Catalog, CatalogError, PersistentCatalog};
10use alopex_sql::storage::async_storage::AsyncTxnBridge;
11use alopex_sql::storage::erased::ErasedAsyncSqlTransaction;
12use tokio::sync::broadcast;
13
14use crate::audit::AuditLogger;
15use crate::auth::AuthMiddleware;
16use crate::config::ServerConfig;
17use crate::error::{Result, ServerError};
18use crate::metrics::Metrics;
19use crate::ops::backup::BackupCoordinator;
20use crate::ops::memory::MemoryControlPolicy;
21use crate::ops::recovery::{RecoveryCoordinator, RecoveryInfo};
22use crate::ops::restore::RestoreCoordinator;
23use crate::ops::state::{LifecycleStateManager, Mode};
24use crate::session::{SessionConfig, SessionManager, TransactionFactory};
25use crate::tls;
26
/// Top-level server handle.
///
/// Wraps the shared [`ServerState`] in an `Arc`; `Server::run` clones that
/// `Arc` once for each background task it spawns.
pub struct Server {
    /// State shared by the HTTP, admin, gRPC and session-cleanup tasks.
    pub state: Arc<ServerState>,
}
30
/// Everything the request handlers and background tasks share.
///
/// An instance is assembled once by `Server::new` and then only handed out
/// behind an `Arc`.
pub struct ServerState {
    /// Configuration the server was constructed with (validated in `Server::new`).
    pub config: ServerConfig,
    /// Key-value store opened by `RecoveryCoordinator::open_store`.
    pub store: Arc<AnyKV>,
    /// SQL catalog loaded (or freshly created) on top of `store`.
    pub catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
    /// Async adapter over `store`, created with `TxnMode::ReadWrite`.
    pub async_store: Arc<AsyncKVStoreAdapter<AnyKV>>,
    /// Session manager built from `config.session_ttl` and the transaction factory.
    pub session_manager: Arc<SessionManager>,
    /// Metrics handle; cloned into every memory-control policy.
    pub metrics: Metrics,
    /// Audit logger built from `config.audit_log_output`; flushed at the end of `Server::run`.
    pub audit: AuditLogger,
    /// Authentication middleware built from `config.auth_mode`.
    pub auth: AuthMiddleware,
    /// Instant captured when this state was assembled in `Server::new`.
    pub start_time: Instant,
    /// Lifecycle mode manager; starts in `Mode::Normal` and is then adjusted by
    /// `RecoveryCoordinator::apply_initial_mode`.
    pub lifecycle_state: Arc<LifecycleStateManager>,
    /// Recovery information returned when the store was opened.
    pub recovery_info: RecoveryInfo,
    /// Backup coordinator bound to the data directory and lifecycle state.
    pub backup_coordinator: BackupCoordinator,
    /// Restore coordinator bound to the data directory and lifecycle state.
    pub restore_coordinator: RestoreCoordinator,
}
46
47impl Server {
48    pub fn new(config: ServerConfig) -> Result<Self> {
49        config.validate()?;
50        let (store, recovery_info) = RecoveryCoordinator::open_store(&config.data_dir)?;
51        let lifecycle_state = Arc::new(LifecycleStateManager::new(Mode::Normal));
52        RecoveryCoordinator::apply_initial_mode(&lifecycle_state, &recovery_info);
53
54        let store = Arc::new(store);
55        let catalog = load_catalog(store.clone())?;
56        let async_store = Arc::new(AsyncKVStoreAdapter::from_arc(
57            store.clone(),
58            TxnMode::ReadWrite,
59        ));
60        let metrics = Metrics::new()?;
61        let audit = AuditLogger::new(config.audit_log_output.clone())?;
62        let auth = AuthMiddleware::new(config.auth_mode.clone());
63
64        let txn_factory = build_txn_factory(async_store.clone(), catalog.clone(), metrics.clone());
65        let session_manager = Arc::new(SessionManager::new(
66            SessionConfig {
67                ttl: config.session_ttl,
68            },
69            txn_factory,
70        ));
71        let data_dir = config.data_dir.clone();
72        let checkpoint = {
73            let store = store.clone();
74            Arc::new(move || match store.as_ref() {
75                AnyKV::Lsm(kv) => {
76                    kv.checkpoint()?;
77                    Ok(())
78                }
79                _ => Err(ServerError::BadRequest(
80                    "checkpoint unsupported for current storage engine".to_string(),
81                )),
82            })
83        };
84        let backup_coordinator =
85            BackupCoordinator::new(data_dir.clone(), lifecycle_state.clone(), checkpoint);
86        let restore_coordinator = RestoreCoordinator::new(data_dir, lifecycle_state.clone());
87
88        Ok(Self {
89            state: Arc::new(ServerState {
90                config,
91                store,
92                catalog,
93                async_store,
94                session_manager,
95                metrics,
96                audit,
97                auth,
98                start_time: Instant::now(),
99                lifecycle_state,
100                recovery_info,
101                backup_coordinator,
102                restore_coordinator,
103            }),
104        })
105    }
106
107    pub async fn run(self) -> Result<()> {
108        if self.state.config.tracing_enabled {
109            init_tracing();
110        }
111
112        let (shutdown_tx, _) = broadcast::channel(2);
113        let http_state = self.state.clone();
114        let admin_state = self.state.clone();
115        let grpc_state = self.state.clone();
116        let cleanup_state = self.state.clone();
117        let http_shutdown = shutdown_tx.subscribe();
118        let admin_shutdown = shutdown_tx.subscribe();
119        let grpc_shutdown = shutdown_tx.subscribe();
120        let cleanup_shutdown = shutdown_tx.subscribe();
121
122        let tls_config = if let Some(tls) = &self.state.config.tls {
123            let config = tls::build_rustls_config(tls)?;
124            Some(axum_server::tls_rustls::RustlsConfig::from_config(config))
125        } else {
126            None
127        };
128
129        let http_task = tokio::spawn(run_http(http_state, tls_config.clone(), http_shutdown));
130        let admin_task = tokio::spawn(run_admin(admin_state, tls_config, admin_shutdown));
131        let grpc_task = tokio::spawn(run_grpc(grpc_state, grpc_shutdown));
132        let cleanup_task = tokio::spawn(run_cleanup(cleanup_state, cleanup_shutdown));
133
134        wait_for_shutdown(shutdown_tx.clone()).await;
135        let _ = shutdown_tx.send(());
136
137        http_task
138            .await
139            .map_err(|err| ServerError::Internal(err.to_string()))??;
140        admin_task
141            .await
142            .map_err(|err| ServerError::Internal(err.to_string()))??;
143        grpc_task
144            .await
145            .map_err(|err| ServerError::Internal(err.to_string()))??;
146        cleanup_task
147            .await
148            .map_err(|err| ServerError::Internal(err.to_string()))??;
149
150        self.state.audit.flush()?;
151        Ok(())
152    }
153}
154
155impl ServerState {
156    pub async fn begin_sql_txn(
157        &self,
158    ) -> Result<AsyncTxnBridge<'static, AsyncKVTransactionAdapter>> {
159        let txn = self.async_store.begin_async().await?;
160        let mut bridge =
161            AsyncTxnBridge::with_catalog(txn, TxnMode::ReadWrite, self.catalog.clone());
162        let policy = MemoryControlPolicy::from_env_with_metrics(self.metrics.clone()).sql_policy();
163        bridge.set_memory_policy(policy);
164        Ok(bridge)
165    }
166}
167
168fn build_txn_factory(
169    store: Arc<AsyncKVStoreAdapter<AnyKV>>,
170    catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
171    metrics: Metrics,
172) -> TransactionFactory {
173    Arc::new(move || {
174        let store = store.clone();
175        let catalog = catalog.clone();
176        let metrics = metrics.clone();
177        Box::pin(async move {
178            let txn = store.begin_async().await?;
179            let mut bridge: AsyncTxnBridge<'static, AsyncKVTransactionAdapter> =
180                AsyncTxnBridge::with_catalog(txn, TxnMode::ReadWrite, catalog);
181            let policy = MemoryControlPolicy::from_env_with_metrics(metrics).sql_policy();
182            bridge.set_memory_policy(policy);
183            Ok(Box::new(bridge) as Box<dyn ErasedAsyncSqlTransaction>)
184        })
185    })
186}
187
188fn load_catalog(store: Arc<AnyKV>) -> Result<Arc<RwLock<dyn Catalog + Send + Sync>>> {
189    let catalog = match PersistentCatalog::load(store.clone()) {
190        Ok(catalog) => catalog,
191        Err(CatalogError::Kv(alopex_core::Error::NotFound)) => PersistentCatalog::new(store),
192        Err(err) => return Err(ServerError::Catalog(err)),
193    };
194    let catalog: Arc<RwLock<dyn Catalog + Send + Sync>> = Arc::new(RwLock::new(catalog));
195    Ok(catalog)
196}
197
198async fn run_http(
199    state: Arc<ServerState>,
200    tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
201    mut shutdown: broadcast::Receiver<()>,
202) -> Result<()> {
203    let app = crate::http::router(state.clone());
204    let addr = state.config.http_bind;
205
206    if let Some(tls) = tls_config {
207        let handle = axum_server::Handle::new();
208        let shutdown_handle = handle.clone();
209        tokio::spawn(async move {
210            let _ = shutdown.recv().await;
211            shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
212        });
213        axum_server::bind_rustls(addr, tls)
214            .handle(handle)
215            .serve(app.into_make_service())
216            .await
217            .map_err(|err| ServerError::Internal(err.to_string()))?;
218    } else {
219        let shutdown_signal = async move {
220            let _ = shutdown.recv().await;
221        };
222        axum::Server::bind(&addr)
223            .serve(app.into_make_service())
224            .with_graceful_shutdown(shutdown_signal)
225            .await
226            .map_err(|err| ServerError::Internal(err.to_string()))?;
227    }
228    Ok(())
229}
230
231async fn run_admin(
232    state: Arc<ServerState>,
233    tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
234    mut shutdown: broadcast::Receiver<()>,
235) -> Result<()> {
236    let app = crate::http::admin_router(state.clone());
237    let addr = state.config.admin_bind;
238
239    if let Some(tls) = tls_config {
240        let handle = axum_server::Handle::new();
241        let shutdown_handle = handle.clone();
242        tokio::spawn(async move {
243            let _ = shutdown.recv().await;
244            shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
245        });
246        axum_server::bind_rustls(addr, tls)
247            .handle(handle)
248            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
249            .await
250            .map_err(|err| ServerError::Internal(err.to_string()))?;
251    } else {
252        let shutdown_signal = async move {
253            let _ = shutdown.recv().await;
254        };
255        axum::Server::bind(&addr)
256            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
257            .with_graceful_shutdown(shutdown_signal)
258            .await
259            .map_err(|err| ServerError::Internal(err.to_string()))?;
260    }
261    Ok(())
262}
263
264async fn run_grpc(state: Arc<ServerState>, shutdown: broadcast::Receiver<()>) -> Result<()> {
265    let addr = state.config.grpc_bind;
266    crate::grpc::serve(state, addr, shutdown).await
267}
268
269async fn run_cleanup(state: Arc<ServerState>, mut shutdown: broadcast::Receiver<()>) -> Result<()> {
270    let mut interval = tokio::time::interval(state.config.session_ttl);
271    loop {
272        tokio::select! {
273            _ = interval.tick() => {
274                state.session_manager.cleanup_expired();
275            }
276            _ = shutdown.recv() => break,
277        }
278    }
279    Ok(())
280}
281
282async fn wait_for_shutdown(signal: broadcast::Sender<()>) {
283    #[cfg(unix)]
284    let mut term = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
285        Ok(signal) => signal,
286        Err(_) => {
287            let _ = tokio::signal::ctrl_c().await;
288            let _ = signal.send(());
289            return;
290        }
291    };
292
293    #[cfg(unix)]
294    tokio::select! {
295        _ = tokio::signal::ctrl_c() => {}
296        _ = term.recv() => {}
297    }
298
299    #[cfg(not(unix))]
300    {
301        let _ = tokio::signal::ctrl_c().await;
302    }
303
304    let _ = signal.send(());
305}
306
307fn init_tracing() {
308    let _ = tracing_subscriber::fmt()
309        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
310        .try_init();
311}