use std::sync::Mutex;
use std::sync::mpsc::Sender;
use serde::{Deserialize, Serialize};
use super::parent_ui::UiMessage;
/// Name of the environment variable that switches event output to stdout.
///
/// [`ipc_events_enabled`] treats any non-empty value of this variable as
/// "enabled".
pub const ENV_IPC_EVENTS: &str = "RIVET_IPC_EVENTS";
/// Process-wide slot holding the optional in-process event sender.
///
/// Starts out as `None`. [`install_in_process_tx`] fills it,
/// [`clear_in_process_tx`] resets it, and [`emit_event`] sends through it
/// only when stdout IPC is not enabled.
pub(crate) static IN_PROCESS_TX: Mutex<Option<Sender<UiMessage>>> = Mutex::new(None);
/// Installs `tx` as the in-process event sender.
///
/// Any sender that was installed before is replaced (and dropped) while the
/// slot's lock is held.
///
/// # Panics
///
/// Panics if the mutex guarding the slot has been poisoned.
pub(crate) fn install_in_process_tx(tx: Sender<UiMessage>) {
    let mut slot = IN_PROCESS_TX.lock().expect("ipc tx mutex poisoned");
    *slot = Some(tx);
}
/// Removes the in-process event sender, if one is installed.
///
/// The previous sender is dropped while the slot's lock is held. Calling this
/// when nothing is installed leaves the slot empty.
///
/// # Panics
///
/// Panics if the mutex guarding the slot has been poisoned.
pub(crate) fn clear_in_process_tx() {
    let mut slot = IN_PROCESS_TX.lock().expect("ipc tx mutex poisoned");
    // `take` leaves `None` behind; the old sender is dropped right here.
    drop(slot.take());
}
/// Reports whether an in-process event sender is currently installed.
///
/// # Panics
///
/// Panics if the mutex guarding the slot has been poisoned.
pub(crate) fn in_process_events_enabled() -> bool {
    let slot = IN_PROCESS_TX.lock().expect("ipc tx mutex poisoned");
    slot.is_some()
}
/// Reports whether events are being captured by either channel: stdout IPC
/// (checked first) or the in-process sender.
///
/// The in-process slot is only consulted when stdout IPC is not enabled.
pub(crate) fn capturing_events() -> bool {
    if ipc_events_enabled() {
        return true;
    }
    in_process_events_enabled()
}
/// Event describing the state of one export, identified by `export_name`.
///
/// Serialized by serde as an internally tagged object: the variant name is
/// written in snake_case under the `"type"` key (for example
/// `"type":"started"`), next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ChildEvent {
/// Announces a run together with its configuration.
Started {
/// Name of the export this event belongs to.
export_name: String,
/// Identifier of the run.
run_id: String,
/// Free-form mode label (the unit tests use `"chunked"`).
mode: String,
/// Free-form tuning profile label (the unit tests use `"balanced (default)"`).
tuning_profile: String,
/// Configured batch size. NOTE(review): the unit (rows?) is not visible here; confirm with the producer.
batch_size: usize,
},
/// Announces how many chunks the export consists of.
ProgressInit {
/// Name of the export this event belongs to.
export_name: String,
/// Total number of chunks to expect.
total_chunks: u64,
},
/// Progress update for an export.
Progress {
/// Name of the export this event belongs to.
export_name: String,
/// Number of chunks completed so far.
chunks_done: u64,
/// Row count attached to this update. NOTE(review): cumulative vs. per-update is not visible here; confirm with the producer.
rows: i64,
},
/// Final report for a run.
Finished {
/// Name of the export this event belongs to.
export_name: String,
/// Identifier of the run.
run_id: String,
/// Free-form outcome label (the unit tests use `"failed"`).
status: String,
/// Total number of rows reported for the run.
total_rows: i64,
/// Number of files produced.
files_produced: u64,
/// Number of bytes written.
bytes_written: u64,
/// Run duration in milliseconds.
duration_ms: i64,
/// Presumably the peak resident set size in megabytes; TODO confirm the exact unit with the producer.
peak_rss_mb: i64,
/// Error description, if any. Presumably `None` when the run succeeded; verify against the producer.
error_message: Option<String>,
},
}
impl ChildEvent {
#[allow(dead_code)] pub fn export_name(&self) -> &str {
match self {
ChildEvent::Started { export_name, .. }
| ChildEvent::ProgressInit { export_name, .. }
| ChildEvent::Progress { export_name, .. }
| ChildEvent::Finished { export_name, .. } => export_name,
}
}
}
/// Reports whether stdout IPC events are enabled.
///
/// Returns `true` only when the [`ENV_IPC_EVENTS`] environment variable can
/// be read and its value is not empty. Every read failure reported by
/// `std::env::var` (such as the variable being unset) counts as disabled.
pub fn ipc_events_enabled() -> bool {
    match std::env::var(ENV_IPC_EVENTS) {
        Ok(value) => !value.is_empty(),
        Err(_) => false,
    }
}
/// Writes `event` to stdout as a single line of JSON and flushes.
///
/// This is best-effort: serialization, write, and flush failures are logged
/// at debug level and otherwise ignored, so emitting an event never fails the
/// caller.
pub fn emit(event: &ChildEvent) {
    use std::io::Write;

    let line = match serde_json::to_string(event) {
        Ok(json) => json,
        Err(e) => {
            log::debug!("ipc: failed to serialize event {:?}: {:#}", event, e);
            return;
        }
    };

    // Hold the stdout lock across write + flush so the line is not interleaved
    // with output from other threads.
    let stdout = std::io::stdout();
    let mut out = stdout.lock();
    if let Err(e) = writeln!(out, "{line}") {
        log::debug!("ipc: failed to write event to stdout: {:#}", e);
        return;
    }
    // A flush failure used to be dropped silently; report it the same way as
    // a write failure so a broken stdout is diagnosable.
    if let Err(e) = out.flush() {
        log::debug!("ipc: failed to flush event to stdout: {:#}", e);
    }
}
/// Delivers `event` through whichever channel is active.
///
/// When stdout IPC is enabled the event goes to [`emit`] and nothing else
/// happens. Otherwise a clone of the event is sent through the in-process
/// sender, if one is installed. A poisoned slot mutex, an empty slot, and a
/// failed send are all ignored.
pub(crate) fn emit_event(event: &ChildEvent) {
    if ipc_events_enabled() {
        return emit(event);
    }

    // A poisoned lock is treated like "no sender installed".
    let Ok(slot) = IN_PROCESS_TX.lock() else {
        return;
    };
    if let Some(tx) = slot.as_ref() {
        // The lock stays held while sending; the send result is ignored.
        let _ = tx.send(UiMessage::Event(event.clone()));
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `ev` to JSON, parses it back, checks that the value survived
    /// unchanged, and returns the JSON text for further assertions.
    #[track_caller]
    fn assert_round_trip(ev: &ChildEvent) -> String {
        let encoded = serde_json::to_string(ev).unwrap();
        let decoded: ChildEvent = serde_json::from_str(&encoded).unwrap();
        assert_eq!(*ev, decoded);
        encoded
    }

    /// `Started` survives a JSON round trip and is tagged as `"started"`.
    #[test]
    fn round_trip_started() {
        let json = assert_round_trip(&ChildEvent::Started {
            export_name: "orders".into(),
            run_id: "orders_20260427T120000".into(),
            mode: "chunked".into(),
            tuning_profile: "balanced (default)".into(),
            batch_size: 10_000,
        });
        assert!(json.contains("\"type\":\"started\""));
    }

    /// `Progress` survives a JSON round trip.
    #[test]
    fn round_trip_progress() {
        assert_round_trip(&ChildEvent::Progress {
            export_name: "events".into(),
            chunks_done: 7,
            rows: 1_234_567,
        });
    }

    /// `Finished` with an error message survives a JSON round trip.
    #[test]
    fn round_trip_finished_with_error() {
        assert_round_trip(&ChildEvent::Finished {
            export_name: "users".into(),
            run_id: "users_20260427T120000".into(),
            status: "failed".into(),
            total_rows: 0,
            files_produced: 0,
            bytes_written: 0,
            duration_ms: 1234,
            peak_rss_mb: 12,
            error_message: Some("connection reset".into()),
        });
    }

    /// `export_name()` returns the name stored in the variant.
    #[test]
    fn export_name_routing() {
        let init = ChildEvent::ProgressInit {
            export_name: "orders".into(),
            total_chunks: 20,
        };
        assert_eq!(init.export_name(), "orders");
    }

    /// Smoke test: only checks that the call completes. The result is
    /// discarded, so the outcome does not depend on the test environment.
    #[test]
    fn ipc_events_enabled_false_by_default() {
        let _ = ipc_events_enabled();
    }
}