Skip to main content

hyperstack_server/websocket/
usage.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use tokio::sync::mpsc;
6use tokio::time::{interval, Instant, MissedTickBehavior};
7use tracing::{debug, error, warn};
8use uuid::Uuid;
9
/// Number of failed HTTP delivery attempts after which a batch is moved out of
/// memory into the spool directory, when one is configured.
///
/// Without a spool directory (or if spooling fails) the batch stays in memory
/// and keeps being retried with backoff.
const MAX_IN_MEMORY_RETRIES: u32 = 3;
11
/// One WebSocket usage event.
///
/// Serialized as an internally tagged JSON object: a `"type"` field holds the
/// snake_case variant name (e.g. `"connection_established"`) next to the
/// variant's own fields.
///
/// NOTE(review): the meaning of the optional `deployment_id`, `metering_key`,
/// `subject` and `key_class` fields is defined by whoever emits the event and by
/// the ingest endpoint; this module only carries them through unchanged.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WebSocketUsageEvent {
    ConnectionEstablished {
        client_id: String,
        remote_addr: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        key_class: Option<String>,
    },
    ConnectionClosed {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        // Presumably the connection lifetime in seconds (from the name) — confirm with the emitter.
        duration_secs: Option<f64>,
        subscription_count: u32,
    },
    SubscriptionCreated {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        view_id: String,
    },
    SubscriptionRemoved {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        view_id: String,
    },
    SnapshotSent {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        view_id: String,
        // NOTE(review): presumably totals for the whole snapshot; exact units of
        // `bytes` (payload vs. framed) are not visible in this module.
        rows: u32,
        messages: u32,
        bytes: u64,
    },
    UpdateSent {
        client_id: String,
        deployment_id: Option<String>,
        metering_key: Option<String>,
        subject: Option<String>,
        view_id: String,
        messages: u32,
        bytes: u64,
    },
}
65
/// A usage event wrapped with delivery metadata by `HttpUsageEmitter`'s
/// background task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebSocketUsageEnvelope {
    /// Random UUID v4 string assigned when the event is buffered.
    /// NOTE(review): presumably lets the ingest side de-duplicate batches that
    /// are re-sent after a failure or from the spool.
    pub event_id: String,
    /// Milliseconds since the Unix epoch, taken when the background task
    /// dequeued the event (not when `emit` was called).
    pub occurred_at_ms: u64,
    /// The wrapped event.
    pub event: WebSocketUsageEvent,
}
72
/// A group of envelopes: the JSON body POSTed to the usage endpoint, and also
/// the on-disk format of spool files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebSocketUsageBatch {
    pub events: Vec<WebSocketUsageEnvelope>,
}
77
/// Sink for WebSocket usage events.
///
/// `emit` returns `()`: the implementations in this module never report a
/// delivery failure to the caller; they either ignore it or log it.
#[async_trait]
pub trait WebSocketUsageEmitter: Send + Sync {
    /// Hands one event to the sink.
    async fn emit(&self, event: WebSocketUsageEvent);
}
82
83#[derive(Clone)]
84pub struct ChannelUsageEmitter {
85    sender: mpsc::UnboundedSender<WebSocketUsageEvent>,
86}
87
88impl ChannelUsageEmitter {
89    pub fn new(sender: mpsc::UnboundedSender<WebSocketUsageEvent>) -> Self {
90        Self { sender }
91    }
92}
93
94#[async_trait]
95impl WebSocketUsageEmitter for ChannelUsageEmitter {
96    async fn emit(&self, event: WebSocketUsageEvent) {
97        let _ = self.sender.send(event);
98    }
99}
100
/// Emitter that batches usage events and POSTs them as JSON to an HTTP endpoint.
///
/// `emit` only enqueues onto an unbounded channel. A background tokio task,
/// spawned by the constructor, does the batching, HTTP delivery, in-memory
/// retries with exponential backoff and (optionally) spooling of failed batches
/// to disk. This struct holds the only sender, so dropping the emitter makes the
/// task drain what it has and exit.
pub struct HttpUsageEmitter {
    sender: mpsc::UnboundedSender<WebSocketUsageEvent>,
}
104
/// A batch whose HTTP delivery failed and which is waiting for an in-memory retry.
#[derive(Debug, Clone)]
struct RetryState {
    /// The batch to re-send, unchanged from the failed attempt.
    batch: WebSocketUsageBatch,
    /// Number of delivery attempts that have failed so far (starts at 1).
    attempts: u32,
    /// Earliest time at which the next retry may be made.
    next_retry_at: Instant,
}
111
112impl HttpUsageEmitter {
113    pub fn new(endpoint: String, auth_token: Option<String>) -> Self {
114        Self::with_config(endpoint, auth_token, 50, Duration::from_secs(2))
115    }
116
117    pub fn with_spool_dir(
118        endpoint: String,
119        auth_token: Option<String>,
120        spool_dir: impl Into<PathBuf>,
121    ) -> Self {
122        Self::with_full_config(
123            endpoint,
124            auth_token,
125            50,
126            Duration::from_secs(2),
127            Some(spool_dir.into()),
128        )
129    }
130
131    pub fn with_config(
132        endpoint: String,
133        auth_token: Option<String>,
134        batch_size: usize,
135        flush_interval: Duration,
136    ) -> Self {
137        Self::with_full_config(endpoint, auth_token, batch_size, flush_interval, None)
138    }
139
140    fn with_full_config(
141        endpoint: String,
142        auth_token: Option<String>,
143        batch_size: usize,
144        flush_interval: Duration,
145        spool_dir: Option<PathBuf>,
146    ) -> Self {
147        let (sender, mut receiver) = mpsc::unbounded_channel::<WebSocketUsageEvent>();
148        let client = reqwest::Client::new();
149
150        tokio::spawn(async move {
151            let mut ticker = interval(flush_interval);
152            ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
153            let mut pending: Vec<WebSocketUsageEnvelope> = Vec::new();
154            let mut retry_state: Option<RetryState> = None;
155
156            if let Some(dir) = spool_dir.as_ref() {
157                if let Err(error) = ensure_spool_dir(dir) {
158                    warn!(error = %error, path = %dir.display(), "failed to initialize websocket usage spool directory");
159                }
160            }
161
162            loop {
163                tokio::select! {
164                    maybe_event = receiver.recv() => {
165                        match maybe_event {
166                            Some(event) => {
167                                pending.push(WebSocketUsageEnvelope {
168                                    event_id: Uuid::new_v4().to_string(),
169                                    occurred_at_ms: current_time_ms(),
170                                    event,
171                                });
172
173                                if retry_state.is_none() && pending.len() >= batch_size {
174                                    flush_pending_batch(
175                                        &client,
176                                        &endpoint,
177                                        auth_token.as_deref(),
178                                        &mut pending,
179                                        &mut retry_state,
180                                        spool_dir.as_deref(),
181                                    ).await;
182                                }
183                            }
184                            None => {
185                                if retry_state.is_none() && !pending.is_empty() {
186                                    flush_pending_batch(
187                                        &client,
188                                        &endpoint,
189                                        auth_token.as_deref(),
190                                        &mut pending,
191                                        &mut retry_state,
192                                        spool_dir.as_deref(),
193                                    ).await;
194                                }
195
196                                if let Some(state) = retry_state.take() {
197                                    if let Err(retry_state_failed) = flush_existing_batch(
198                                        &client,
199                                        &endpoint,
200                                        auth_token.as_deref(),
201                                        state,
202                                    ).await {
203                                        if let Some(dir) = spool_dir.as_deref() {
204                                            if let Err(error) = spool_retry_state(dir, &retry_state_failed) {
205                                                warn!(error = %error, count = retry_state_failed.batch.events.len(), "failed to spool websocket usage batch during shutdown");
206                                            }
207                                        } else {
208                                            warn!(
209                                                count = retry_state_failed.batch.events.len(),
210                                                attempts = retry_state_failed.attempts,
211                                                "dropping websocket usage batch during shutdown after failed retry"
212                                            );
213                                        }
214                                    }
215                                }
216
217                                if !pending.is_empty() {
218                                    if let Some(dir) = spool_dir.as_deref() {
219                                        let batch = WebSocketUsageBatch { events: std::mem::take(&mut pending) };
220                                        if let Err(error) = spool_batch(dir, &batch) {
221                                            warn!(error = %error, count = batch.events.len(), "failed to spool pending websocket usage batch during shutdown");
222                                        }
223                                    } else {
224                                        warn!(count = pending.len(), "dropping pending websocket usage events during shutdown without spool directory");
225                                    }
226                                }
227                                break;
228                            }
229                        }
230                    }
231                    _ = ticker.tick() => {
232                        if let Some(dir) = spool_dir.as_deref() {
233                            if retry_state.is_none() {
234                                if let Err(error) = flush_one_spooled_batch(&client, &endpoint, auth_token.as_deref(), dir).await {
235                                    warn!(error = %error, path = %dir.display(), "failed to process spooled websocket usage batch");
236                                }
237                            }
238                        }
239
240                        if let Some(state) = retry_state.take() {
241                            if Instant::now() >= state.next_retry_at {
242                                match flush_existing_batch(
243                                    &client,
244                                    &endpoint,
245                                    auth_token.as_deref(),
246                                    state,
247                                ).await {
248                                    Ok(()) => {
249                                        if !pending.is_empty() {
250                                            flush_pending_batch(
251                                                &client,
252                                                &endpoint,
253                                                auth_token.as_deref(),
254                                                &mut pending,
255                                                &mut retry_state,
256                                                spool_dir.as_deref(),
257                                            ).await;
258                                        }
259                                    }
260                                    Err(state) => {
261                                        if state.attempts >= MAX_IN_MEMORY_RETRIES {
262                                            if let Some(dir) = spool_dir.as_deref() {
263                                                if let Err(error) = spool_retry_state(dir, &state) {
264                                                    warn!(error = %error, count = state.batch.events.len(), "failed to spool websocket usage batch after retries");
265                                                    retry_state = Some(state);
266                                                }
267                                            } else {
268                                                retry_state = Some(state);
269                                            }
270                                        } else {
271                                            retry_state = Some(state)
272                                        }
273                                    }
274                                }
275                            } else {
276                                retry_state = Some(state);
277                            }
278                        } else if !pending.is_empty() {
279                            flush_pending_batch(
280                                &client,
281                                &endpoint,
282                                auth_token.as_deref(),
283                                &mut pending,
284                                &mut retry_state,
285                                spool_dir.as_deref(),
286                            ).await;
287                        }
288                    }
289                }
290            }
291        });
292
293        Self { sender }
294    }
295}
296
297#[async_trait]
298impl WebSocketUsageEmitter for HttpUsageEmitter {
299    async fn emit(&self, event: WebSocketUsageEvent) {
300        if let Err(error) = self.sender.send(event) {
301            warn!(error = %error, "failed to queue websocket usage event");
302        }
303    }
304}
305
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. Saturates at
/// `u64::MAX` instead of silently truncating the `u128` millisecond count.
fn current_time_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
312
313async fn flush_batch(
314    client: &reqwest::Client,
315    endpoint: &str,
316    auth_token: Option<&str>,
317    batch: &WebSocketUsageBatch,
318) -> bool {
319    if batch.events.is_empty() {
320        return true;
321    }
322
323    let mut request = client.post(endpoint).json(batch);
324    if let Some(token) = auth_token {
325        request = request.header("Authorization", format!("Bearer {}", token));
326    }
327
328    match request.send().await {
329        Ok(response) if response.status().is_success() => {
330            debug!(count = batch.events.len(), "flushed websocket usage batch");
331            true
332        }
333        Ok(response) => {
334            error!(status = %response.status(), count = batch.events.len(), "failed to ingest websocket usage batch");
335            false
336        }
337        Err(error) => {
338            error!(error = %error, count = batch.events.len(), "failed to post websocket usage batch");
339            false
340        }
341    }
342}
343
344async fn flush_pending_batch(
345    client: &reqwest::Client,
346    endpoint: &str,
347    auth_token: Option<&str>,
348    pending: &mut Vec<WebSocketUsageEnvelope>,
349    retry_state: &mut Option<RetryState>,
350    spool_dir: Option<&Path>,
351) {
352    let batch = WebSocketUsageBatch {
353        events: std::mem::take(pending),
354    };
355
356    if !flush_batch(client, endpoint, auth_token, &batch).await {
357        let state = RetryState {
358            batch,
359            attempts: 1,
360            next_retry_at: Instant::now() + retry_delay(1),
361        };
362
363        if let Some(dir) = spool_dir.filter(|_| MAX_IN_MEMORY_RETRIES <= 1) {
364            if let Err(error) = spool_retry_state(dir, &state) {
365                warn!(error = %error, count = state.batch.events.len(), "failed to spool websocket usage batch after first failure");
366                *retry_state = Some(state);
367            }
368        } else {
369            *retry_state = Some(state);
370        }
371    }
372}
373
374async fn flush_existing_batch(
375    client: &reqwest::Client,
376    endpoint: &str,
377    auth_token: Option<&str>,
378    mut state: RetryState,
379) -> Result<(), RetryState> {
380    if flush_batch(client, endpoint, auth_token, &state.batch).await {
381        Ok(())
382    } else {
383        state.attempts += 1;
384        state.next_retry_at = Instant::now() + retry_delay(state.attempts);
385        Err(state)
386    }
387}
388
/// Exponential backoff for retry number `attempt`: `2^attempt` seconds, capped
/// at `2^6` = 64 seconds.
fn retry_delay(attempt: u32) -> Duration {
    const MAX_EXPONENT: u32 = 6;
    let exponent = if attempt > MAX_EXPONENT {
        MAX_EXPONENT
    } else {
        attempt
    };
    Duration::from_secs(2_u64.pow(exponent))
}
393
/// Creates the spool directory and any missing parents. Succeeds if the
/// directory already exists.
fn ensure_spool_dir(path: &Path) -> std::io::Result<()> {
    std::fs::DirBuilder::new().recursive(true).create(path)
}
397
398fn spool_retry_state(path: &Path, state: &RetryState) -> std::io::Result<PathBuf> {
399    spool_batch(path, &state.batch)
400}
401
402fn spool_batch(path: &Path, batch: &WebSocketUsageBatch) -> std::io::Result<PathBuf> {
403    ensure_spool_dir(path)?;
404
405    let file_name = format!(
406        "ws-usage-{}-{}.json",
407        current_time_ms(),
408        Uuid::new_v4().simple()
409    );
410    let final_path = path.join(file_name);
411    let temp_path = final_path.with_extension("tmp");
412    let data = serde_json::to_vec(batch).map_err(std::io::Error::other)?;
413    std::fs::write(&temp_path, data)?;
414    std::fs::rename(&temp_path, &final_path)?;
415    Ok(final_path)
416}
417
418fn load_batch_from_file(path: &Path) -> std::io::Result<WebSocketUsageBatch> {
419    let data = std::fs::read(path)?;
420    serde_json::from_slice(&data).map_err(std::io::Error::other)
421}
422
/// Returns the spool file that should be sent next. Returns `None` when the
/// spool directory does not exist or holds no batches.
///
/// Only regular files with a `.json` extension count; in-progress `.tmp` files
/// and anything else are skipped. The lexicographically smallest path wins,
/// which for `ws-usage-<unix-ms>-...` names is the oldest batch.
///
/// # Errors
/// Any error from listing the directory other than "not found".
fn oldest_spooled_batch(path: &Path) -> std::io::Result<Option<PathBuf>> {
    let entries = match std::fs::read_dir(path) {
        Ok(entries) => entries,
        // A missing directory simply means nothing has been spooled yet.
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(error) => return Err(error),
    };

    Ok(entries
        .filter_map(|entry| entry.ok().map(|entry| entry.path()))
        .filter(|candidate| candidate.extension().and_then(|ext| ext.to_str()) == Some("json"))
        // A directory named `*.json` would otherwise be picked on every tick and
        // fail to load forever, blocking every batch queued behind it.
        .filter(|candidate| candidate.is_file())
        .min())
}
435
436async fn flush_one_spooled_batch(
437    client: &reqwest::Client,
438    endpoint: &str,
439    auth_token: Option<&str>,
440    spool_dir: &Path,
441) -> std::io::Result<()> {
442    let Some(path) = oldest_spooled_batch(spool_dir)? else {
443        return Ok(());
444    };
445
446    let batch = load_batch_from_file(&path)?;
447    if flush_batch(client, endpoint, auth_token, &batch).await {
448        std::fs::remove_file(path)?;
449    }
450
451    Ok(())
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Creates a fresh, uniquely named directory under the system temp dir.
    fn temp_spool_dir() -> PathBuf {
        let dir_name = format!("hyperstack-usage-test-{}", Uuid::new_v4());
        let dir = std::env::temp_dir().join(dir_name);
        fs::create_dir_all(&dir).expect("temp dir should be created");
        dir
    }

    /// A one-event batch used by the spool round-trip test.
    fn sample_batch() -> WebSocketUsageBatch {
        let event = WebSocketUsageEvent::UpdateSent {
            client_id: "client-1".to_string(),
            deployment_id: Some("1".to_string()),
            metering_key: Some("api_key:1".to_string()),
            subject: Some("user:1".to_string()),
            view_id: "OreRound/latest".to_string(),
            messages: 1,
            bytes: 42,
        };
        WebSocketUsageBatch {
            events: vec![WebSocketUsageEnvelope {
                event_id: "evt_1".to_string(),
                occurred_at_ms: 123,
                event,
            }],
        }
    }

    #[tokio::test]
    async fn channel_usage_emitter_forwards_events() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let emitter = ChannelUsageEmitter::new(tx);
        let created = WebSocketUsageEvent::SubscriptionCreated {
            client_id: "client-1".to_string(),
            deployment_id: Some("deployment-1".to_string()),
            metering_key: Some("meter-1".to_string()),
            subject: Some("subject-1".to_string()),
            view_id: "OreRound/latest".to_string(),
        };

        emitter.emit(created).await;

        match rx.recv().await.expect("event should be forwarded") {
            WebSocketUsageEvent::SubscriptionCreated { view_id, .. } => {
                assert_eq!(view_id, "OreRound/latest")
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[test]
    fn retry_delay_grows_and_caps() {
        let expectations = [(1, 2), (2, 4), (6, 64), (9, 64)];
        for (attempt, secs) in expectations {
            assert_eq!(retry_delay(attempt), Duration::from_secs(secs));
        }
    }

    #[test]
    fn spooled_batches_round_trip() {
        let dir = temp_spool_dir();

        let spooled_path = spool_batch(&dir, &sample_batch()).expect("batch should spool");
        let reloaded = load_batch_from_file(&spooled_path).expect("batch should load");
        assert_eq!(reloaded.events.len(), 1);

        fs::remove_dir_all(dir).expect("temp dir should be removed");
    }

    #[test]
    fn oldest_spooled_batch_prefers_lexicographically_oldest_file() {
        let dir = temp_spool_dir();
        let empty_batch: &[u8] = b"{\"events\":[]}";
        fs::write(dir.join("ws-usage-100-a.json"), empty_batch).expect("first batch");
        fs::write(dir.join("ws-usage-200-b.json"), empty_batch).expect("second batch");

        let oldest = oldest_spooled_batch(&dir)
            .expect("listing should succeed")
            .expect("batch should exist");
        assert!(oldest.ends_with("ws-usage-100-a.json"));

        fs::remove_dir_all(dir).expect("temp dir should be removed");
    }
}