1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use tokio::sync::mpsc;
6use tokio::time::{interval, Instant, MissedTickBehavior};
7use tracing::{debug, error, warn};
8use uuid::Uuid;
9
/// Total number of delivery attempts a failed batch gets while held in memory.
///
/// Once a retried batch's attempt count reaches this value and a spool directory
/// is configured, the worker writes it to disk instead of retrying it in memory.
/// Without a spool directory (or if spooling fails) the batch keeps being retried.
const MAX_IN_MEMORY_RETRIES: u32 = 3;
11
/// A websocket usage event handed to a [`WebSocketUsageEmitter`].
///
/// Serialized as an internally tagged object: the variant name in `snake_case`
/// is stored under a `"type"` key next to the variant's fields, e.g.
/// `{"type": "update_sent", "client_id": "...", ...}`.
///
/// Every variant carries `client_id` plus optional `deployment_id`,
/// `metering_key` and `subject` attribution fields. What those identifiers mean
/// is defined by the code that produces the events, which is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WebSocketUsageEvent {
    /// Connection-opened event.
    ConnectionEstablished {
        client_id: String,
        remote_addr: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        key_class: Option<String>,
    },
    /// Connection-closed event.
    ConnectionClosed {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        /// Presumably how long the connection lasted, in seconds (`None` if unknown) — confirm with the producer.
        duration_secs: Option<f64>,
        /// Presumably the number of subscriptions the connection held — confirm with the producer.
        subscription_count: u32,
    },
    /// A subscription to `view_id` was created.
    SubscriptionCreated {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        view_id: String,
    },
    /// A subscription to `view_id` was removed.
    SubscriptionRemoved {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        view_id: String,
    },
    /// A snapshot of `view_id` was sent; the counters are reported by the producer
    /// (row, message and byte counts as the field names suggest — confirm).
    SnapshotSent {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        view_id: String,
        rows: u32,
        messages: u32,
        bytes: u64,
    },
    /// An update for `view_id` was sent; counters as reported by the producer.
    UpdateSent {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        view_id: String,
        messages: u32,
        bytes: u64,
    },
}
65
/// A [`WebSocketUsageEvent`] stamped with delivery metadata.
///
/// In this module, `HttpUsageEmitter`'s background task builds envelopes as it
/// receives events: `event_id` is a random (v4) UUID string and `occurred_at_ms`
/// is the wall-clock receive time in milliseconds since the Unix epoch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebSocketUsageEnvelope {
    pub event_id: String,
    pub occurred_at_ms: u64,
    pub event: WebSocketUsageEvent,
}
72
/// A group of envelopes delivered together.
///
/// This is both the JSON request body POSTed to the ingest endpoint and the
/// format of each spool file written to disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebSocketUsageBatch {
    pub events: Vec<WebSocketUsageEnvelope>,
}
77
/// Sink for [`WebSocketUsageEvent`]s.
///
/// `emit` returns nothing, so delivery failures are never surfaced to the caller.
/// The implementations in this module either log them or discard them.
#[async_trait]
pub trait WebSocketUsageEmitter: Send + Sync {
    /// Hands `event` to the emitter for delivery.
    async fn emit(&self, event: WebSocketUsageEvent);
}
82
83#[derive(Clone)]
84pub struct ChannelUsageEmitter {
85 sender: mpsc::UnboundedSender<WebSocketUsageEvent>,
86}
87
88impl ChannelUsageEmitter {
89 pub fn new(sender: mpsc::UnboundedSender<WebSocketUsageEvent>) -> Self {
90 Self { sender }
91 }
92}
93
94#[async_trait]
95impl WebSocketUsageEmitter for ChannelUsageEmitter {
96 async fn emit(&self, event: WebSocketUsageEvent) {
97 let _ = self.sender.send(event);
98 }
99}
100
/// Emitter that batches events and POSTs them as JSON to an HTTP endpoint.
///
/// The handle only holds the sending half of an unbounded channel. A background
/// Tokio task spawned by the constructors owns batching, retries and, optionally,
/// disk spooling. Dropping the handle drops that (only) sender, which makes the task
/// run its shutdown flush and exit.
pub struct HttpUsageEmitter {
    sender: mpsc::UnboundedSender<WebSocketUsageEvent>,
}
104
/// A batch whose delivery failed and is waiting to be retried in memory.
#[derive(Debug, Clone)]
struct RetryState {
    /// The events to resend, unchanged from the failed attempt.
    batch: WebSocketUsageBatch,
    /// Number of delivery attempts made so far (1 after the first failure).
    attempts: u32,
    /// Earliest time a regular flush tick retries this batch (shutdown retries immediately).
    next_retry_at: Instant,
}
111
112impl HttpUsageEmitter {
113 pub fn new(endpoint: String, auth_token: Option<String>) -> Self {
114 Self::with_config(endpoint, auth_token, 50, Duration::from_secs(2))
115 }
116
117 pub fn with_spool_dir(
118 endpoint: String,
119 auth_token: Option<String>,
120 spool_dir: impl Into<PathBuf>,
121 ) -> Self {
122 Self::with_full_config(
123 endpoint,
124 auth_token,
125 50,
126 Duration::from_secs(2),
127 Some(spool_dir.into()),
128 )
129 }
130
131 pub fn with_config(
132 endpoint: String,
133 auth_token: Option<String>,
134 batch_size: usize,
135 flush_interval: Duration,
136 ) -> Self {
137 Self::with_full_config(endpoint, auth_token, batch_size, flush_interval, None)
138 }
139
140 fn with_full_config(
141 endpoint: String,
142 auth_token: Option<String>,
143 batch_size: usize,
144 flush_interval: Duration,
145 spool_dir: Option<PathBuf>,
146 ) -> Self {
147 let (sender, mut receiver) = mpsc::unbounded_channel::<WebSocketUsageEvent>();
148 let client = reqwest::Client::new();
149
150 tokio::spawn(async move {
151 let mut ticker = interval(flush_interval);
152 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
153 let mut pending: Vec<WebSocketUsageEnvelope> = Vec::new();
154 let mut retry_state: Option<RetryState> = None;
155
156 if let Some(dir) = spool_dir.as_ref() {
157 if let Err(error) = ensure_spool_dir(dir) {
158 warn!(error = %error, path = %dir.display(), "failed to initialize websocket usage spool directory");
159 }
160 }
161
162 loop {
163 tokio::select! {
164 maybe_event = receiver.recv() => {
165 match maybe_event {
166 Some(event) => {
167 pending.push(WebSocketUsageEnvelope {
168 event_id: Uuid::new_v4().to_string(),
169 occurred_at_ms: current_time_ms(),
170 event,
171 });
172
173 if retry_state.is_none() && pending.len() >= batch_size {
174 flush_pending_batch(
175 &client,
176 &endpoint,
177 auth_token.as_deref(),
178 &mut pending,
179 &mut retry_state,
180 spool_dir.as_deref(),
181 ).await;
182 }
183 }
184 None => {
185 if retry_state.is_none() && !pending.is_empty() {
186 flush_pending_batch(
187 &client,
188 &endpoint,
189 auth_token.as_deref(),
190 &mut pending,
191 &mut retry_state,
192 spool_dir.as_deref(),
193 ).await;
194 }
195
196 if let Some(state) = retry_state.take() {
197 if let Err(retry_state_failed) = flush_existing_batch(
198 &client,
199 &endpoint,
200 auth_token.as_deref(),
201 state,
202 ).await {
203 if let Some(dir) = spool_dir.as_deref() {
204 if let Err(error) = spool_retry_state(dir, &retry_state_failed) {
205 warn!(error = %error, count = retry_state_failed.batch.events.len(), "failed to spool websocket usage batch during shutdown");
206 }
207 } else {
208 warn!(
209 count = retry_state_failed.batch.events.len(),
210 attempts = retry_state_failed.attempts,
211 "dropping websocket usage batch during shutdown after failed retry"
212 );
213 }
214 }
215 }
216
217 if !pending.is_empty() {
218 if let Some(dir) = spool_dir.as_deref() {
219 let batch = WebSocketUsageBatch { events: std::mem::take(&mut pending) };
220 if let Err(error) = spool_batch(dir, &batch) {
221 warn!(error = %error, count = batch.events.len(), "failed to spool pending websocket usage batch during shutdown");
222 }
223 } else {
224 warn!(count = pending.len(), "dropping pending websocket usage events during shutdown without spool directory");
225 }
226 }
227 break;
228 }
229 }
230 }
231 _ = ticker.tick() => {
232 if let Some(dir) = spool_dir.as_deref() {
233 if retry_state.is_none() {
234 if let Err(error) = flush_one_spooled_batch(&client, &endpoint, auth_token.as_deref(), dir).await {
235 warn!(error = %error, path = %dir.display(), "failed to process spooled websocket usage batch");
236 }
237 }
238 }
239
240 if let Some(state) = retry_state.take() {
241 if Instant::now() >= state.next_retry_at {
242 match flush_existing_batch(
243 &client,
244 &endpoint,
245 auth_token.as_deref(),
246 state,
247 ).await {
248 Ok(()) => {
249 if !pending.is_empty() {
250 flush_pending_batch(
251 &client,
252 &endpoint,
253 auth_token.as_deref(),
254 &mut pending,
255 &mut retry_state,
256 spool_dir.as_deref(),
257 ).await;
258 }
259 }
260 Err(state) => {
261 if state.attempts >= MAX_IN_MEMORY_RETRIES {
262 if let Some(dir) = spool_dir.as_deref() {
263 if let Err(error) = spool_retry_state(dir, &state) {
264 warn!(error = %error, count = state.batch.events.len(), "failed to spool websocket usage batch after retries");
265 retry_state = Some(state);
266 }
267 } else {
268 retry_state = Some(state);
269 }
270 } else {
271 retry_state = Some(state)
272 }
273 }
274 }
275 } else {
276 retry_state = Some(state);
277 }
278 } else if !pending.is_empty() {
279 flush_pending_batch(
280 &client,
281 &endpoint,
282 auth_token.as_deref(),
283 &mut pending,
284 &mut retry_state,
285 spool_dir.as_deref(),
286 ).await;
287 }
288 }
289 }
290 }
291 });
292
293 Self { sender }
294 }
295}
296
297#[async_trait]
298impl WebSocketUsageEmitter for HttpUsageEmitter {
299 async fn emit(&self, event: WebSocketUsageEvent) {
300 if let Err(error) = self.sender.send(event) {
301 warn!(error = %error, "failed to queue websocket usage event");
302 }
303 }
304}
305
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of an error.
fn current_time_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64)
}
312
313async fn flush_batch(
314 client: &reqwest::Client,
315 endpoint: &str,
316 auth_token: Option<&str>,
317 batch: &WebSocketUsageBatch,
318) -> bool {
319 if batch.events.is_empty() {
320 return true;
321 }
322
323 let mut request = client.post(endpoint).json(batch);
324 if let Some(token) = auth_token {
325 request = request.header("Authorization", format!("Bearer {}", token));
326 }
327
328 match request.send().await {
329 Ok(response) if response.status().is_success() => {
330 debug!(count = batch.events.len(), "flushed websocket usage batch");
331 true
332 }
333 Ok(response) => {
334 error!(status = %response.status(), count = batch.events.len(), "failed to ingest websocket usage batch");
335 false
336 }
337 Err(error) => {
338 error!(error = %error, count = batch.events.len(), "failed to post websocket usage batch");
339 false
340 }
341 }
342}
343
344async fn flush_pending_batch(
345 client: &reqwest::Client,
346 endpoint: &str,
347 auth_token: Option<&str>,
348 pending: &mut Vec<WebSocketUsageEnvelope>,
349 retry_state: &mut Option<RetryState>,
350 spool_dir: Option<&Path>,
351) {
352 let batch = WebSocketUsageBatch {
353 events: std::mem::take(pending),
354 };
355
356 if !flush_batch(client, endpoint, auth_token, &batch).await {
357 let state = RetryState {
358 batch,
359 attempts: 1,
360 next_retry_at: Instant::now() + retry_delay(1),
361 };
362
363 if let Some(dir) = spool_dir.filter(|_| MAX_IN_MEMORY_RETRIES <= 1) {
364 if let Err(error) = spool_retry_state(dir, &state) {
365 warn!(error = %error, count = state.batch.events.len(), "failed to spool websocket usage batch after first failure");
366 *retry_state = Some(state);
367 }
368 } else {
369 *retry_state = Some(state);
370 }
371 }
372}
373
374async fn flush_existing_batch(
375 client: &reqwest::Client,
376 endpoint: &str,
377 auth_token: Option<&str>,
378 mut state: RetryState,
379) -> Result<(), RetryState> {
380 if flush_batch(client, endpoint, auth_token, &state.batch).await {
381 Ok(())
382 } else {
383 state.attempts += 1;
384 state.next_retry_at = Instant::now() + retry_delay(state.attempts);
385 Err(state)
386 }
387}
388
/// Exponential backoff before the next retry: `2^attempt` seconds, capped at 64s.
fn retry_delay(attempt: u32) -> Duration {
    const MAX_EXPONENT: u32 = 6;
    let exponent = attempt.min(MAX_EXPONENT);
    Duration::from_secs(2_u64.pow(exponent))
}
393
/// Creates the spool directory, and any missing parents, if it does not exist yet.
///
/// Succeeds without doing anything when the directory is already present.
fn ensure_spool_dir(path: &Path) -> std::io::Result<()> {
    std::fs::DirBuilder::new().recursive(true).create(path)
}
397
398fn spool_retry_state(path: &Path, state: &RetryState) -> std::io::Result<PathBuf> {
399 spool_batch(path, &state.batch)
400}
401
402fn spool_batch(path: &Path, batch: &WebSocketUsageBatch) -> std::io::Result<PathBuf> {
403 ensure_spool_dir(path)?;
404
405 let file_name = format!(
406 "ws-usage-{}-{}.json",
407 current_time_ms(),
408 Uuid::new_v4().simple()
409 );
410 let final_path = path.join(file_name);
411 let temp_path = final_path.with_extension("tmp");
412 let data = serde_json::to_vec(batch).map_err(std::io::Error::other)?;
413 std::fs::write(&temp_path, data)?;
414 std::fs::rename(&temp_path, &final_path)?;
415 Ok(final_path)
416}
417
418fn load_batch_from_file(path: &Path) -> std::io::Result<WebSocketUsageBatch> {
419 let data = std::fs::read(path)?;
420 serde_json::from_slice(&data).map_err(std::io::Error::other)
421}
422
/// Finds the spooled batch whose path sorts lowest among the `.json` files in `path`.
///
/// Returns `Ok(None)` when the directory does not exist or holds no `.json` files.
/// Other files, such as in-progress `.tmp` writes, are ignored, as are directory
/// entries that fail to read.
fn oldest_spooled_batch(path: &Path) -> std::io::Result<Option<PathBuf>> {
    if !path.exists() {
        return Ok(None);
    }

    let is_json = |candidate: &PathBuf| {
        candidate.extension().and_then(|ext| ext.to_str()) == Some("json")
    };
    let oldest = std::fs::read_dir(path)?
        .flatten()
        .map(|entry| entry.path())
        .filter(is_json)
        .min();
    Ok(oldest)
}
435
436async fn flush_one_spooled_batch(
437 client: &reqwest::Client,
438 endpoint: &str,
439 auth_token: Option<&str>,
440 spool_dir: &Path,
441) -> std::io::Result<()> {
442 let Some(path) = oldest_spooled_batch(spool_dir)? else {
443 return Ok(());
444 };
445
446 let batch = load_batch_from_file(&path)?;
447 if flush_batch(client, endpoint, auth_token, &batch).await {
448 std::fs::remove_file(path)?;
449 }
450
451 Ok(())
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Creates a fresh, uniquely named directory under the system temp dir.
    fn temp_spool_dir() -> PathBuf {
        let unique = format!("hyperstack-usage-test-{}", Uuid::new_v4());
        let dir = std::env::temp_dir().join(unique);
        fs::create_dir_all(&dir).expect("temp dir should be created");
        dir
    }

    #[tokio::test]
    async fn channel_usage_emitter_forwards_events() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let emitter = ChannelUsageEmitter::new(tx);
        let created = WebSocketUsageEvent::SubscriptionCreated {
            client_id: "client-1".to_string(),
            deployment_id: Some("deployment-1".to_string()),
            metering_key: Some("meter-1".to_string()),
            subject: Some("subject-1".to_string()),
            view_id: "OreRound/latest".to_string(),
        };

        emitter.emit(created).await;

        let forwarded = rx.recv().await.expect("event should be forwarded");
        let WebSocketUsageEvent::SubscriptionCreated { view_id, .. } = &forwarded else {
            panic!("unexpected event: {forwarded:?}");
        };
        assert_eq!(view_id.as_str(), "OreRound/latest");
    }

    #[test]
    fn retry_delay_grows_and_caps() {
        let expectations: [(u32, u64); 4] = [(1, 2), (2, 4), (6, 64), (9, 64)];
        for (attempt, seconds) in expectations {
            assert_eq!(retry_delay(attempt), Duration::from_secs(seconds));
        }
    }

    #[test]
    fn spooled_batches_round_trip() {
        let dir = temp_spool_dir();
        let update = WebSocketUsageEvent::UpdateSent {
            client_id: "client-1".to_string(),
            deployment_id: Some("1".to_string()),
            metering_key: Some("api_key:1".to_string()),
            subject: Some("user:1".to_string()),
            view_id: "OreRound/latest".to_string(),
            messages: 1,
            bytes: 42,
        };
        let envelope = WebSocketUsageEnvelope {
            event_id: "evt_1".to_string(),
            occurred_at_ms: 123,
            event: update,
        };
        let batch = WebSocketUsageBatch {
            events: vec![envelope],
        };

        let spooled_path = spool_batch(&dir, &batch).expect("batch should spool");
        let reloaded = load_batch_from_file(&spooled_path).expect("batch should load");
        assert_eq!(reloaded.events.len(), 1);

        fs::remove_dir_all(dir).expect("temp dir should be removed");
    }

    #[test]
    fn oldest_spooled_batch_prefers_lexicographically_oldest_file() {
        let dir = temp_spool_dir();
        let fixtures = [
            ("ws-usage-100-a.json", "first batch"),
            ("ws-usage-200-b.json", "second batch"),
        ];
        for (name, label) in fixtures {
            fs::write(dir.join(name), b"{\"events\":[]}").expect(label);
        }

        let oldest = oldest_spooled_batch(&dir)
            .expect("listing should succeed")
            .expect("batch should exist");
        assert!(oldest.ends_with("ws-usage-100-a.json"));

        fs::remove_dir_all(dir).expect("temp dir should be removed");
    }
}