Skip to main content

clawdb_server/
lib.rs

1pub mod grpc;
2pub mod http;
3pub mod state;
4
5use std::{net::SocketAddr, sync::Arc, time::Duration};
6
7use anyhow::{Context, Result};
8use clawdb::{ClawDB, ClawDBConfig};
9use tokio::{net::TcpListener, task::JoinHandle, time::timeout};
10use tokio_util::sync::CancellationToken;
11
12use crate::state::AppState;
13
14pub const VERSION_TEXT: &str = concat!(
15    "clawdb-server ",
16    env!("CARGO_PKG_VERSION"),
17    "\ncomponents: claw-core/0.1.2 claw-vector/0.1.2 claw-branch/0.1.2\n            claw-sync/0.1.2 claw-guard/0.1.2 claw-reflect-client/0.1.2"
18);
19
/// Socket addresses the caller asks the servers to listen on.
///
/// Passed to `spawn_servers`, which binds one TCP listener per field.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address to bind for the gRPC listener.
    pub grpc_addr: SocketAddr,
    /// Address to bind for the HTTP listener.
    pub http_addr: SocketAddr,
    /// Address to bind for the metrics listener.
    pub metrics_addr: SocketAddr,
}
26
/// Addresses the listeners report after binding.
///
/// `spawn_servers` fills each field from the corresponding listener's `local_addr()`.
#[derive(Debug, Clone)]
pub struct BoundAddresses {
    /// Local address of the gRPC listener.
    pub grpc: SocketAddr,
    /// Local address of the HTTP listener.
    pub http: SocketAddr,
    /// Local address of the metrics listener.
    pub metrics: SocketAddr,
}
33
34pub struct RunningServers {
35    pub addresses: BoundAddresses,
36    pub shutdown: CancellationToken,
37    pub db: Arc<ClawDB>,
38    pub grpc_task: JoinHandle<Result<()>>,
39    pub http_task: JoinHandle<Result<()>>,
40    pub metrics_task: JoinHandle<Result<()>>,
41}
42
43impl RunningServers {
44    pub async fn shutdown(self, grace: Duration) -> Result<()> {
45        self.shutdown.cancel();
46
47        let joined = timeout(grace, async {
48            let grpc = self.grpc_task.await.context("gRPC task join failed")?;
49            let http = self.http_task.await.context("HTTP task join failed")?;
50            let metrics = self
51                .metrics_task
52                .await
53                .context("metrics task join failed")?;
54            grpc?;
55            http?;
56            metrics?;
57            Result::<()>::Ok(())
58        })
59        .await;
60
61        self.db.close().await.context("failed to close clawdb")?;
62        joined.context("timed out waiting for server tasks")??;
63        Ok(())
64    }
65}
66
67pub async fn build_state(config: ClawDBConfig) -> Result<Arc<AppState>> {
68    let db = Arc::new(
69        ClawDB::new(config)
70            .await
71            .context("failed to initialize ClawDB")?,
72    );
73    Ok(Arc::new(AppState::new(db)))
74}
75
76pub async fn spawn_servers(state: Arc<AppState>, options: ServerOptions) -> Result<RunningServers> {
77    let shutdown = CancellationToken::new();
78
79    let grpc_listener = TcpListener::bind(options.grpc_addr)
80        .await
81        .context("failed to bind gRPC listener")?;
82    let http_listener = TcpListener::bind(options.http_addr)
83        .await
84        .context("failed to bind HTTP listener")?;
85    let metrics_listener = TcpListener::bind(options.metrics_addr)
86        .await
87        .context("failed to bind metrics listener")?;
88
89    let addresses = BoundAddresses {
90        grpc: grpc_listener
91            .local_addr()
92            .context("missing gRPC local address")?,
93        http: http_listener
94            .local_addr()
95            .context("missing HTTP local address")?,
96        metrics: metrics_listener
97            .local_addr()
98            .context("missing metrics local address")?,
99    };
100
101    let grpc_task = tokio::spawn(grpc::serve(grpc_listener, state.clone(), shutdown.clone()));
102    let http_task = tokio::spawn(http::serve(http_listener, state.clone(), shutdown.clone()));
103    let metrics_task = tokio::spawn(http::serve_metrics(
104        metrics_listener,
105        state.clone(),
106        shutdown.clone(),
107    ));
108
109    Ok(RunningServers {
110        addresses,
111        shutdown,
112        db: state.db.clone(),
113        grpc_task,
114        http_task,
115        metrics_task,
116    })
117}