1use std::net::SocketAddr;
2use std::sync::{Arc, RwLock};
3use std::time::Instant;
4
5use alopex_core::kv::any::AnyKV;
6use alopex_core::kv::async_adapter::{AsyncKVStoreAdapter, AsyncKVTransactionAdapter};
7use alopex_core::kv::AsyncKVStore;
8use alopex_core::types::TxnMode;
9use alopex_sql::catalog::{Catalog, CatalogError, PersistentCatalog};
10use alopex_sql::storage::async_storage::AsyncTxnBridge;
11use alopex_sql::storage::erased::ErasedAsyncSqlTransaction;
12use tokio::sync::broadcast;
13
14use crate::audit::AuditLogger;
15use crate::auth::AuthMiddleware;
16use crate::config::ServerConfig;
17use crate::error::{Result, ServerError};
18use crate::metrics::Metrics;
19use crate::ops::backup::BackupCoordinator;
20use crate::ops::memory::MemoryControlPolicy;
21use crate::ops::recovery::{RecoveryCoordinator, RecoveryInfo};
22use crate::ops::restore::RestoreCoordinator;
23use crate::ops::state::{LifecycleStateManager, Mode};
24use crate::session::{SessionConfig, SessionManager, TransactionFactory};
25use crate::tls;
26
/// Top-level server handle.
///
/// Owns the shared [`ServerState`]; `Server::run` hands clones of it to the HTTP,
/// admin, gRPC and session-cleanup tasks.
pub struct Server {
    /// State shared (via `Arc`) with every listener and background task.
    pub state: Arc<ServerState>,
}
30
/// Process-wide state shared by request handlers and background tasks.
///
/// Constructed once by `Server::new`.
pub struct ServerState {
    /// Configuration the server was started with (validated in `Server::new`).
    pub config: ServerConfig,
    /// Key-value store opened through `RecoveryCoordinator::open_store` from `config.data_dir`.
    pub store: Arc<AnyKV>,
    /// SQL catalog loaded from `store`, or created with `PersistentCatalog::new` when
    /// loading reports `NotFound`.
    pub catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
    /// Async adapter over `store`, configured for `TxnMode::ReadWrite` transactions.
    pub async_store: Arc<AsyncKVStoreAdapter<AnyKV>>,
    /// Session registry configured with `ttl = config.session_ttl`.
    pub session_manager: Arc<SessionManager>,
    /// Metrics registry; also passed to the SQL memory-control policy.
    pub metrics: Metrics,
    /// Audit logger writing to `config.audit_log_output`.
    pub audit: AuditLogger,
    /// Authentication middleware configured from `config.auth_mode`.
    pub auth: AuthMiddleware,
    /// Instant at which this state was constructed (presumably used for uptime — confirm).
    pub start_time: Instant,
    /// Lifecycle mode shared with the backup and restore coordinators; its initial
    /// mode is applied from `recovery_info`.
    pub lifecycle_state: Arc<LifecycleStateManager>,
    /// Information produced while opening/recovering the store.
    pub recovery_info: RecoveryInfo,
    /// Backup coordinator for `config.data_dir`, given a checkpoint callback that only
    /// supports the LSM storage engine.
    pub backup_coordinator: BackupCoordinator,
    /// Restore coordinator for `config.data_dir`.
    pub restore_coordinator: RestoreCoordinator,
}
46
47impl Server {
48 pub fn new(config: ServerConfig) -> Result<Self> {
49 config.validate()?;
50 let (store, recovery_info) = RecoveryCoordinator::open_store(&config.data_dir)?;
51 let lifecycle_state = Arc::new(LifecycleStateManager::new(Mode::Normal));
52 RecoveryCoordinator::apply_initial_mode(&lifecycle_state, &recovery_info);
53
54 let store = Arc::new(store);
55 let catalog = load_catalog(store.clone())?;
56 let async_store = Arc::new(AsyncKVStoreAdapter::from_arc(
57 store.clone(),
58 TxnMode::ReadWrite,
59 ));
60 let metrics = Metrics::new()?;
61 let audit = AuditLogger::new(config.audit_log_output.clone())?;
62 let auth = AuthMiddleware::new(config.auth_mode.clone());
63
64 let txn_factory = build_txn_factory(async_store.clone(), catalog.clone(), metrics.clone());
65 let session_manager = Arc::new(SessionManager::new(
66 SessionConfig {
67 ttl: config.session_ttl,
68 },
69 txn_factory,
70 ));
71 let data_dir = config.data_dir.clone();
72 let checkpoint = {
73 let store = store.clone();
74 Arc::new(move || match store.as_ref() {
75 AnyKV::Lsm(kv) => {
76 kv.checkpoint()?;
77 Ok(())
78 }
79 _ => Err(ServerError::BadRequest(
80 "checkpoint unsupported for current storage engine".to_string(),
81 )),
82 })
83 };
84 let backup_coordinator =
85 BackupCoordinator::new(data_dir.clone(), lifecycle_state.clone(), checkpoint);
86 let restore_coordinator = RestoreCoordinator::new(data_dir, lifecycle_state.clone());
87
88 Ok(Self {
89 state: Arc::new(ServerState {
90 config,
91 store,
92 catalog,
93 async_store,
94 session_manager,
95 metrics,
96 audit,
97 auth,
98 start_time: Instant::now(),
99 lifecycle_state,
100 recovery_info,
101 backup_coordinator,
102 restore_coordinator,
103 }),
104 })
105 }
106
107 pub async fn run(self) -> Result<()> {
108 if self.state.config.tracing_enabled {
109 init_tracing();
110 }
111
112 let (shutdown_tx, _) = broadcast::channel(2);
113 let http_state = self.state.clone();
114 let admin_state = self.state.clone();
115 let grpc_state = self.state.clone();
116 let cleanup_state = self.state.clone();
117 let http_shutdown = shutdown_tx.subscribe();
118 let admin_shutdown = shutdown_tx.subscribe();
119 let grpc_shutdown = shutdown_tx.subscribe();
120 let cleanup_shutdown = shutdown_tx.subscribe();
121
122 let tls_config = if let Some(tls) = &self.state.config.tls {
123 let config = tls::build_rustls_config(tls)?;
124 Some(axum_server::tls_rustls::RustlsConfig::from_config(config))
125 } else {
126 None
127 };
128
129 let http_task = tokio::spawn(run_http(http_state, tls_config.clone(), http_shutdown));
130 let admin_task = tokio::spawn(run_admin(admin_state, tls_config, admin_shutdown));
131 let grpc_task = tokio::spawn(run_grpc(grpc_state, grpc_shutdown));
132 let cleanup_task = tokio::spawn(run_cleanup(cleanup_state, cleanup_shutdown));
133
134 wait_for_shutdown(shutdown_tx.clone()).await;
135 let _ = shutdown_tx.send(());
136
137 http_task
138 .await
139 .map_err(|err| ServerError::Internal(err.to_string()))??;
140 admin_task
141 .await
142 .map_err(|err| ServerError::Internal(err.to_string()))??;
143 grpc_task
144 .await
145 .map_err(|err| ServerError::Internal(err.to_string()))??;
146 cleanup_task
147 .await
148 .map_err(|err| ServerError::Internal(err.to_string()))??;
149
150 self.state.audit.flush()?;
151 Ok(())
152 }
153}
154
155impl ServerState {
156 pub async fn begin_sql_txn(
157 &self,
158 ) -> Result<AsyncTxnBridge<'static, AsyncKVTransactionAdapter>> {
159 let txn = self.async_store.begin_async().await?;
160 let mut bridge =
161 AsyncTxnBridge::with_catalog(txn, TxnMode::ReadWrite, self.catalog.clone());
162 let policy = MemoryControlPolicy::from_env_with_metrics(self.metrics.clone()).sql_policy();
163 bridge.set_memory_policy(policy);
164 Ok(bridge)
165 }
166}
167
168fn build_txn_factory(
169 store: Arc<AsyncKVStoreAdapter<AnyKV>>,
170 catalog: Arc<RwLock<dyn Catalog + Send + Sync>>,
171 metrics: Metrics,
172) -> TransactionFactory {
173 Arc::new(move || {
174 let store = store.clone();
175 let catalog = catalog.clone();
176 let metrics = metrics.clone();
177 Box::pin(async move {
178 let txn = store.begin_async().await?;
179 let mut bridge: AsyncTxnBridge<'static, AsyncKVTransactionAdapter> =
180 AsyncTxnBridge::with_catalog(txn, TxnMode::ReadWrite, catalog);
181 let policy = MemoryControlPolicy::from_env_with_metrics(metrics).sql_policy();
182 bridge.set_memory_policy(policy);
183 Ok(Box::new(bridge) as Box<dyn ErasedAsyncSqlTransaction>)
184 })
185 })
186}
187
188fn load_catalog(store: Arc<AnyKV>) -> Result<Arc<RwLock<dyn Catalog + Send + Sync>>> {
189 let catalog = match PersistentCatalog::load(store.clone()) {
190 Ok(catalog) => catalog,
191 Err(CatalogError::Kv(alopex_core::Error::NotFound)) => PersistentCatalog::new(store),
192 Err(err) => return Err(ServerError::Catalog(err)),
193 };
194 let catalog: Arc<RwLock<dyn Catalog + Send + Sync>> = Arc::new(RwLock::new(catalog));
195 Ok(catalog)
196}
197
198async fn run_http(
199 state: Arc<ServerState>,
200 tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
201 mut shutdown: broadcast::Receiver<()>,
202) -> Result<()> {
203 let app = crate::http::router(state.clone());
204 let addr = state.config.http_bind;
205
206 if let Some(tls) = tls_config {
207 let handle = axum_server::Handle::new();
208 let shutdown_handle = handle.clone();
209 tokio::spawn(async move {
210 let _ = shutdown.recv().await;
211 shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
212 });
213 axum_server::bind_rustls(addr, tls)
214 .handle(handle)
215 .serve(app.into_make_service())
216 .await
217 .map_err(|err| ServerError::Internal(err.to_string()))?;
218 } else {
219 let shutdown_signal = async move {
220 let _ = shutdown.recv().await;
221 };
222 axum::Server::bind(&addr)
223 .serve(app.into_make_service())
224 .with_graceful_shutdown(shutdown_signal)
225 .await
226 .map_err(|err| ServerError::Internal(err.to_string()))?;
227 }
228 Ok(())
229}
230
231async fn run_admin(
232 state: Arc<ServerState>,
233 tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
234 mut shutdown: broadcast::Receiver<()>,
235) -> Result<()> {
236 let app = crate::http::admin_router(state.clone());
237 let addr = state.config.admin_bind;
238
239 if let Some(tls) = tls_config {
240 let handle = axum_server::Handle::new();
241 let shutdown_handle = handle.clone();
242 tokio::spawn(async move {
243 let _ = shutdown.recv().await;
244 shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
245 });
246 axum_server::bind_rustls(addr, tls)
247 .handle(handle)
248 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
249 .await
250 .map_err(|err| ServerError::Internal(err.to_string()))?;
251 } else {
252 let shutdown_signal = async move {
253 let _ = shutdown.recv().await;
254 };
255 axum::Server::bind(&addr)
256 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
257 .with_graceful_shutdown(shutdown_signal)
258 .await
259 .map_err(|err| ServerError::Internal(err.to_string()))?;
260 }
261 Ok(())
262}
263
264async fn run_grpc(state: Arc<ServerState>, shutdown: broadcast::Receiver<()>) -> Result<()> {
265 let addr = state.config.grpc_bind;
266 crate::grpc::serve(state, addr, shutdown).await
267}
268
269async fn run_cleanup(state: Arc<ServerState>, mut shutdown: broadcast::Receiver<()>) -> Result<()> {
270 let mut interval = tokio::time::interval(state.config.session_ttl);
271 loop {
272 tokio::select! {
273 _ = interval.tick() => {
274 state.session_manager.cleanup_expired();
275 }
276 _ = shutdown.recv() => break,
277 }
278 }
279 Ok(())
280}
281
282async fn wait_for_shutdown(signal: broadcast::Sender<()>) {
283 #[cfg(unix)]
284 let mut term = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
285 Ok(signal) => signal,
286 Err(_) => {
287 let _ = tokio::signal::ctrl_c().await;
288 let _ = signal.send(());
289 return;
290 }
291 };
292
293 #[cfg(unix)]
294 tokio::select! {
295 _ = tokio::signal::ctrl_c() => {}
296 _ = term.recv() => {}
297 }
298
299 #[cfg(not(unix))]
300 {
301 let _ = tokio::signal::ctrl_c().await;
302 }
303
304 let _ = signal.send(());
305}
306
307fn init_tracing() {
308 let _ = tracing_subscriber::fmt()
309 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
310 .try_init();
311}