1use std::net::SocketAddr;
2use std::sync::{Arc, RwLock};
3use std::time::Instant;
4
5use alopex_core::kv::any::AnyKV;
6use alopex_core::kv::async_adapter::{AsyncKVStoreAdapter, AsyncKVTransactionAdapter};
7use alopex_core::kv::storage::{StorageFactory, StorageMode};
8use alopex_core::kv::AsyncKVStore;
9use alopex_core::types::TxnMode;
10use alopex_sql::catalog::{Catalog, CatalogError, PersistentCatalog};
11use alopex_sql::storage::async_storage::AsyncTxnBridge;
12use alopex_sql::storage::erased::ErasedAsyncSqlTransaction;
13use tokio::sync::broadcast;
14
15use crate::audit::AuditLogger;
16use crate::auth::AuthMiddleware;
17use crate::config::ServerConfig;
18use crate::error::{Result, ServerError};
19use crate::metrics::Metrics;
20use crate::session::{SessionConfig, SessionManager, TransactionFactory};
21use crate::tls;
22
23pub struct Server {
24 pub state: Arc<ServerState>,
25}
26
27pub struct ServerState {
28 pub config: ServerConfig,
29 pub store: Arc<AnyKV>,
30 pub catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
31 pub async_store: Arc<AsyncKVStoreAdapter<AnyKV>>,
32 pub session_manager: Arc<SessionManager>,
33 pub metrics: Metrics,
34 pub audit: AuditLogger,
35 pub auth: AuthMiddleware,
36 pub start_time: Instant,
37}
38
39impl Server {
40 pub fn new(config: ServerConfig) -> Result<Self> {
41 config.validate()?;
42 let store = StorageFactory::create(StorageMode::Disk {
43 path: config.data_dir.clone(),
44 config: None,
45 })?;
46 let store = Arc::new(store);
47 let catalog = load_catalog(store.clone())?;
48 let async_store = Arc::new(AsyncKVStoreAdapter::from_arc(
49 store.clone(),
50 TxnMode::ReadWrite,
51 ));
52 let metrics = Metrics::new()?;
53 let audit = AuditLogger::new(config.audit_log_output.clone())?;
54 let auth = AuthMiddleware::new(config.auth_mode.clone());
55
56 let txn_factory = build_txn_factory(async_store.clone(), catalog.clone());
57 let session_manager = Arc::new(SessionManager::new(
58 SessionConfig {
59 ttl: config.session_ttl,
60 },
61 txn_factory,
62 ));
63
64 Ok(Self {
65 state: Arc::new(ServerState {
66 config,
67 store,
68 catalog,
69 async_store,
70 session_manager,
71 metrics,
72 audit,
73 auth,
74 start_time: Instant::now(),
75 }),
76 })
77 }
78
79 pub async fn run(self) -> Result<()> {
80 if self.state.config.tracing_enabled {
81 init_tracing();
82 }
83
84 let (shutdown_tx, _) = broadcast::channel(2);
85 let http_state = self.state.clone();
86 let admin_state = self.state.clone();
87 let grpc_state = self.state.clone();
88 let cleanup_state = self.state.clone();
89 let http_shutdown = shutdown_tx.subscribe();
90 let admin_shutdown = shutdown_tx.subscribe();
91 let grpc_shutdown = shutdown_tx.subscribe();
92 let cleanup_shutdown = shutdown_tx.subscribe();
93
94 let tls_config = if let Some(tls) = &self.state.config.tls {
95 let config = tls::build_rustls_config(tls)?;
96 Some(axum_server::tls_rustls::RustlsConfig::from_config(config))
97 } else {
98 None
99 };
100
101 let http_task = tokio::spawn(run_http(http_state, tls_config.clone(), http_shutdown));
102 let admin_task = tokio::spawn(run_admin(admin_state, tls_config, admin_shutdown));
103 let grpc_task = tokio::spawn(run_grpc(grpc_state, grpc_shutdown));
104 let cleanup_task = tokio::spawn(run_cleanup(cleanup_state, cleanup_shutdown));
105
106 wait_for_shutdown(shutdown_tx.clone()).await;
107 let _ = shutdown_tx.send(());
108
109 http_task
110 .await
111 .map_err(|err| ServerError::Internal(err.to_string()))??;
112 admin_task
113 .await
114 .map_err(|err| ServerError::Internal(err.to_string()))??;
115 grpc_task
116 .await
117 .map_err(|err| ServerError::Internal(err.to_string()))??;
118 cleanup_task
119 .await
120 .map_err(|err| ServerError::Internal(err.to_string()))??;
121
122 self.state.audit.flush()?;
123 Ok(())
124 }
125}
126
127impl ServerState {
128 pub async fn begin_sql_txn(
129 &self,
130 ) -> Result<AsyncTxnBridge<'static, AsyncKVTransactionAdapter>> {
131 let txn = self.async_store.begin_async().await?;
132 Ok(AsyncTxnBridge::with_catalog(
133 txn,
134 TxnMode::ReadWrite,
135 self.catalog.clone(),
136 ))
137 }
138}
139
140fn build_txn_factory(
141 store: Arc<AsyncKVStoreAdapter<AnyKV>>,
142 catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
143) -> TransactionFactory {
144 Arc::new(move || {
145 let store = store.clone();
146 let catalog = catalog.clone();
147 Box::pin(async move {
148 let txn = store.begin_async().await?;
149 let bridge: AsyncTxnBridge<'static, AsyncKVTransactionAdapter> =
150 AsyncTxnBridge::with_catalog(txn, TxnMode::ReadWrite, catalog);
151 Ok(Box::new(bridge) as Box<dyn ErasedAsyncSqlTransaction>)
152 })
153 })
154}
155
156fn load_catalog(store: Arc<AnyKV>) -> Result<Arc<RwLock<dyn Catalog + Send + Sync>>> {
157 let catalog = match PersistentCatalog::load(store.clone()) {
158 Ok(catalog) => catalog,
159 Err(CatalogError::Kv(alopex_core::Error::NotFound)) => PersistentCatalog::new(store),
160 Err(err) => return Err(ServerError::Catalog(err)),
161 };
162 let catalog: Arc<RwLock<dyn Catalog + Send + Sync>> = Arc::new(RwLock::new(catalog));
163 Ok(catalog)
164}
165
166async fn run_http(
167 state: Arc<ServerState>,
168 tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
169 mut shutdown: broadcast::Receiver<()>,
170) -> Result<()> {
171 let app = crate::http::router(state.clone());
172 let addr = state.config.http_bind;
173
174 if let Some(tls) = tls_config {
175 let handle = axum_server::Handle::new();
176 let shutdown_handle = handle.clone();
177 tokio::spawn(async move {
178 let _ = shutdown.recv().await;
179 shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
180 });
181 axum_server::bind_rustls(addr, tls)
182 .handle(handle)
183 .serve(app.into_make_service())
184 .await
185 .map_err(|err| ServerError::Internal(err.to_string()))?;
186 } else {
187 let shutdown_signal = async move {
188 let _ = shutdown.recv().await;
189 };
190 axum::Server::bind(&addr)
191 .serve(app.into_make_service())
192 .with_graceful_shutdown(shutdown_signal)
193 .await
194 .map_err(|err| ServerError::Internal(err.to_string()))?;
195 }
196 Ok(())
197}
198
199async fn run_admin(
200 state: Arc<ServerState>,
201 tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
202 mut shutdown: broadcast::Receiver<()>,
203) -> Result<()> {
204 let app = crate::http::admin_router(state.clone());
205 let addr = state.config.admin_bind;
206
207 if let Some(tls) = tls_config {
208 let handle = axum_server::Handle::new();
209 let shutdown_handle = handle.clone();
210 tokio::spawn(async move {
211 let _ = shutdown.recv().await;
212 shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
213 });
214 axum_server::bind_rustls(addr, tls)
215 .handle(handle)
216 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
217 .await
218 .map_err(|err| ServerError::Internal(err.to_string()))?;
219 } else {
220 let shutdown_signal = async move {
221 let _ = shutdown.recv().await;
222 };
223 axum::Server::bind(&addr)
224 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
225 .with_graceful_shutdown(shutdown_signal)
226 .await
227 .map_err(|err| ServerError::Internal(err.to_string()))?;
228 }
229 Ok(())
230}
231
232async fn run_grpc(state: Arc<ServerState>, shutdown: broadcast::Receiver<()>) -> Result<()> {
233 let addr = state.config.grpc_bind;
234 crate::grpc::serve(state, addr, shutdown).await
235}
236
237async fn run_cleanup(state: Arc<ServerState>, mut shutdown: broadcast::Receiver<()>) -> Result<()> {
238 let mut interval = tokio::time::interval(state.config.session_ttl);
239 loop {
240 tokio::select! {
241 _ = interval.tick() => {
242 state.session_manager.cleanup_expired();
243 }
244 _ = shutdown.recv() => break,
245 }
246 }
247 Ok(())
248}
249
250async fn wait_for_shutdown(signal: broadcast::Sender<()>) {
251 #[cfg(unix)]
252 let mut term = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
253 Ok(signal) => signal,
254 Err(_) => {
255 let _ = tokio::signal::ctrl_c().await;
256 let _ = signal.send(());
257 return;
258 }
259 };
260
261 #[cfg(unix)]
262 tokio::select! {
263 _ = tokio::signal::ctrl_c() => {}
264 _ = term.recv() => {}
265 }
266
267 #[cfg(not(unix))]
268 {
269 let _ = tokio::signal::ctrl_c().await;
270 }
271
272 let _ = signal.send(());
273}
274
275fn init_tracing() {
276 let _ = tracing_subscriber::fmt()
277 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
278 .try_init();
279}