use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, broadcast};
use tokio::time::{Duration, interval};
use tracing::warn;
#[derive(Debug, Serialize, Deserialize)]
pub struct EventMessage {
pub organization_id: String,
pub data: Value,
}
static BROADCAST_TX: OnceCell<broadcast::Sender<String>> = OnceCell::new();
pub fn set_broadcast_tx(tx: broadcast::Sender<String>) {
let _ = BROADCAST_TX.set(tx);
}
pub fn has_broadcast() -> bool {
BROADCAST_TX.get().is_some()
}
static EVENT_BUFFER: OnceCell<Arc<Mutex<HashMap<String, Vec<Value>>>>> = OnceCell::new();
static EVENT_BATCHER_INIT: OnceCell<Arc<Mutex<bool>>> = OnceCell::new();
fn get_buffer() -> Arc<Mutex<HashMap<String, Vec<Value>>>> {
EVENT_BUFFER
.get_or_init(|| Arc::new(Mutex::new(HashMap::new())))
.clone()
}
fn get_batcher_init() -> Arc<Mutex<bool>> {
EVENT_BATCHER_INIT
.get_or_init(|| Arc::new(Mutex::new(false)))
.clone()
}
async fn ensure_event_batcher_started() {
let init = get_batcher_init();
let mut guard = init.lock().await;
if !*guard {
*guard = true;
tokio::spawn(async { event_batcher_task().await });
}
}
async fn event_batcher_task() {
let mut ticker = interval(Duration::from_secs(1));
let buffer = get_buffer();
loop {
ticker.tick().await;
let events_to_send = {
let mut guard = buffer.lock().await;
if guard.is_empty() {
continue;
}
std::mem::take(&mut *guard)
};
let tx = match BROADCAST_TX.get() {
Some(t) => t.clone(),
None => continue,
};
for (organization_id, events) in events_to_send {
for data in events {
let msg = EventMessage {
organization_id: organization_id.clone(),
data,
};
let json = match serde_json::to_string(&msg) {
Ok(s) => s,
Err(e) => {
warn!("Failed to serialize event: {}", e);
continue;
}
};
if tx.send(json).is_err() {
warn!("Failed to broadcast event (no receivers?)");
}
}
}
}
}
pub async fn post_event(organization_id: String, data: Value) {
if BROADCAST_TX.get().is_none() {
return;
}
ensure_event_batcher_started().await;
let buffer = get_buffer();
let mut guard = buffer.lock().await;
guard
.entry(organization_id)
.or_insert_with(Vec::new)
.push(data);
}