use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use axum::extract::ws::{Message, WebSocket};
use serde_json::{json, Value};
use tokio::sync::broadcast;
/// Period of the keep-alive timer in `handle`: a WebSocket ping is sent on every tick.
const PING_INTERVAL: Duration = Duration::from_secs(30);
/// Maximum time without any inbound frame before `handle` closes the connection.
/// The check runs only on ping ticks, so enforcement granularity is `PING_INTERVAL`.
const IDLE_TIMEOUT: Duration = Duration::from_secs(75);
/// Shared state for all live WebSocket connections: a bounded connection
/// counter plus a broadcast channel that announces the count when it changes.
pub struct LiveHub {
/// Number of connections currently admitted (incremented on join, decremented on leave).
active: AtomicUsize,
/// Upper bound on `active`; joins beyond this are refused.
max: usize,
/// Broadcasts the new connection count to every subscribed connection task.
presence: broadcast::Sender<usize>,
}
impl LiveHub {
    /// Creates a hub with no active connections.
    ///
    /// The capacity is read from the `NOWAKI_LIVE_MAX` environment variable;
    /// a missing, unparsable, or zero value falls back to 10 000.
    pub fn new() -> Arc<Self> {
        let max = std::env::var("NOWAKI_LIVE_MAX")
            .ok()
            .and_then(|s| s.parse::<usize>().ok())
            .filter(|&n| n > 0)
            .unwrap_or(10_000);
        // No receiver exists yet; each connection subscribes for itself.
        let (presence, _) = broadcast::channel(64);
        Arc::new(Self {
            active: AtomicUsize::new(0),
            max,
            presence,
        })
    }

    /// Tries to reserve one connection slot.
    ///
    /// Returns the connection count including the caller, or `None` when the
    /// hub is already at capacity. On success the new count is broadcast.
    fn join(&self) -> Option<usize> {
        let max = self.max;
        // Compare-and-swap instead of "increment, then roll back": the counter
        // never exceeds `max`, so a refused joiner cannot make a concurrent
        // joiner fail spuriously, and `count()` never reports more than `max`.
        let prev = self
            .active
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
                (cur < max).then_some(cur + 1)
            })
            .ok()?;
        let n = prev + 1;
        // A send error only means nobody is subscribed right now.
        let _ = self.presence.send(n);
        Some(n)
    }

    /// Releases one connection slot and broadcasts the remaining count.
    ///
    /// An unbalanced call (counter already zero) leaves the counter at zero
    /// instead of wrapping around, and still broadcasts `0`.
    fn leave(&self) {
        let remaining = self
            .active
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| cur.checked_sub(1))
            .map_or(0, |prev| prev - 1);
        let _ = self.presence.send(remaining);
    }

    /// Returns the current number of admitted connections.
    fn count(&self) -> usize {
        self.active.load(Ordering::SeqCst)
    }
}
pub async fn handle(
mut socket: WebSocket,
hub: Arc<LiveHub>,
http: reqwest::Client,
sidecar_port: u16,
version: String,
) {
let Some(count) = hub.join() else {
let _ = socket
.send(Message::Text(
json!({ "type": "error", "reason": "at capacity" })
.to_string()
.into(),
))
.await;
let _ = socket.send(Message::Close(None)).await;
return;
};
let mut presence_rx = hub.presence.subscribe();
let _ = socket
.send(Message::Text(
json!({ "type": "presence", "count": count })
.to_string()
.into(),
))
.await;
let mut islands: std::collections::HashMap<String, (String, Value)> =
std::collections::HashMap::new();
let mut last_seen = Instant::now();
let mut ping = tokio::time::interval(PING_INTERVAL);
ping.tick().await;
loop {
tokio::select! {
incoming = socket.recv() => {
let Some(Ok(msg)) = incoming else { break };
last_seen = Instant::now(); if let Message::Text(txt) = msg {
if !handle_text(&mut socket, &mut islands, &http, sidecar_port, &version, txt.as_str()).await {
break;
}
}
}
p = presence_rx.recv() => {
let n = match p {
Ok(n) => n,
Err(broadcast::error::RecvError::Lagged(_)) => hub.count(),
Err(broadcast::error::RecvError::Closed) => break,
};
if socket.send(Message::Text(json!({ "type": "presence", "count": n }).to_string().into())).await.is_err() {
break;
}
}
_ = ping.tick() => {
if last_seen.elapsed() > IDLE_TIMEOUT {
break;
}
if socket.send(Message::Ping(Vec::new().into())).await.is_err() {
break;
}
}
}
}
hub.leave();
}
async fn handle_text(
socket: &mut WebSocket,
islands: &mut std::collections::HashMap<String, (String, Value)>,
http: &reqwest::Client,
sidecar_port: u16,
version: &str,
txt: &str,
) -> bool {
let Ok(v) = serde_json::from_str::<Value>(txt) else {
return true;
};
match v.get("type").and_then(|t| t.as_str()) {
Some("join") => {
if let Some(arr) = v.get("islands").and_then(|i| i.as_array()) {
for it in arr {
let nid = it.get("nid").and_then(|x| x.as_str()).unwrap_or("");
if nid.is_empty() {
continue;
}
let name = it
.get("name")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_string();
let st = it.get("state").cloned().unwrap_or_else(|| json!({}));
islands.insert(nid.to_string(), (name, st));
}
}
true
}
Some("event") => {
let nid = v
.get("nid")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_string();
let Some((name, st)) = islands.get(&nid).cloned() else {
return true;
};
let body = json!({
"name": name,
"state": st,
"handler": v.get("handler"),
"payload": v.get("payload"),
"version": version,
});
let url = format!("http://127.0.0.1:{sidecar_port}/__nowaki/live-render");
let resp = http
.post(&url)
.header("content-type", "application/json")
.body(body.to_string())
.send()
.await;
let Ok(r) = resp else {
eprintln!("[nowaki live] サイドカー再描画に失敗");
return true;
};
let Ok(txt) = r.text().await else { return true };
let Ok(rv) = serde_json::from_str::<Value>(&txt) else {
return true;
};
if let Some(ns) = rv.get("state") {
islands.insert(nid.clone(), (name, ns.clone()));
}
let html = rv.get("html").and_then(|h| h.as_str()).unwrap_or("");
let patch = json!({ "type": "patch", "nid": nid, "html": html });
socket
.send(Message::Text(patch.to_string().into()))
.await
.is_ok()
}
_ => true,
}
}