clawdb-server 0.1.2

gRPC and HTTP server for the ClawDB cognitive database.
Documentation
pub mod grpc;
pub mod http;
pub mod state;

use std::{net::SocketAddr, sync::Arc, time::Duration};

use anyhow::{Context, Result};
use clawdb::{ClawDB, ClawDBConfig};
use tokio::{net::TcpListener, task::JoinHandle, time::timeout};
use tokio_util::sync::CancellationToken;

use crate::state::AppState;

pub const VERSION_TEXT: &str = concat!(
    "clawdb-server ",
    env!("CARGO_PKG_VERSION"),
    "\ncomponents: claw-core/0.1.2 claw-vector/0.1.2 claw-branch/0.1.2\n            claw-sync/0.1.2 claw-guard/0.1.2 claw-reflect-client/0.1.2"
);

#[derive(Clone, Debug)]
pub struct ServerOptions {
    pub grpc_addr: SocketAddr,
    pub http_addr: SocketAddr,
    pub metrics_addr: SocketAddr,
}

#[derive(Clone, Debug)]
pub struct BoundAddresses {
    pub grpc: SocketAddr,
    pub http: SocketAddr,
    pub metrics: SocketAddr,
}

pub struct RunningServers {
    pub addresses: BoundAddresses,
    pub shutdown: CancellationToken,
    pub db: Arc<ClawDB>,
    pub grpc_task: JoinHandle<Result<()>>,
    pub http_task: JoinHandle<Result<()>>,
    pub metrics_task: JoinHandle<Result<()>>,
}

impl RunningServers {
    pub async fn shutdown(self, grace: Duration) -> Result<()> {
        self.shutdown.cancel();

        let joined = timeout(grace, async {
            let grpc = self.grpc_task.await.context("gRPC task join failed")?;
            let http = self.http_task.await.context("HTTP task join failed")?;
            let metrics = self
                .metrics_task
                .await
                .context("metrics task join failed")?;
            grpc?;
            http?;
            metrics?;
            Result::<()>::Ok(())
        })
        .await;

        self.db.close().await.context("failed to close clawdb")?;
        joined.context("timed out waiting for server tasks")??;
        Ok(())
    }
}

pub async fn build_state(config: ClawDBConfig) -> Result<Arc<AppState>> {
    let db = Arc::new(
        ClawDB::new(config)
            .await
            .context("failed to initialize ClawDB")?,
    );
    Ok(Arc::new(AppState::new(db)))
}

pub async fn spawn_servers(state: Arc<AppState>, options: ServerOptions) -> Result<RunningServers> {
    let shutdown = CancellationToken::new();

    let grpc_listener = TcpListener::bind(options.grpc_addr)
        .await
        .context("failed to bind gRPC listener")?;
    let http_listener = TcpListener::bind(options.http_addr)
        .await
        .context("failed to bind HTTP listener")?;
    let metrics_listener = TcpListener::bind(options.metrics_addr)
        .await
        .context("failed to bind metrics listener")?;

    let addresses = BoundAddresses {
        grpc: grpc_listener
            .local_addr()
            .context("missing gRPC local address")?,
        http: http_listener
            .local_addr()
            .context("missing HTTP local address")?,
        metrics: metrics_listener
            .local_addr()
            .context("missing metrics local address")?,
    };

    let grpc_task = tokio::spawn(grpc::serve(grpc_listener, state.clone(), shutdown.clone()));
    let http_task = tokio::spawn(http::serve(http_listener, state.clone(), shutdown.clone()));
    let metrics_task = tokio::spawn(http::serve_metrics(
        metrics_listener,
        state.clone(),
        shutdown.clone(),
    ));

    Ok(RunningServers {
        addresses,
        shutdown,
        db: state.db.clone(),
        grpc_task,
        http_task,
        metrics_task,
    })
}