use tokio::sync::{broadcast, mpsc, oneshot};
use atm_core::{Harness, LifecycleEvent, SessionDomain, SessionId, SessionView};
use super::commands::{RegistryCommand, RegistryError, SessionEvent};
#[derive(Clone)]
pub struct RegistryHandle {
sender: mpsc::Sender<RegistryCommand>,
event_sender: broadcast::Sender<SessionEvent>,
}
impl RegistryHandle {
pub fn new(
sender: mpsc::Sender<RegistryCommand>,
event_sender: broadcast::Sender<SessionEvent>,
) -> Self {
Self {
sender,
event_sender,
}
}
pub async fn register(&self, session: SessionDomain) -> Result<(), RegistryError> {
let (tx, rx) = oneshot::channel();
self.sender
.send(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
})
.await
.map_err(|_| RegistryError::ChannelClosed)?;
rx.await.map_err(|_| RegistryError::ChannelClosed)?
}
pub async fn update_from_status_line(
&self,
session_id: SessionId,
data: serde_json::Value,
) -> Result<(), RegistryError> {
let (tx, rx) = oneshot::channel();
self.sender
.send(RegistryCommand::UpdateFromStatusLine {
session_id,
data,
respond_to: tx,
})
.await
.map_err(|_| RegistryError::ChannelClosed)?;
rx.await.map_err(|_| RegistryError::ChannelClosed)?
}
pub async fn apply_lifecycle_event(
&self,
session_id: SessionId,
event: LifecycleEvent,
harness: Harness,
pid: Option<u32>,
tmux_pane: Option<String>,
) -> Result<(), RegistryError> {
let (tx, rx) = oneshot::channel();
self.sender
.send(RegistryCommand::ApplyLifecycleEvent {
session_id,
event,
harness,
pid,
tmux_pane,
respond_to: tx,
})
.await
.map_err(|_| RegistryError::ChannelClosed)?;
rx.await.map_err(|_| RegistryError::ChannelClosed)?
}
pub async fn get_session(&self, session_id: SessionId) -> Option<SessionView> {
let (tx, rx) = oneshot::channel();
self.sender
.send(RegistryCommand::GetSession {
session_id,
respond_to: tx,
})
.await
.ok()?;
rx.await.ok()?
}
pub async fn get_all_sessions(&self) -> Vec<SessionView> {
let (tx, rx) = oneshot::channel();
if self
.sender
.send(RegistryCommand::GetAllSessions { respond_to: tx })
.await
.is_err()
{
return Vec::new();
}
rx.await.unwrap_or_default()
}
pub async fn remove(&self, session_id: SessionId) -> Result<(), RegistryError> {
let (tx, rx) = oneshot::channel();
self.sender
.send(RegistryCommand::Remove {
session_id,
respond_to: tx,
})
.await
.map_err(|_| RegistryError::ChannelClosed)?;
rx.await.map_err(|_| RegistryError::ChannelClosed)?
}
pub async fn cleanup_stale(&self) {
let _ = self.sender.send(RegistryCommand::CleanupStale).await;
}
pub async fn register_discovered(
&self,
session_id: SessionId,
pid: u32,
cwd: std::path::PathBuf,
tmux_pane: Option<String>,
harness: Harness,
) -> Result<(), RegistryError> {
let (tx, rx) = oneshot::channel();
self.sender
.send(RegistryCommand::RegisterDiscovered {
session_id,
pid,
cwd,
tmux_pane,
harness,
respond_to: tx,
})
.await
.map_err(|_| RegistryError::ChannelClosed)?;
rx.await.map_err(|_| RegistryError::ChannelClosed)?
}
pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
self.event_sender.subscribe()
}
pub fn is_connected(&self) -> bool {
!self.sender.is_closed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use atm_core::{AgentType, Model, Tool};
fn create_test_handle() -> (RegistryHandle, mpsc::Receiver<RegistryCommand>) {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (event_tx, _event_rx) = broadcast::channel(16);
let handle = RegistryHandle::new(cmd_tx, event_tx);
(handle, cmd_rx)
}
fn create_test_session(id: &str) -> SessionDomain {
SessionDomain::new(
SessionId::new(id),
AgentType::GeneralPurpose,
Model::Sonnet4,
)
}
#[tokio::test]
async fn test_handle_is_clone() {
let (handle, _rx) = create_test_handle();
let _cloned = handle.clone();
}
#[tokio::test]
async fn test_register_sends_command() {
let (handle, mut rx) = create_test_handle();
let session = create_test_session("test-123");
let cmd_handler = tokio::spawn(async move {
if let Some(RegistryCommand::Register {
session,
respond_to,
}) = rx.recv().await
{
assert_eq!(session.id.as_str(), "test-123");
let _ = respond_to.send(Ok(()));
return true;
}
false
});
let result = handle.register(session).await;
assert!(result.is_ok());
assert!(cmd_handler.await.unwrap());
}
#[tokio::test]
async fn test_register_channel_closed_error() {
let (handle, rx) = create_test_handle();
drop(rx);
let session = create_test_session("test-123");
let result = handle.register(session).await;
assert!(matches!(result, Err(RegistryError::ChannelClosed)));
}
#[tokio::test]
async fn test_get_session_returns_none_on_channel_close() {
let (handle, rx) = create_test_handle();
drop(rx);
let result = handle.get_session(SessionId::new("test-123")).await;
assert!(result.is_none());
}
#[tokio::test]
async fn test_get_all_sessions_returns_empty_on_channel_close() {
let (handle, rx) = create_test_handle();
drop(rx);
let result = handle.get_all_sessions().await;
assert!(result.is_empty());
}
#[tokio::test]
async fn test_cleanup_stale_fire_and_forget() {
let (handle, mut rx) = create_test_handle();
let cmd_handler = tokio::spawn(async move {
if let Some(RegistryCommand::CleanupStale) = rx.recv().await {
return true;
}
false
});
handle.cleanup_stale().await;
assert!(cmd_handler.await.unwrap());
}
#[tokio::test]
async fn test_cleanup_stale_ignores_closed_channel() {
let (handle, rx) = create_test_handle();
drop(rx);
handle.cleanup_stale().await;
}
#[tokio::test]
async fn test_subscribe_returns_receiver() {
let (handle, _rx) = create_test_handle();
let _subscriber = handle.subscribe();
}
#[tokio::test]
async fn test_is_connected() {
let (handle, rx) = create_test_handle();
assert!(handle.is_connected());
drop(rx);
let _ = handle.sender.send(RegistryCommand::CleanupStale).await;
assert!(!handle.is_connected());
}
#[tokio::test]
async fn test_update_from_status_line() {
let (handle, mut rx) = create_test_handle();
let cmd_handler = tokio::spawn(async move {
if let Some(RegistryCommand::UpdateFromStatusLine {
session_id,
data,
respond_to,
}) = rx.recv().await
{
assert_eq!(session_id.as_str(), "test-123");
assert!(data.get("model").is_some());
let _ = respond_to.send(Ok(()));
return true;
}
false
});
let data = serde_json::json!({
"model": {"id": "claude-sonnet-4-20250514"},
"cost": {"total_cost_usd": 0.25}
});
let result = handle
.update_from_status_line(SessionId::new("test-123"), data)
.await;
assert!(result.is_ok());
assert!(cmd_handler.await.unwrap());
}
#[tokio::test]
async fn test_apply_lifecycle_event() {
let (handle, mut rx) = create_test_handle();
let cmd_handler = tokio::spawn(async move {
if let Some(RegistryCommand::ApplyLifecycleEvent {
session_id,
event,
harness,
pid,
tmux_pane,
respond_to,
}) = rx.recv().await
{
assert_eq!(session_id.as_str(), "test-123");
assert_eq!(
event,
LifecycleEvent::ToolCallStart {
name: Tool::Bash,
tool_use_id: None,
input: None,
}
);
assert_eq!(harness, Harness::Pi);
assert_eq!(pid, Some(12345));
assert_eq!(tmux_pane, Some("%5".to_string()));
let _ = respond_to.send(Ok(()));
return true;
}
false
});
let result = handle
.apply_lifecycle_event(
SessionId::new("test-123"),
LifecycleEvent::ToolCallStart {
name: Tool::Bash,
tool_use_id: None,
input: None,
},
Harness::Pi,
Some(12345),
Some("%5".to_string()),
)
.await;
assert!(result.is_ok());
assert!(cmd_handler.await.unwrap());
}
#[tokio::test]
async fn test_remove() {
let (handle, mut rx) = create_test_handle();
let cmd_handler = tokio::spawn(async move {
if let Some(RegistryCommand::Remove {
session_id,
respond_to,
}) = rx.recv().await
{
assert_eq!(session_id.as_str(), "test-123");
let _ = respond_to.send(Ok(()));
return true;
}
false
});
let result = handle.remove(SessionId::new("test-123")).await;
assert!(result.is_ok());
assert!(cmd_handler.await.unwrap());
}
}