1use std::net::SocketAddr;
2use std::sync::{Arc, RwLock};
3
4use alopex_core::kv::any::AnyKV;
5use alopex_core::kv::async_adapter::{AsyncKVStoreAdapter, AsyncKVTransactionAdapter};
6use alopex_core::kv::storage::{StorageFactory, StorageMode};
7use alopex_core::kv::AsyncKVStore;
8use alopex_core::types::TxnMode;
9use alopex_sql::catalog::{Catalog, CatalogError, PersistentCatalog};
10use alopex_sql::storage::async_storage::AsyncTxnBridge;
11use alopex_sql::storage::erased::ErasedAsyncSqlTransaction;
12use tokio::sync::broadcast;
13
14use crate::audit::AuditLogger;
15use crate::auth::AuthMiddleware;
16use crate::config::ServerConfig;
17use crate::error::{Result, ServerError};
18use crate::metrics::Metrics;
19use crate::session::{SessionConfig, SessionManager, TransactionFactory};
20use crate::tls;
21
/// Top-level server handle.
///
/// Constructed by `Server::new` and consumed by `Server::run`, which hands a
/// clone of `state` to every task it spawns.
pub struct Server {
    /// Shared, reference-counted state used by all server tasks.
    pub state: Arc<ServerState>,
}
25
/// Everything the server tasks share: configuration, storage, catalog,
/// sessions, and the metrics/audit/auth components.
pub struct ServerState {
    /// Configuration the server was built from (validated in `Server::new`).
    pub config: ServerConfig,
    /// Key-value store opened in disk mode at `config.data_dir`.
    pub store: Arc<AnyKV>,
    /// SQL catalog shared behind a read/write lock.
    pub catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
    /// Async adapter over `store`, created with `TxnMode::ReadWrite`.
    pub async_store: Arc<AsyncKVStoreAdapter<AnyKV>>,
    /// Session manager configured with `config.session_ttl` and a transaction factory.
    pub session_manager: Arc<SessionManager>,
    /// Metrics component created with `Metrics::new`.
    pub metrics: Metrics,
    /// Audit logger writing to `config.audit_log_output`; flushed at the end of `Server::run`.
    pub audit: AuditLogger,
    /// Authentication middleware built from `config.auth_mode`.
    pub auth: AuthMiddleware,
}
36
37impl Server {
38 pub fn new(config: ServerConfig) -> Result<Self> {
39 config.validate()?;
40 let store = StorageFactory::create(StorageMode::Disk {
41 path: config.data_dir.clone(),
42 config: None,
43 })?;
44 let store = Arc::new(store);
45 let catalog = load_catalog(store.clone())?;
46 let async_store = Arc::new(AsyncKVStoreAdapter::from_arc(
47 store.clone(),
48 TxnMode::ReadWrite,
49 ));
50 let metrics = Metrics::new()?;
51 let audit = AuditLogger::new(config.audit_log_output.clone())?;
52 let auth = AuthMiddleware::new(config.auth_mode.clone());
53
54 let txn_factory = build_txn_factory(async_store.clone(), catalog.clone());
55 let session_manager = Arc::new(SessionManager::new(
56 SessionConfig {
57 ttl: config.session_ttl,
58 },
59 txn_factory,
60 ));
61
62 Ok(Self {
63 state: Arc::new(ServerState {
64 config,
65 store,
66 catalog,
67 async_store,
68 session_manager,
69 metrics,
70 audit,
71 auth,
72 }),
73 })
74 }
75
76 pub async fn run(self) -> Result<()> {
77 if self.state.config.tracing_enabled {
78 init_tracing();
79 }
80
81 let (shutdown_tx, _) = broadcast::channel(2);
82 let http_state = self.state.clone();
83 let admin_state = self.state.clone();
84 let grpc_state = self.state.clone();
85 let cleanup_state = self.state.clone();
86 let http_shutdown = shutdown_tx.subscribe();
87 let admin_shutdown = shutdown_tx.subscribe();
88 let grpc_shutdown = shutdown_tx.subscribe();
89 let cleanup_shutdown = shutdown_tx.subscribe();
90
91 let tls_config = if let Some(tls) = &self.state.config.tls {
92 let config = tls::build_rustls_config(tls)?;
93 Some(axum_server::tls_rustls::RustlsConfig::from_config(config))
94 } else {
95 None
96 };
97
98 let http_task = tokio::spawn(run_http(http_state, tls_config.clone(), http_shutdown));
99 let admin_task = tokio::spawn(run_admin(admin_state, tls_config, admin_shutdown));
100 let grpc_task = tokio::spawn(run_grpc(grpc_state, grpc_shutdown));
101 let cleanup_task = tokio::spawn(run_cleanup(cleanup_state, cleanup_shutdown));
102
103 wait_for_shutdown(shutdown_tx.clone()).await;
104 let _ = shutdown_tx.send(());
105
106 http_task
107 .await
108 .map_err(|err| ServerError::Internal(err.to_string()))??;
109 admin_task
110 .await
111 .map_err(|err| ServerError::Internal(err.to_string()))??;
112 grpc_task
113 .await
114 .map_err(|err| ServerError::Internal(err.to_string()))??;
115 cleanup_task
116 .await
117 .map_err(|err| ServerError::Internal(err.to_string()))??;
118
119 self.state.audit.flush()?;
120 Ok(())
121 }
122}
123
124impl ServerState {
125 pub async fn begin_sql_txn(
126 &self,
127 ) -> Result<AsyncTxnBridge<'static, AsyncKVTransactionAdapter>> {
128 let txn = self.async_store.begin_async().await?;
129 Ok(AsyncTxnBridge::with_catalog(
130 txn,
131 TxnMode::ReadWrite,
132 self.catalog.clone(),
133 ))
134 }
135}
136
137fn build_txn_factory(
138 store: Arc<AsyncKVStoreAdapter<AnyKV>>,
139 catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
140) -> TransactionFactory {
141 Arc::new(move || {
142 let store = store.clone();
143 let catalog = catalog.clone();
144 Box::pin(async move {
145 let txn = store.begin_async().await?;
146 let bridge: AsyncTxnBridge<'static, AsyncKVTransactionAdapter> =
147 AsyncTxnBridge::with_catalog(txn, TxnMode::ReadWrite, catalog);
148 Ok(Box::new(bridge) as Box<dyn ErasedAsyncSqlTransaction>)
149 })
150 })
151}
152
153fn load_catalog(store: Arc<AnyKV>) -> Result<Arc<RwLock<dyn Catalog + Send + Sync>>> {
154 let catalog = match PersistentCatalog::load(store.clone()) {
155 Ok(catalog) => catalog,
156 Err(CatalogError::Kv(alopex_core::Error::NotFound)) => PersistentCatalog::new(store),
157 Err(err) => return Err(ServerError::Catalog(err)),
158 };
159 let catalog: Arc<RwLock<dyn Catalog + Send + Sync>> = Arc::new(RwLock::new(catalog));
160 Ok(catalog)
161}
162
163async fn run_http(
164 state: Arc<ServerState>,
165 tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
166 mut shutdown: broadcast::Receiver<()>,
167) -> Result<()> {
168 let app = crate::http::router(state.clone());
169 let addr = state.config.http_bind;
170
171 if let Some(tls) = tls_config {
172 let handle = axum_server::Handle::new();
173 let shutdown_handle = handle.clone();
174 tokio::spawn(async move {
175 let _ = shutdown.recv().await;
176 shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
177 });
178 axum_server::bind_rustls(addr, tls)
179 .handle(handle)
180 .serve(app.into_make_service())
181 .await
182 .map_err(|err| ServerError::Internal(err.to_string()))?;
183 } else {
184 let shutdown_signal = async move {
185 let _ = shutdown.recv().await;
186 };
187 axum::Server::bind(&addr)
188 .serve(app.into_make_service())
189 .with_graceful_shutdown(shutdown_signal)
190 .await
191 .map_err(|err| ServerError::Internal(err.to_string()))?;
192 }
193 Ok(())
194}
195
196async fn run_admin(
197 state: Arc<ServerState>,
198 tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
199 mut shutdown: broadcast::Receiver<()>,
200) -> Result<()> {
201 let app = crate::http::admin_router(state.clone());
202 let addr = state.config.admin_bind;
203
204 if let Some(tls) = tls_config {
205 let handle = axum_server::Handle::new();
206 let shutdown_handle = handle.clone();
207 tokio::spawn(async move {
208 let _ = shutdown.recv().await;
209 shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
210 });
211 axum_server::bind_rustls(addr, tls)
212 .handle(handle)
213 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
214 .await
215 .map_err(|err| ServerError::Internal(err.to_string()))?;
216 } else {
217 let shutdown_signal = async move {
218 let _ = shutdown.recv().await;
219 };
220 axum::Server::bind(&addr)
221 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
222 .with_graceful_shutdown(shutdown_signal)
223 .await
224 .map_err(|err| ServerError::Internal(err.to_string()))?;
225 }
226 Ok(())
227}
228
229async fn run_grpc(state: Arc<ServerState>, shutdown: broadcast::Receiver<()>) -> Result<()> {
230 let addr = state.config.grpc_bind;
231 crate::grpc::serve(state, addr, shutdown).await
232}
233
234async fn run_cleanup(state: Arc<ServerState>, mut shutdown: broadcast::Receiver<()>) -> Result<()> {
235 let mut interval = tokio::time::interval(state.config.session_ttl);
236 loop {
237 tokio::select! {
238 _ = interval.tick() => {
239 state.session_manager.cleanup_expired();
240 }
241 _ = shutdown.recv() => break,
242 }
243 }
244 Ok(())
245}
246
247async fn wait_for_shutdown(signal: broadcast::Sender<()>) {
248 #[cfg(unix)]
249 let mut term = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
250 Ok(signal) => signal,
251 Err(_) => {
252 let _ = tokio::signal::ctrl_c().await;
253 let _ = signal.send(());
254 return;
255 }
256 };
257
258 #[cfg(unix)]
259 tokio::select! {
260 _ = tokio::signal::ctrl_c() => {}
261 _ = term.recv() => {}
262 }
263
264 #[cfg(not(unix))]
265 {
266 let _ = tokio::signal::ctrl_c().await;
267 }
268
269 let _ = signal.send(());
270}
271
272fn init_tracing() {
273 let _ = tracing_subscriber::fmt()
274 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
275 .try_init();
276}