1use crate::{github::GitHubClient, store::Store, types::*};
2use std::{collections::HashMap, sync::Arc};
3use tokio::sync::{broadcast, Mutex};
4
5#[derive(Debug, Clone)]
6pub enum Event {
7 OrchestratorSpawned(Orchestrator),
8 OrchestratorRemoved(OrchestratorId),
9 SessionUpdated(Session),
10 SessionSpawned(Session),
11 SessionDone(SessionId),
12 TerminalOutput { session_id: SessionId, bytes: Vec<u8> },
13 CiUpdated { pr_id: PrId, status: CIStatus },
14 PrOpened { session_id: SessionId, pr: PR },
15 ReviewComment { pr_id: PrId, comment: Comment },
16 Notification(Notification),
17}
18
19pub struct Engine {
20 pub store: Arc<Store>,
21 tx: broadcast::Sender<Event>,
22 pty_writers: Mutex<HashMap<SessionId, tokio::sync::mpsc::UnboundedSender<Vec<u8>>>>,
23 stream_cancel: Mutex<HashMap<SessionId, tokio::sync::oneshot::Sender<()>>>,
26 pub github: Option<GitHubClient>,
28}
29
30impl Engine {
31 pub fn new(store: Arc<Store>) -> Arc<Self> {
32 let (tx, _) = broadcast::channel(256);
33 Arc::new(Self {
34 store,
35 tx,
36 pty_writers: Mutex::new(HashMap::new()),
37 stream_cancel: Mutex::new(HashMap::new()),
38 github: None,
39 })
40 }
41
42 pub fn new_with_github(store: Arc<Store>, token: String) -> Arc<Self> {
43 let (tx, _) = broadcast::channel(256);
44 let github = GitHubClient::new(token).ok();
45 Arc::new(Self {
46 store,
47 tx,
48 pty_writers: Mutex::new(HashMap::new()),
49 stream_cancel: Mutex::new(HashMap::new()),
50 github,
51 })
52 }
53
54 pub async fn register_stream(
58 &self,
59 session_id: SessionId,
60 ) -> tokio::sync::oneshot::Receiver<()> {
61 let mut map = self.stream_cancel.lock().await;
62 if let Some(old_tx) = map.remove(&session_id) {
63 let _ = old_tx.send(());
64 }
65 let (tx, rx) = tokio::sync::oneshot::channel();
66 map.insert(session_id, tx);
67 rx
68 }
69
70 pub fn emit(&self, event: Event) {
71 let _ = self.tx.send(event);
72 }
73
74 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
75 self.tx.subscribe()
76 }
77
78 pub async fn register_pty_writer(
79 &self,
80 session_id: SessionId,
81 writer: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
82 ) {
83 self.pty_writers.lock().await.insert(session_id, writer);
84 }
85
86 pub async fn get_pty_writer(
87 &self,
88 session_id: &str,
89 ) -> Option<tokio::sync::mpsc::UnboundedSender<Vec<u8>>> {
90 self.pty_writers.lock().await.get(session_id).cloned()
91 }
92
93 pub async fn remove_orchestrator(&self, orchestrator_id: &str) -> anyhow::Result<()> {
95 let workers = self.store.sessions_by_orchestrator(orchestrator_id)?;
96 for session in &workers {
97 let _ = crate::tmux::kill_session(&session.id).await;
98 if let Some(ref wp) = session.workspace_path {
99 remove_worker_worktree(wp, &session.id).await;
100 }
101 }
102 let _ = crate::tmux::kill_session(orchestrator_id).await;
104 self.store.delete_orchestrator(orchestrator_id)?;
105 for session in workers {
106 self.emit(Event::SessionDone(session.id));
107 }
108 self.emit(Event::OrchestratorRemoved(orchestrator_id.to_string()));
109 Ok(())
110 }
111
112 pub async fn remove_session(&self, session_id: &str) -> anyhow::Result<()> {
114 let _ = crate::tmux::kill_session(session_id).await;
115 if let Ok(Some(session)) = self.store.get_session(session_id) {
116 if let Some(ref wp) = session.workspace_path {
117 remove_worker_worktree(wp, session_id).await;
118 }
119 }
120 self.store.delete_session(session_id)?;
121 self.emit(Event::SessionDone(session_id.to_string()));
122 Ok(())
123 }
124
125 pub async fn send_to_session(&self, session_id: &str, message: &str) -> anyhow::Result<()> {
130 crate::tmux::send_keys(session_id, message).await
131 }
132
133 pub async fn terminate_session(&self, session_id: &str) -> anyhow::Result<()> {
135 let _ = crate::tmux::kill_session(session_id).await;
137
138 if let Some(mut session) = self.store.get_session(session_id)? {
139 session.status = crate::types::SessionStatus::Terminated;
140 self.store.upsert_session(&session)?;
141 self.emit(Event::SessionUpdated(session));
142 }
143 Ok(())
144 }
145
146 pub async fn cleanup_session(&self, session_id: &str) -> anyhow::Result<()> {
149 let _ = crate::tmux::kill_session(session_id).await;
151
152 if let Some(mut session) = self.store.get_session(session_id)? {
153 session.status = crate::types::SessionStatus::Done;
154 self.store.upsert_session(&session)?;
155 self.emit(Event::SessionUpdated(session));
156 }
157 Ok(())
158 }
159}
160
161async fn remove_worker_worktree(workspace_path: &str, session_id: &str) {
166 let suffix = format!("/.claude/worktrees/{session_id}");
167 if !workspace_path.ends_with(&suffix) {
168 return; }
170 let repo_root = &workspace_path[..workspace_path.len() - suffix.len()];
172 if repo_root.is_empty() {
173 return;
174 }
175 let _ = tokio::process::Command::new("git")
176 .args(["-C", repo_root, "worktree", "remove", "--force", workspace_path])
177 .output()
178 .await;
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::Store;
    use tempfile::{tempdir, TempDir};

    /// Opens a store inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned so the directory lives for the whole
    /// test and is deleted when the guard is dropped, instead of being left
    /// on disk after every run.
    fn temp_store() -> (TempDir, Arc<Store>) {
        let dir = tempdir().unwrap();
        let store = Arc::new(Store::open(dir.path().join("t.db")).unwrap());
        (dir, store)
    }

    /// Builds the session fixture `s1` with the given status and no PR data.
    fn session_with_status(status: crate::types::SessionStatus) -> crate::types::Session {
        crate::types::Session {
            id: "s1".into(),
            orchestrator_id: None,
            name: "w".into(),
            repo: "r".into(),
            status,
            agent_type: "c".into(),
            cost_usd: 0.0,
            started_at: 0,
            pr_number: None,
            pr_id: None,
            workspace_path: None,
            pid: None,
        }
    }

    #[tokio::test]
    async fn emit_received_by_subscriber() {
        let (_dir, store) = temp_store();
        let engine = Engine::new(store);
        let mut rx = engine.subscribe();
        engine.emit(Event::SessionDone("s1".into()));
        let event = rx.recv().await.unwrap();
        assert!(matches!(event, Event::SessionDone(id) if id == "s1"));
    }

    #[tokio::test]
    async fn terminate_emits_session_updated() {
        let (_dir, store) = temp_store();
        let session = session_with_status(crate::types::SessionStatus::Working);
        store.upsert_session(&session).unwrap();
        let engine = Engine::new(store);
        let mut rx = engine.subscribe();

        engine.terminate_session("s1").await.unwrap();

        let evt = rx.recv().await.unwrap();
        if let Event::SessionUpdated(s) = evt {
            assert!(matches!(s.status, crate::types::SessionStatus::Terminated));
        } else {
            panic!("expected SessionUpdated");
        }
    }

    #[tokio::test]
    async fn cleanup_session_sets_done_status() {
        let (_dir, store) = temp_store();
        let session = crate::types::Session {
            pr_number: Some(1),
            pr_id: Some(1),
            ..session_with_status(crate::types::SessionStatus::PrOpen)
        };
        store.upsert_session(&session).unwrap();
        let engine = Engine::new(Arc::clone(&store));
        let mut rx = engine.subscribe();

        engine.cleanup_session("s1").await.unwrap();

        let evt = rx.recv().await.unwrap();
        if let Event::SessionUpdated(s) = evt {
            assert!(matches!(s.status, crate::types::SessionStatus::Done));
        } else {
            panic!("expected SessionUpdated");
        }
    }

    #[tokio::test]
    async fn remove_worker_worktree_ignores_non_ninox_paths() {
        // None of these end in "/.claude/worktrees/s1", so each call must
        // return before any git command is spawned.
        remove_worker_worktree("/some/random/path", "s1").await;
        remove_worker_worktree("/repo/.claude/worktrees/other-id", "s1").await;
        remove_worker_worktree("", "s1").await;
    }
}