1use std::{
2 collections::HashMap,
3 num::NonZeroU32,
4 sync::{Arc, Mutex},
5 time::Duration,
6};
7
8use clawdb::{ClawDB, ClawDBSession};
9use governor::{
10 clock::{Clock, DefaultClock},
11 DefaultKeyedRateLimiter, Quota,
12};
13use prometheus_client::{
14 encoding::text::encode,
15 metrics::{
16 counter::Counter,
17 family::Family,
18 gauge::Gauge,
19 histogram::{exponential_buckets, Histogram},
20 },
21 registry::Registry,
22};
23use tokio::sync::Mutex as AsyncMutex;
24use uuid::Uuid;
25
/// String newtype carrying a request identifier.
///
/// How the identifier is generated and attached to a request is not visible
/// in this file; this type only wraps the value.
#[derive(Debug, Clone)]
pub struct RequestId(pub String);
28
29#[derive(Clone, Debug, Hash, PartialEq, Eq, prometheus_client::encoding::EncodeLabelSet)]
30pub struct HttpLabels {
31 pub method: String,
32 pub path: String,
33 pub status: String,
34}
35
36#[derive(Clone, Debug, Hash, PartialEq, Eq, prometheus_client::encoding::EncodeLabelSet)]
37pub struct HttpPathLabel {
38 pub path: String,
39}
40
41#[derive(Clone, Debug, Hash, PartialEq, Eq, prometheus_client::encoding::EncodeLabelSet)]
42pub struct GrpcLabels {
43 pub method: String,
44 pub status: String,
45}
46
47fn duration_histogram() -> Histogram {
48 Histogram::new(exponential_buckets(0.001, 2.0, 16))
49}
50
51#[derive(Clone)]
52pub struct ServerMetrics {
53 registry: Arc<Mutex<Registry>>,
54 http_requests_total: Family<HttpLabels, Counter>,
55 http_request_duration_seconds: Family<HttpPathLabel, Histogram, fn() -> Histogram>,
56 grpc_requests_total: Family<GrpcLabels, Counter>,
57 active_sessions: Gauge,
58}
59
60impl ServerMetrics {
61 pub fn new() -> Self {
62 let http_requests_total = Family::default();
63 let http_request_duration_seconds =
64 Family::new_with_constructor(duration_histogram as fn() -> Histogram);
65 let grpc_requests_total = Family::default();
66 let active_sessions = Gauge::default();
67
68 let mut registry = Registry::default();
69 registry.register(
70 "clawdb_http_requests_total",
71 "HTTP requests",
72 http_requests_total.clone(),
73 );
74 registry.register(
75 "clawdb_http_request_duration_seconds",
76 "HTTP request duration",
77 http_request_duration_seconds.clone(),
78 );
79 registry.register(
80 "clawdb_grpc_requests_total",
81 "gRPC requests",
82 grpc_requests_total.clone(),
83 );
84 registry.register(
85 "clawdb_active_sessions",
86 "Active sessions",
87 active_sessions.clone(),
88 );
89
90 Self {
91 registry: Arc::new(Mutex::new(registry)),
92 http_requests_total,
93 http_request_duration_seconds,
94 grpc_requests_total,
95 active_sessions,
96 }
97 }
98
99 pub fn observe_http(&self, method: &str, path: &str, status: u16, duration: Duration) {
100 self.http_requests_total
101 .get_or_create(&HttpLabels {
102 method: method.to_string(),
103 path: path.to_string(),
104 status: status.to_string(),
105 })
106 .inc();
107 self.http_request_duration_seconds
108 .get_or_create(&HttpPathLabel {
109 path: path.to_string(),
110 })
111 .observe(duration.as_secs_f64());
112 }
113
114 pub fn observe_grpc(&self, method: &str, status: &str) {
115 self.grpc_requests_total
116 .get_or_create(&GrpcLabels {
117 method: method.to_string(),
118 status: status.to_string(),
119 })
120 .inc();
121 }
122
123 pub fn set_active_sessions(&self, count: u64) {
124 self.active_sessions
125 .set(i64::try_from(count).unwrap_or(i64::MAX));
126 }
127
128 pub fn render(&self, claw_metrics: String) -> String {
129 let mut out = claw_metrics;
130 let mut buffer = String::new();
131 if let Ok(registry) = self.registry.lock() {
132 let _ = encode(&mut buffer, ®istry);
133 }
134 out.push_str(&buffer);
135 out
136 }
137}
138
139impl Default for ServerMetrics {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145#[derive(Clone)]
146pub struct PendingTransaction {
147 pub id: Uuid,
148 pub session: ClawDBSession,
149}
150
151pub struct AppState {
152 pub db: Arc<ClawDB>,
153 pub metrics: ServerMetrics,
154 pub transactions: AsyncMutex<HashMap<Uuid, PendingTransaction>>,
155 pub grpc_limiter: DefaultKeyedRateLimiter<String>,
156 pub http_read_limiter: DefaultKeyedRateLimiter<String>,
157 pub http_write_limiter: DefaultKeyedRateLimiter<String>,
158}
159
160impl AppState {
161 pub fn new(db: Arc<ClawDB>) -> Self {
162 let non_zero = |value| NonZeroU32::new(value).unwrap_or(NonZeroU32::MIN);
163
164 Self {
165 db,
166 metrics: ServerMetrics::new(),
167 transactions: AsyncMutex::new(HashMap::new()),
168 grpc_limiter: DefaultKeyedRateLimiter::keyed(Quota::per_minute(non_zero(1000))),
169 http_read_limiter: DefaultKeyedRateLimiter::keyed(Quota::per_minute(non_zero(2000))),
170 http_write_limiter: DefaultKeyedRateLimiter::keyed(Quota::per_minute(non_zero(500))),
171 }
172 }
173
174 pub fn retry_after_seconds(
175 not_until: &governor::NotUntil<<DefaultClock as Clock>::Instant>,
176 ) -> u64 {
177 not_until
178 .wait_time_from(DefaultClock::default().now())
179 .as_secs()
180 .max(1)
181 }
182}