1pub mod grpc;
2pub mod http;
3pub mod state;
4
5use std::{net::SocketAddr, sync::Arc, time::Duration};
6
7use anyhow::{Context, Result};
8use clawdb::{ClawDB, ClawDBConfig};
9use tokio::{net::TcpListener, task::JoinHandle, time::timeout};
10use tokio_util::sync::CancellationToken;
11
12use crate::state::AppState;
13
14pub const VERSION_TEXT: &str = concat!(
15 "clawdb-server ",
16 env!("CARGO_PKG_VERSION"),
17 "\ncomponents: claw-core/0.1.2 claw-vector/0.1.2 claw-branch/0.1.2\n claw-sync/0.1.2 claw-guard/0.1.2 claw-reflect-client/0.1.2"
18);
19
/// Socket addresses the caller asks each listener to bind to.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address requested for the gRPC listener.
    pub grpc_addr: SocketAddr,
    /// Address requested for the HTTP listener.
    pub http_addr: SocketAddr,
    /// Address requested for the metrics listener.
    pub metrics_addr: SocketAddr,
}
26
/// Local addresses reported by the listeners once they have been bound.
#[derive(Debug, Clone)]
pub struct BoundAddresses {
    /// Local address of the gRPC listener.
    pub grpc: SocketAddr,
    /// Local address of the HTTP listener.
    pub http: SocketAddr,
    /// Local address of the metrics listener.
    pub metrics: SocketAddr,
}
33
34pub struct RunningServers {
35 pub addresses: BoundAddresses,
36 pub shutdown: CancellationToken,
37 pub db: Arc<ClawDB>,
38 pub grpc_task: JoinHandle<Result<()>>,
39 pub http_task: JoinHandle<Result<()>>,
40 pub metrics_task: JoinHandle<Result<()>>,
41}
42
43impl RunningServers {
44 pub async fn shutdown(self, grace: Duration) -> Result<()> {
45 self.shutdown.cancel();
46
47 let joined = timeout(grace, async {
48 let grpc = self.grpc_task.await.context("gRPC task join failed")?;
49 let http = self.http_task.await.context("HTTP task join failed")?;
50 let metrics = self
51 .metrics_task
52 .await
53 .context("metrics task join failed")?;
54 grpc?;
55 http?;
56 metrics?;
57 Result::<()>::Ok(())
58 })
59 .await;
60
61 self.db.close().await.context("failed to close clawdb")?;
62 joined.context("timed out waiting for server tasks")??;
63 Ok(())
64 }
65}
66
67pub async fn build_state(config: ClawDBConfig) -> Result<Arc<AppState>> {
68 let db = Arc::new(
69 ClawDB::new(config)
70 .await
71 .context("failed to initialize ClawDB")?,
72 );
73 Ok(Arc::new(AppState::new(db)))
74}
75
76pub async fn spawn_servers(state: Arc<AppState>, options: ServerOptions) -> Result<RunningServers> {
77 let shutdown = CancellationToken::new();
78
79 let grpc_listener = TcpListener::bind(options.grpc_addr)
80 .await
81 .context("failed to bind gRPC listener")?;
82 let http_listener = TcpListener::bind(options.http_addr)
83 .await
84 .context("failed to bind HTTP listener")?;
85 let metrics_listener = TcpListener::bind(options.metrics_addr)
86 .await
87 .context("failed to bind metrics listener")?;
88
89 let addresses = BoundAddresses {
90 grpc: grpc_listener
91 .local_addr()
92 .context("missing gRPC local address")?,
93 http: http_listener
94 .local_addr()
95 .context("missing HTTP local address")?,
96 metrics: metrics_listener
97 .local_addr()
98 .context("missing metrics local address")?,
99 };
100
101 let grpc_task = tokio::spawn(grpc::serve(grpc_listener, state.clone(), shutdown.clone()));
102 let http_task = tokio::spawn(http::serve(http_listener, state.clone(), shutdown.clone()));
103 let metrics_task = tokio::spawn(http::serve_metrics(
104 metrics_listener,
105 state.clone(),
106 shutdown.clone(),
107 ));
108
109 Ok(RunningServers {
110 addresses,
111 shutdown,
112 db: state.db.clone(),
113 grpc_task,
114 http_task,
115 metrics_task,
116 })
117}