use std::collections::HashMap;
use std::sync::{Arc, OnceLock, RwLock};
use super::{AgentEvent, AgentEventSink};
#[cfg(test)]
#[derive(Clone)]
pub(super) struct RegisteredSink {
pub(super) owner: std::thread::ThreadId,
pub(super) sink: Arc<dyn AgentEventSink>,
}
#[cfg(not(test))]
pub(super) type RegisteredSink = Arc<dyn AgentEventSink>;
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
fn external_sinks() -> &'static ExternalSinkRegistry {
static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
}
pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
let session_id = session_id.into();
let mut reg = external_sinks().write().expect("sink registry poisoned");
#[cfg(test)]
let sink = RegisteredSink {
owner: std::thread::current().id(),
sink,
};
reg.entry(session_id).or_default().push(sink);
}
pub fn clear_session_sinks(session_id: &str) {
#[cfg(test)]
{
let owner = std::thread::current().id();
let mut reg = external_sinks().write().expect("sink registry poisoned");
if let Some(sinks) = reg.get_mut(session_id) {
sinks.retain(|sink| sink.owner != owner);
if sinks.is_empty() {
reg.remove(session_id);
}
}
}
#[cfg(not(test))]
{
external_sinks()
.write()
.expect("sink registry poisoned")
.remove(session_id);
}
}
pub fn reset_all_sinks() {
#[cfg(test)]
{
let owner = std::thread::current().id();
let mut reg = external_sinks().write().expect("sink registry poisoned");
reg.retain(|_, sinks| {
sinks.retain(|sink| sink.owner != owner);
!sinks.is_empty()
});
crate::agent_sessions::reset_session_store();
reset_wildcard_sinks();
}
#[cfg(not(test))]
{
external_sinks()
.write()
.expect("sink registry poisoned")
.clear();
crate::agent_sessions::reset_session_store();
wildcard_sinks()
.write()
.expect("wildcard registry poisoned")
.clear();
}
}
pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
if source_session_id.is_empty() || target_session_id.is_empty() {
return;
}
if source_session_id == target_session_id {
return;
}
let mut reg = external_sinks().write().expect("sink registry poisoned");
let Some(source_sinks) = reg.get(source_session_id).cloned() else {
return;
};
let target = reg.entry(target_session_id.to_string()).or_default();
#[cfg(test)]
{
for source in source_sinks {
let already_present = target
.iter()
.any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
if !already_present {
target.push(source);
}
}
}
#[cfg(not(test))]
{
for source in source_sinks {
let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
if !already_present {
target.push(source);
}
}
}
}
pub fn emit_event(event: &AgentEvent) {
let sinks: Vec<Arc<dyn AgentEventSink>> = {
let reg = external_sinks().read().expect("sink registry poisoned");
#[cfg(test)]
{
let owner = std::thread::current().id();
reg.get(event.session_id())
.map(|sinks| {
sinks
.iter()
.filter(|sink| sink.owner == owner)
.map(|sink| sink.sink.clone())
.collect()
})
.unwrap_or_default()
}
#[cfg(not(test))]
{
reg.get(event.session_id()).cloned().unwrap_or_default()
}
};
for sink in sinks {
sink.handle_event(event);
}
let wildcard_sinks: Vec<Arc<dyn AgentEventSink>> = {
let reg = wildcard_sinks().read().expect("wildcard registry poisoned");
#[cfg(test)]
{
let owner = std::thread::current().id();
reg.iter()
.filter(|entry| entry.owner == owner)
.map(|entry| entry.sink.clone())
.collect()
}
#[cfg(not(test))]
{
reg.iter().map(|entry| entry.sink.clone()).collect()
}
};
for sink in wildcard_sinks {
sink.handle_event(event);
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct WildcardSinkHandle(pub(crate) u64);
#[cfg(test)]
#[derive(Clone)]
struct WildcardSinkEntry {
handle: WildcardSinkHandle,
owner: std::thread::ThreadId,
sink: Arc<dyn AgentEventSink>,
}
#[cfg(not(test))]
#[derive(Clone)]
struct WildcardSinkEntry {
handle: WildcardSinkHandle,
sink: Arc<dyn AgentEventSink>,
}
type WildcardSinkRegistry = RwLock<Vec<WildcardSinkEntry>>;
fn wildcard_sinks() -> &'static WildcardSinkRegistry {
static REGISTRY: OnceLock<WildcardSinkRegistry> = OnceLock::new();
REGISTRY.get_or_init(|| RwLock::new(Vec::new()))
}
fn next_wildcard_handle() -> WildcardSinkHandle {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
WildcardSinkHandle(COUNTER.fetch_add(1, Ordering::SeqCst))
}
pub fn register_wildcard_sink(sink: Arc<dyn AgentEventSink>) -> WildcardSinkHandle {
let handle = next_wildcard_handle();
let mut reg = wildcard_sinks()
.write()
.expect("wildcard registry poisoned");
#[cfg(test)]
let entry = WildcardSinkEntry {
handle,
owner: std::thread::current().id(),
sink,
};
#[cfg(not(test))]
let entry = WildcardSinkEntry { handle, sink };
reg.push(entry);
handle
}
pub fn unregister_wildcard_sink(handle: WildcardSinkHandle) {
let mut reg = wildcard_sinks()
.write()
.expect("wildcard registry poisoned");
reg.retain(|entry| entry.handle != handle);
}
#[cfg(test)]
pub fn reset_wildcard_sinks() {
let owner = std::thread::current().id();
let mut reg = wildcard_sinks()
.write()
.expect("wildcard registry poisoned");
reg.retain(|entry| entry.owner != owner);
}
pub fn session_external_sink_count(session_id: &str) -> usize {
#[cfg(test)]
{
let owner = std::thread::current().id();
return external_sinks()
.read()
.expect("sink registry poisoned")
.get(session_id)
.map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
.unwrap_or(0);
}
#[cfg(not(test))]
{
external_sinks()
.read()
.expect("sink registry poisoned")
.get(session_id)
.map(|v| v.len())
.unwrap_or(0)
}
}
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
crate::agent_sessions::subscriber_count(session_id)
}