use crate::{github::GitHubClient, store::Store, types::*};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{broadcast, Mutex};
/// Events broadcast by [`Engine::emit`] to every receiver obtained from
/// [`Engine::subscribe`].
///
/// Only some variants are produced in this file; the rest are presumably
/// emitted by other modules (not visible here).
#[derive(Debug, Clone)]
pub enum Event {
/// Carries a newly spawned orchestrator record.
OrchestratorSpawned(Orchestrator),
/// Emitted by `Engine::remove_orchestrator` after the orchestrator has been
/// deleted from the store; carries its id.
OrchestratorRemoved(OrchestratorId),
/// Carries the updated session record. Emitted by
/// `Engine::terminate_session` and `Engine::cleanup_session` after the new
/// status has been written to the store.
SessionUpdated(Session),
/// Carries a newly spawned session record.
SessionSpawned(Session),
/// Carries the id of a session that is gone. Emitted by
/// `Engine::remove_session`, and by `Engine::remove_orchestrator` once per
/// worker session of the removed orchestrator.
SessionDone(SessionId),
/// Raw bytes associated with the terminal of `session_id`.
TerminalOutput { session_id: SessionId, bytes: Vec<u8> },
/// A CI status associated with the pull request `pr_id`.
CiUpdated { pr_id: PrId, status: CIStatus },
/// A pull request associated with the session `session_id`.
PrOpened { session_id: SessionId, pr: PR },
/// A review comment associated with the pull request `pr_id`.
ReviewComment { pr_id: PrId, comment: Comment },
/// Wraps a `Notification` value.
Notification(Notification),
}
/// Shared runtime state: the persistent store, the event broadcast channel,
/// and per-session registries for PTY writers and stream cancellation.
///
/// Constructed behind an `Arc` by `Engine::new` / `Engine::new_with_github`.
pub struct Engine {
/// Persistent store used to read, update and delete sessions and orchestrators.
pub store: Arc<Store>,
/// Sending half of the event broadcast channel; see `emit` and `subscribe`.
tx: broadcast::Sender<Event>,
/// Per-session byte-chunk senders, inserted by `register_pty_writer` and
/// looked up (cloned) by `get_pty_writer`.
pty_writers: Mutex<HashMap<SessionId, tokio::sync::mpsc::UnboundedSender<Vec<u8>>>>,
/// Per-session one-shot cancel handles, inserted by `register_stream`, which
/// also fires the previously stored handle for the same session.
stream_cancel: Mutex<HashMap<SessionId, tokio::sync::oneshot::Sender<()>>>,
/// GitHub client; `None` when built with `new`, or when `new_with_github`
/// failed to construct a client from the token.
pub github: Option<GitHubClient>,
}
impl Engine {
pub fn new(store: Arc<Store>) -> Arc<Self> {
let (tx, _) = broadcast::channel(256);
Arc::new(Self {
store,
tx,
pty_writers: Mutex::new(HashMap::new()),
stream_cancel: Mutex::new(HashMap::new()),
github: None,
})
}
pub fn new_with_github(store: Arc<Store>, token: String) -> Arc<Self> {
let (tx, _) = broadcast::channel(256);
let github = GitHubClient::new(token).ok();
Arc::new(Self {
store,
tx,
pty_writers: Mutex::new(HashMap::new()),
stream_cancel: Mutex::new(HashMap::new()),
github,
})
}
pub async fn register_stream(
&self,
session_id: SessionId,
) -> tokio::sync::oneshot::Receiver<()> {
let mut map = self.stream_cancel.lock().await;
if let Some(old_tx) = map.remove(&session_id) {
let _ = old_tx.send(());
}
let (tx, rx) = tokio::sync::oneshot::channel();
map.insert(session_id, tx);
rx
}
pub fn emit(&self, event: Event) {
let _ = self.tx.send(event);
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.tx.subscribe()
}
pub async fn register_pty_writer(
&self,
session_id: SessionId,
writer: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
) {
self.pty_writers.lock().await.insert(session_id, writer);
}
pub async fn get_pty_writer(
&self,
session_id: &str,
) -> Option<tokio::sync::mpsc::UnboundedSender<Vec<u8>>> {
self.pty_writers.lock().await.get(session_id).cloned()
}
pub async fn remove_orchestrator(&self, orchestrator_id: &str) -> anyhow::Result<()> {
let workers = self.store.sessions_by_orchestrator(orchestrator_id)?;
for session in &workers {
let _ = crate::tmux::kill_session(&session.id).await;
if let Some(ref wp) = session.workspace_path {
remove_worker_worktree(wp, &session.id).await;
}
}
let _ = crate::tmux::kill_session(orchestrator_id).await;
self.store.delete_orchestrator(orchestrator_id)?;
for session in workers {
self.emit(Event::SessionDone(session.id));
}
self.emit(Event::OrchestratorRemoved(orchestrator_id.to_string()));
Ok(())
}
pub async fn remove_session(&self, session_id: &str) -> anyhow::Result<()> {
let _ = crate::tmux::kill_session(session_id).await;
if let Ok(Some(session)) = self.store.get_session(session_id) {
if let Some(ref wp) = session.workspace_path {
remove_worker_worktree(wp, session_id).await;
}
}
self.store.delete_session(session_id)?;
self.emit(Event::SessionDone(session_id.to_string()));
Ok(())
}
pub async fn send_to_session(&self, session_id: &str, message: &str) -> anyhow::Result<()> {
crate::tmux::send_keys(session_id, message).await
}
pub async fn terminate_session(&self, session_id: &str) -> anyhow::Result<()> {
let _ = crate::tmux::kill_session(session_id).await;
if let Some(mut session) = self.store.get_session(session_id)? {
session.status = crate::types::SessionStatus::Terminated;
self.store.upsert_session(&session)?;
self.emit(Event::SessionUpdated(session));
}
Ok(())
}
pub async fn cleanup_session(&self, session_id: &str) -> anyhow::Result<()> {
let _ = crate::tmux::kill_session(session_id).await;
if let Some(mut session) = self.store.get_session(session_id)? {
session.status = crate::types::SessionStatus::Done;
self.store.upsert_session(&session)?;
self.emit(Event::SessionUpdated(session));
}
Ok(())
}
}
/// Best-effort removal of a worker's git worktree.
///
/// Acts only when `workspace_path` has the form
/// `<repo_root>/.claude/worktrees/<session_id>` with a non-empty `<repo_root>`;
/// any other path is left untouched. The outcome of the `git` invocation
/// (including failure to spawn it) is ignored.
async fn remove_worker_worktree(workspace_path: &str, session_id: &str) {
    let worktree_tail = format!("/.claude/worktrees/{session_id}");
    // Whatever precedes the worktree tail is the repository root.
    let repo_root = match workspace_path.strip_suffix(worktree_tail.as_str()) {
        Some(root) if !root.is_empty() => root,
        _ => return,
    };
    let mut git = tokio::process::Command::new("git");
    git.args(["-C", repo_root, "worktree", "remove", "--force", workspace_path]);
    let _ = git.output().await;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::Store;
    use tempfile::{tempdir, TempDir};

    /// Opens a store inside a fresh temporary directory.
    ///
    /// The returned `TempDir` guard must stay alive for as long as the store
    /// is used; dropping it deletes the directory, so test runs no longer
    /// leave persisted temp directories behind.
    fn open_temp_store() -> (TempDir, Arc<Store>) {
        let dir = tempdir().unwrap();
        let store = Arc::new(Store::open(dir.path().join("t.db")).unwrap());
        (dir, store)
    }

    /// Minimal session fixture with id `"s1"` and the given status.
    fn sample_session(status: crate::types::SessionStatus) -> crate::types::Session {
        crate::types::Session {
            id: "s1".into(),
            orchestrator_id: None,
            name: "w".into(),
            repo: "r".into(),
            status,
            agent_type: "c".into(),
            cost_usd: 0.0,
            started_at: 0,
            pr_number: None,
            pr_id: None,
            workspace_path: None,
            pid: None,
        }
    }

    #[tokio::test]
    async fn emit_received_by_subscriber() {
        let (_dir, store) = open_temp_store();
        let engine = Engine::new(store);
        let mut rx = engine.subscribe();
        engine.emit(Event::SessionDone("s1".into()));
        let event = rx.recv().await.unwrap();
        assert!(matches!(event, Event::SessionDone(id) if id == "s1"));
    }

    #[tokio::test]
    async fn terminate_emits_session_updated() {
        let (_dir, store) = open_temp_store();
        let session = sample_session(crate::types::SessionStatus::Working);
        store.upsert_session(&session).unwrap();
        let engine = Engine::new(store);
        let mut rx = engine.subscribe();
        engine.terminate_session("s1").await.unwrap();
        let evt = rx.recv().await.unwrap();
        if let Event::SessionUpdated(s) = evt {
            assert!(matches!(s.status, crate::types::SessionStatus::Terminated));
        } else {
            panic!("expected SessionUpdated");
        }
    }

    #[tokio::test]
    async fn cleanup_session_sets_done_status() {
        let (_dir, store) = open_temp_store();
        let session = crate::types::Session {
            pr_number: Some(1),
            pr_id: Some(1),
            ..sample_session(crate::types::SessionStatus::PrOpen)
        };
        store.upsert_session(&session).unwrap();
        let engine = Engine::new(Arc::clone(&store));
        let mut rx = engine.subscribe();
        engine.cleanup_session("s1").await.unwrap();
        let evt = rx.recv().await.unwrap();
        if let Event::SessionUpdated(s) = evt {
            assert!(matches!(s.status, crate::types::SessionStatus::Done));
        } else {
            panic!("expected SessionUpdated");
        }
    }

    /// Paths that do not match `<root>/.claude/worktrees/<session_id>` must be
    /// ignored; the test passes as long as none of the calls panics.
    #[tokio::test]
    async fn remove_worker_worktree_ignores_non_ninox_paths() {
        remove_worker_worktree("/some/random/path", "s1").await;
        remove_worker_worktree("/repo/.claude/worktrees/other-id", "s1").await;
        remove_worker_worktree("", "s1").await;
    }
}