use {
super::{
ConnectedClient,
ClientId,
ClientKind,
wants_lsp_notifications,
},
crate::{
connect::ipc::Connection,
protocol::jsonrpc::{
Message,
Notification,
},
},
dashmap::DashMap,
parking_lot::RwLock,
serde::{
Deserialize,
Serialize,
},
std::{
collections::HashMap,
sync::{
Arc,
atomic::{
AtomicU64,
Ordering,
},
},
time::{
Duration,
Instant,
},
},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectedClientInfo {
pub id: ClientId,
pub kind: String,
pub client_name: Option<String>,
pub client_version: Option<String>,
pub connected_at: u64,
}
pub struct ClientRegistry {
clients: DashMap<ClientId, ConnectedClient>,
last_disconnect_at: RwLock<Option<Instant>>,
next_client_id: AtomicU64,
}
impl ClientRegistry {
pub fn new() -> Self {
Self {
clients: DashMap::new(),
last_disconnect_at: RwLock::new(Some(Instant::now())),
next_client_id: AtomicU64::new(1),
}
}
fn generate_client_id(&self) -> ClientId {
ClientId::from_raw(self.next_client_id.fetch_add(1, Ordering::Relaxed))
}
pub fn register(
&self,
kind: ClientKind,
connection: Connection,
metadata: HashMap<String, String>,
) -> ClientId {
let id = self.generate_client_id();
let client = ConnectedClient::with_id(id, kind, connection, metadata);
self.clients.insert(id, client);
*self.last_disconnect_at.write() = None;
otel::event!(
"client_registered",
"client_id" = id.as_u64() as i64,
"client_kind" = kind.to_string()
);
id
}
pub fn unregister(&self, id: ClientId) {
if let Some((_, client)) = self.clients.remove(&id) {
if self.clients.is_empty() {
*self.last_disconnect_at.write() = Some(Instant::now());
}
otel::event!(
"client_unregistered",
"client_id" = id.as_u64() as i64,
"client_kind" = client.kind().to_string()
);
}
}
pub fn client_count(&self) -> usize {
self.clients.len()
}
pub fn idle_duration(&self) -> Option<Duration> {
self.last_disconnect_at.read().map(|t| t.elapsed())
}
pub fn get(
&self,
id: ClientId,
) -> Option<dashmap::mapref::one::Ref<'_, ClientId, ConnectedClient>> {
self.clients.get(&id)
}
pub fn get_mut(
&self,
id: ClientId,
) -> Option<dashmap::mapref::one::RefMut<'_, ClientId, ConnectedClient>> {
self.clients.get_mut(&id)
}
pub fn subscribe(&self, id: ClientId, topic: impl super::Topic) -> bool {
if let Some(mut client) = self.clients.get_mut(&id) {
client.subscribe(topic);
true
} else {
false
}
}
pub fn unsubscribe(&self, id: ClientId, topic: impl super::Topic) -> bool {
if let Some(mut client) = self.clients.get_mut(&id) {
client.unsubscribe(topic);
true
} else {
false
}
}
pub async fn broadcast(&self, topic: impl super::Topic, notification: Notification) {
let message = Message::Notification(notification);
for entry in self.clients.iter() {
let client = entry.value();
if !wants_lsp_notifications(client.kind()) {
continue;
}
if client.is_subscribed(topic)
&& let Err(e) = client.connection().sender.send(message.clone()).await
{
otel::error!(
"broadcast_send_failed",
format!("Failed to send to client {}: {}", client.id(), e)
);
}
}
}
pub async fn broadcast_all(&self, notification: Notification) {
let message = Message::Notification(notification);
for entry in self.clients.iter() {
let client = entry.value();
if !wants_lsp_notifications(client.kind()) {
continue;
}
if let Err(e) = client.connection().sender.send(message.clone()).await {
otel::error!(
"broadcast_all_send_failed",
format!("Failed to send to client {}: {}", client.id(), e)
);
}
}
}
pub async fn send_to(
&self,
id: ClientId,
message: Message,
) -> Result<(), SendError> {
let client = self.clients.get(&id).ok_or(SendError::ClientNotFound)?;
client
.connection()
.sender
.send(message)
.await
.map_err(|_| SendError::ChannelClosed)
}
pub fn iter(
&self,
) -> impl Iterator<
Item = dashmap::mapref::multiple::RefMulti<'_, ClientId, ConnectedClient>,
> + '_ {
self.clients.iter()
}
pub fn client_ids(&self) -> Vec<ClientId> {
self.clients.iter().map(|e| *e.key()).collect()
}
pub fn connected_clients(&self) -> Vec<ConnectedClientInfo> {
self
.clients
.iter()
.map(|entry| {
let client = entry.value();
ConnectedClientInfo {
id: client.id(),
kind: client.kind().to_string(),
client_name: client.client_name().map(String::from),
client_version: client.client_version().map(String::from),
connected_at: client.connected_at_unix(),
}
})
.collect()
}
}
impl Default for ClientRegistry {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError {
ClientNotFound,
ChannelClosed,
}
impl std::fmt::Display for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
| SendError::ClientNotFound => write!(f, "client not found"),
| SendError::ChannelClosed => write!(f, "channel closed"),
}
}
}
impl std::error::Error for SendError {}
pub type SharedRegistry = Arc<ClientRegistry>;
#[cfg(test)]
mod tests {
use {
super::*,
crate::protocol::jsonrpc::{
Message,
Notification as JsonRpcNotification,
},
serde_json::json,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum TestTopic {
Diagnostics,
BuildProgress,
}
impl super::super::Topic for TestTopic {
fn name(&self) -> &'static str {
match self {
| TestTopic::Diagnostics => "diagnostics",
| TestTopic::BuildProgress => "build_progress",
}
}
}
#[test]
fn registry_register_unregister() {
let registry = ClientRegistry::new();
let (server_conn, _client_conn) = Connection::memory();
assert_eq!(registry.client_count(), 0);
let id = registry.register(ClientKind::Cli, server_conn, HashMap::new());
assert_eq!(registry.client_count(), 1);
registry.unregister(id);
assert_eq!(registry.client_count(), 0);
}
#[test]
fn registry_subscribe_topic() {
let registry = ClientRegistry::new();
let (server_conn, _client_conn) = Connection::memory();
let id = registry.register(ClientKind::Cli, server_conn, HashMap::new());
assert!(registry.subscribe(id, TestTopic::Diagnostics));
let client = registry.get(id).unwrap();
assert!(client.is_subscribed(TestTopic::Diagnostics));
assert!(!client.is_subscribed(TestTopic::BuildProgress));
}
#[test]
fn registry_broadcast() {
smol::block_on(async {
let registry = ClientRegistry::new();
let (server_conn1, client_conn1) = Connection::memory();
let (server_conn2, client_conn2) = Connection::memory();
let id1 = registry.register(ClientKind::Cli, server_conn1, HashMap::new());
let _id2 = registry.register(ClientKind::Cli, server_conn2, HashMap::new());
registry.subscribe(id1, TestTopic::Diagnostics);
let notification = JsonRpcNotification::build("test/notification")
.params(json!({"key": "value"}))
.finish();
registry.broadcast(TestTopic::Diagnostics, notification).await;
let msg1 = client_conn1.receiver.try_recv();
assert!(msg1.is_ok());
let msg2 = client_conn2.receiver.try_recv();
assert!(msg2.is_err());
});
}
#[test]
fn registry_send_to() {
smol::block_on(async {
let registry = ClientRegistry::new();
let (server_conn, client_conn) = Connection::memory();
let id = registry.register(ClientKind::Cli, server_conn, HashMap::new());
let notification = JsonRpcNotification::build("test/notification")
.params(json!({"key": "value"}))
.finish();
let message = Message::Notification(notification);
let result = registry.send_to(id, message).await;
assert!(result.is_ok());
let received = client_conn.receiver.try_recv();
assert!(received.is_ok());
});
}
#[test]
fn registry_send_to_unknown_client() {
smol::block_on(async {
let registry = ClientRegistry::new();
let unknown_id = ClientId::from_raw(999999);
let notification =
JsonRpcNotification::build("test/notification").finish();
let result = registry
.send_to(unknown_id, Message::Notification(notification))
.await;
assert_eq!(result.unwrap_err(), SendError::ClientNotFound);
});
}
#[test]
fn registry_client_ids() {
let registry = ClientRegistry::new();
let (conn1, _) = Connection::memory();
let id1 = registry.register(ClientKind::Cli, conn1, HashMap::new());
let (conn2, _) = Connection::memory();
let id2 = registry.register(ClientKind::Cli, conn2, HashMap::new());
let ids = registry.client_ids();
assert_eq!(ids.len(), 2);
assert!(ids.contains(&id1));
assert!(ids.contains(&id2));
}
#[test]
fn test_idle_duration_some_at_cold_start() {
let registry = ClientRegistry::new();
let idle = registry.idle_duration();
assert!(
idle.is_some(),
"fresh registry must report idle duration so orphaned daemons can shut \
down without ever having seen a client"
);
assert!(idle.unwrap() < std::time::Duration::from_secs(1));
}
#[test]
fn test_idle_duration_returns_none_when_clients_connected() {
let registry = ClientRegistry::new();
let (server_conn, _client_conn) = Connection::memory();
let _id = registry.register(ClientKind::Cli, server_conn, HashMap::new());
assert!(registry.idle_duration().is_none());
}
#[test]
fn test_idle_duration_returns_elapsed_after_disconnect() {
let registry = ClientRegistry::new();
let (server_conn, _client_conn) = Connection::memory();
let id = registry.register(ClientKind::Cli, server_conn, HashMap::new());
registry.unregister(id);
let idle = registry.idle_duration();
assert!(idle.is_some());
assert!(idle.unwrap() < std::time::Duration::from_secs(1));
}
#[test]
fn test_idle_duration_resets_on_new_connection() {
let registry = ClientRegistry::new();
let (conn1, _) = Connection::memory();
let (conn2, _) = Connection::memory();
let id1 = registry.register(ClientKind::Cli, conn1, HashMap::new());
registry.unregister(id1);
assert!(registry.idle_duration().is_some());
let _id2 = registry.register(ClientKind::Cli, conn2, HashMap::new());
assert!(registry.idle_duration().is_none());
}
#[test]
fn test_idle_duration_only_starts_when_all_clients_disconnect() {
let registry = ClientRegistry::new();
let (conn1, _) = Connection::memory();
let (conn2, _) = Connection::memory();
let id1 = registry.register(ClientKind::Cli, conn1, HashMap::new());
let id2 = registry.register(ClientKind::Cli, conn2, HashMap::new());
registry.unregister(id1);
assert!(registry.idle_duration().is_none());
registry.unregister(id2);
assert!(registry.idle_duration().is_some());
}
#[test]
fn test_broadcast_all_skips_mcp_clients() {
smol::block_on(async {
let registry = ClientRegistry::new();
let (server_conn_ide, client_conn_ide) = Connection::memory();
let (server_conn_mcp, client_conn_mcp) = Connection::memory();
let _id_ide =
registry.register(ClientKind::Ide, server_conn_ide, HashMap::new());
let _id_mcp =
registry.register(ClientKind::Mcp, server_conn_mcp, HashMap::new());
let notification = JsonRpcNotification::build("$/serverShutdown")
.params(json!({"reason": "test", "message": "test"}))
.finish();
registry.broadcast_all(notification).await;
assert!(
client_conn_ide.receiver.try_recv().is_ok(),
"IDE client should receive notifications",
);
assert!(
client_conn_mcp.receiver.try_recv().is_err(),
"MCP client should NOT receive LSP notifications",
);
});
}
#[test]
fn test_broadcast_all_delivers_to_all_clients() {
smol::block_on(async {
let registry = ClientRegistry::new();
let (server_conn1, client_conn1) = Connection::memory();
let (server_conn2, client_conn2) = Connection::memory();
let (server_conn3, client_conn3) = Connection::memory();
let _id1 = registry.register(ClientKind::Ide, server_conn1, HashMap::new());
let _id2 = registry.register(ClientKind::Cli, server_conn2, HashMap::new());
let _id3 = registry.register(ClientKind::Cli, server_conn3, HashMap::new());
let notification = JsonRpcNotification::build("$/serverShutdown")
.params(json!({
"reason": "idle_timeout",
"message": "Server shutting down"
}))
.finish();
registry.broadcast_all(notification).await;
let msg1 = client_conn1.receiver.try_recv();
let msg2 = client_conn2.receiver.try_recv();
let msg3 = client_conn3.receiver.try_recv();
assert!(msg1.is_ok(), "IDE client should receive notification");
assert!(msg2.is_ok(), "CLI client 1 should receive notification");
assert!(msg3.is_ok(), "CLI client 2 should receive notification");
if let Ok(Message::Notification(n)) = msg1 {
assert_eq!(n.method(), "$/serverShutdown");
} else {
panic!("Expected notification message");
}
});
}
#[test]
fn test_broadcast_all_continues_when_client_channel_closed() {
smol::block_on(async {
let registry = ClientRegistry::new();
let (server_conn1, client_conn1) = Connection::memory();
let (server_conn2, _client_conn2) = Connection::memory();
let (server_conn3, client_conn3) = Connection::memory();
let _id1 = registry.register(ClientKind::Cli, server_conn1, HashMap::new());
let _id2 = registry.register(ClientKind::Cli, server_conn2, HashMap::new());
let _id3 = registry.register(ClientKind::Cli, server_conn3, HashMap::new());
drop(_client_conn2);
let notification =
JsonRpcNotification::build("test/notification").finish();
registry.broadcast_all(notification).await;
let msg1 = client_conn1.receiver.try_recv();
let msg3 = client_conn3.receiver.try_recv();
assert!(
msg1.is_ok(),
"First client should receive despite second channel closed"
);
assert!(
msg3.is_ok(),
"Third client should receive despite second channel closed"
);
});
}
}