Skip to main content

clawdb_server/
state.rs

1use std::{
2    collections::HashMap,
3    num::NonZeroU32,
4    sync::{Arc, Mutex},
5    time::Duration,
6};
7
8use clawdb::{ClawDB, ClawDBSession};
9use governor::{
10    clock::{Clock, DefaultClock},
11    DefaultKeyedRateLimiter, Quota,
12};
13use prometheus_client::{
14    encoding::text::encode,
15    metrics::{
16        counter::Counter,
17        family::Family,
18        gauge::Gauge,
19        histogram::{exponential_buckets, Histogram},
20    },
21    registry::Registry,
22};
23use tokio::sync::Mutex as AsyncMutex;
24use uuid::Uuid;
25
26#[derive(Clone, Debug)]
27pub struct RequestId(pub String);
28
29#[derive(Clone, Debug, Hash, PartialEq, Eq, prometheus_client::encoding::EncodeLabelSet)]
30pub struct HttpLabels {
31    pub method: String,
32    pub path: String,
33    pub status: String,
34}
35
36#[derive(Clone, Debug, Hash, PartialEq, Eq, prometheus_client::encoding::EncodeLabelSet)]
37pub struct HttpPathLabel {
38    pub path: String,
39}
40
41#[derive(Clone, Debug, Hash, PartialEq, Eq, prometheus_client::encoding::EncodeLabelSet)]
42pub struct GrpcLabels {
43    pub method: String,
44    pub status: String,
45}
46
47fn duration_histogram() -> Histogram {
48    Histogram::new(exponential_buckets(0.001, 2.0, 16))
49}
50
51#[derive(Clone)]
52pub struct ServerMetrics {
53    registry: Arc<Mutex<Registry>>,
54    http_requests_total: Family<HttpLabels, Counter>,
55    http_request_duration_seconds: Family<HttpPathLabel, Histogram, fn() -> Histogram>,
56    grpc_requests_total: Family<GrpcLabels, Counter>,
57    active_sessions: Gauge,
58}
59
60impl ServerMetrics {
61    pub fn new() -> Self {
62        let http_requests_total = Family::default();
63        let http_request_duration_seconds =
64            Family::new_with_constructor(duration_histogram as fn() -> Histogram);
65        let grpc_requests_total = Family::default();
66        let active_sessions = Gauge::default();
67
68        let mut registry = Registry::default();
69        registry.register(
70            "clawdb_http_requests_total",
71            "HTTP requests",
72            http_requests_total.clone(),
73        );
74        registry.register(
75            "clawdb_http_request_duration_seconds",
76            "HTTP request duration",
77            http_request_duration_seconds.clone(),
78        );
79        registry.register(
80            "clawdb_grpc_requests_total",
81            "gRPC requests",
82            grpc_requests_total.clone(),
83        );
84        registry.register(
85            "clawdb_active_sessions",
86            "Active sessions",
87            active_sessions.clone(),
88        );
89
90        Self {
91            registry: Arc::new(Mutex::new(registry)),
92            http_requests_total,
93            http_request_duration_seconds,
94            grpc_requests_total,
95            active_sessions,
96        }
97    }
98
99    pub fn observe_http(&self, method: &str, path: &str, status: u16, duration: Duration) {
100        self.http_requests_total
101            .get_or_create(&HttpLabels {
102                method: method.to_string(),
103                path: path.to_string(),
104                status: status.to_string(),
105            })
106            .inc();
107        self.http_request_duration_seconds
108            .get_or_create(&HttpPathLabel {
109                path: path.to_string(),
110            })
111            .observe(duration.as_secs_f64());
112    }
113
114    pub fn observe_grpc(&self, method: &str, status: &str) {
115        self.grpc_requests_total
116            .get_or_create(&GrpcLabels {
117                method: method.to_string(),
118                status: status.to_string(),
119            })
120            .inc();
121    }
122
123    pub fn set_active_sessions(&self, count: u64) {
124        self.active_sessions
125            .set(i64::try_from(count).unwrap_or(i64::MAX));
126    }
127
128    pub fn render(&self, claw_metrics: String) -> String {
129        let mut out = claw_metrics;
130        let mut buffer = String::new();
131        if let Ok(registry) = self.registry.lock() {
132            let _ = encode(&mut buffer, &registry);
133        }
134        out.push_str(&buffer);
135        out
136    }
137}
138
139impl Default for ServerMetrics {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145#[derive(Clone)]
146pub struct PendingTransaction {
147    pub id: Uuid,
148    pub session: ClawDBSession,
149}
150
151pub struct AppState {
152    pub db: Arc<ClawDB>,
153    pub metrics: ServerMetrics,
154    pub transactions: AsyncMutex<HashMap<Uuid, PendingTransaction>>,
155    pub grpc_limiter: DefaultKeyedRateLimiter<String>,
156    pub http_read_limiter: DefaultKeyedRateLimiter<String>,
157    pub http_write_limiter: DefaultKeyedRateLimiter<String>,
158}
159
160impl AppState {
161    pub fn new(db: Arc<ClawDB>) -> Self {
162        let non_zero = |value| NonZeroU32::new(value).unwrap_or(NonZeroU32::MIN);
163
164        Self {
165            db,
166            metrics: ServerMetrics::new(),
167            transactions: AsyncMutex::new(HashMap::new()),
168            grpc_limiter: DefaultKeyedRateLimiter::keyed(Quota::per_minute(non_zero(1000))),
169            http_read_limiter: DefaultKeyedRateLimiter::keyed(Quota::per_minute(non_zero(2000))),
170            http_write_limiter: DefaultKeyedRateLimiter::keyed(Quota::per_minute(non_zero(500))),
171        }
172    }
173
174    pub fn retry_after_seconds(
175        not_until: &governor::NotUntil<<DefaultClock as Clock>::Instant>,
176    ) -> u64 {
177        not_until
178            .wait_time_from(DefaultClock::default().now())
179            .as_secs()
180            .max(1)
181    }
182}