athena_rs 1.1.0

Database gateway API
Documentation
//! Event batching and broadcast for the CDC WebSocket server.
//!
//! Events are buffered per `organization_id` and flushed once per second,
//! broadcasting to WebSocket subscribers via the server's channel when available.

use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, broadcast};
use tokio::time::{Duration, interval};
use tracing::warn;

/// Event shape expected by WebSocket subscribers.
#[derive(Debug, Serialize, Deserialize)]
pub struct EventMessage {
    pub organization_id: String,
    pub data: Value,
}

/// Optional broadcast sender, set when the websocket server starts.
static BROADCAST_TX: OnceCell<broadcast::Sender<String>> = OnceCell::new();

/// Register the broadcast sender. Called by `websocket_server` on startup.
pub fn set_broadcast_tx(tx: broadcast::Sender<String>) {
    let _ = BROADCAST_TX.set(tx);
}

/// Returns true if the broadcast channel is configured (websocket server running).
pub fn has_broadcast() -> bool {
    BROADCAST_TX.get().is_some()
}

// Event buffer that collects events per organization_id
static EVENT_BUFFER: OnceCell<Arc<Mutex<HashMap<String, Vec<Value>>>>> = OnceCell::new();
static EVENT_BATCHER_INIT: OnceCell<Arc<Mutex<bool>>> = OnceCell::new();

fn get_buffer() -> Arc<Mutex<HashMap<String, Vec<Value>>>> {
    EVENT_BUFFER
        .get_or_init(|| Arc::new(Mutex::new(HashMap::new())))
        .clone()
}

fn get_batcher_init() -> Arc<Mutex<bool>> {
    EVENT_BATCHER_INIT
        .get_or_init(|| Arc::new(Mutex::new(false)))
        .clone()
}

/// Initialize the event batcher background task
async fn ensure_event_batcher_started() {
    let init = get_batcher_init();
    let mut guard = init.lock().await;
    if !*guard {
        *guard = true;
        tokio::spawn(async { event_batcher_task().await });
    }
}

/// Background task that flushes events every second to the WebSocket broadcast
async fn event_batcher_task() {
    let mut ticker = interval(Duration::from_secs(1));
    let buffer = get_buffer();

    loop {
        ticker.tick().await;

        let events_to_send = {
            let mut guard = buffer.lock().await;
            if guard.is_empty() {
                continue;
            }
            std::mem::take(&mut *guard)
        };

        let tx = match BROADCAST_TX.get() {
            Some(t) => t.clone(),
            None => continue,
        };

        for (organization_id, events) in events_to_send {
            for data in events {
                let msg = EventMessage {
                    organization_id: organization_id.clone(),
                    data,
                };
                let json = match serde_json::to_string(&msg) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("Failed to serialize event: {}", e);
                        continue;
                    }
                };
                if tx.send(json).is_err() {
                    warn!("Failed to broadcast event (no receivers?)");
                }
            }
        }
    }
}

/// Post an event to the CDC system (batched).
/// Events are collected and broadcast once per second to WebSocket subscribers.
/// No-ops if the websocket server has not been started (no broadcast channel).
pub async fn post_event(organization_id: String, data: Value) {
    if BROADCAST_TX.get().is_none() {
        return;
    }
    ensure_event_batcher_started().await;
    let buffer = get_buffer();
    let mut guard = buffer.lock().await;
    guard
        .entry(organization_id)
        .or_insert_with(Vec::new)
        .push(data);
}