// ninox_core/events.rs

1use crate::{github::GitHubClient, store::Store, types::*};
2use std::{collections::HashMap, sync::Arc};
3use tokio::sync::{broadcast, Mutex};
4
5#[derive(Debug, Clone)]
6pub enum Event {
7    OrchestratorSpawned(Orchestrator),
8    OrchestratorRemoved(OrchestratorId),
9    SessionUpdated(Session),
10    SessionSpawned(Session),
11    SessionDone(SessionId),
12    TerminalOutput { session_id: SessionId, bytes: Vec<u8> },
13    CiUpdated      { pr_id: PrId, status: CIStatus },
14    PrOpened       { session_id: SessionId, pr: PR },
15    ReviewComment  { pr_id: PrId, comment: Comment },
16    Notification(Notification),
17}
18
19pub struct Engine {
20    pub store: Arc<Store>,
21    tx: broadcast::Sender<Event>,
22    pty_writers:  Mutex<HashMap<SessionId, tokio::sync::mpsc::UnboundedSender<Vec<u8>>>>,
23    /// Per-session cancellation senders for active FIFO reader tasks.
24    /// Sending () to the stored sender stops the running reader immediately.
25    stream_cancel: Mutex<HashMap<SessionId, tokio::sync::oneshot::Sender<()>>>,
26    /// Optional GitHub API client. None when no token is configured.
27    pub github: Option<GitHubClient>,
28}
29
30impl Engine {
31    pub fn new(store: Arc<Store>) -> Arc<Self> {
32        let (tx, _) = broadcast::channel(256);
33        Arc::new(Self {
34            store,
35            tx,
36            pty_writers:   Mutex::new(HashMap::new()),
37            stream_cancel: Mutex::new(HashMap::new()),
38            github:        None,
39        })
40    }
41
42    pub fn new_with_github(store: Arc<Store>, token: String) -> Arc<Self> {
43        let (tx, _) = broadcast::channel(256);
44        let github = GitHubClient::new(token).ok();
45        Arc::new(Self {
46            store,
47            tx,
48            pty_writers:   Mutex::new(HashMap::new()),
49            stream_cancel: Mutex::new(HashMap::new()),
50            github,
51        })
52    }
53
54    /// Cancel any running FIFO reader for `session_id` and return a fresh
55    /// cancellation receiver for the new reader.  Call this at the top of
56    /// every `start_streaming` invocation.
57    pub async fn register_stream(
58        &self,
59        session_id: SessionId,
60    ) -> tokio::sync::oneshot::Receiver<()> {
61        let mut map = self.stream_cancel.lock().await;
62        if let Some(old_tx) = map.remove(&session_id) {
63            let _ = old_tx.send(());
64        }
65        let (tx, rx) = tokio::sync::oneshot::channel();
66        map.insert(session_id, tx);
67        rx
68    }
69
70    pub fn emit(&self, event: Event) {
71        let _ = self.tx.send(event);
72    }
73
74    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
75        self.tx.subscribe()
76    }
77
78    pub async fn register_pty_writer(
79        &self,
80        session_id: SessionId,
81        writer: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
82    ) {
83        self.pty_writers.lock().await.insert(session_id, writer);
84    }
85
86    pub async fn get_pty_writer(
87        &self,
88        session_id: &str,
89    ) -> Option<tokio::sync::mpsc::UnboundedSender<Vec<u8>>> {
90        self.pty_writers.lock().await.get(session_id).cloned()
91    }
92
93    /// Kill all worker sessions belonging to an orchestrator, delete from DB, emit events.
94    pub async fn remove_orchestrator(&self, orchestrator_id: &str) -> anyhow::Result<()> {
95        let workers = self.store.sessions_by_orchestrator(orchestrator_id)?;
96        for session in &workers {
97            let _ = crate::tmux::kill_session(&session.id).await;
98            if let Some(ref wp) = session.workspace_path {
99                remove_worker_worktree(wp, &session.id).await;
100            }
101        }
102        // Also kill the orchestrator's own tmux session (same id as orchestrator).
103        let _ = crate::tmux::kill_session(orchestrator_id).await;
104        self.store.delete_orchestrator(orchestrator_id)?;
105        for session in workers {
106            self.emit(Event::SessionDone(session.id));
107        }
108        self.emit(Event::OrchestratorRemoved(orchestrator_id.to_string()));
109        Ok(())
110    }
111
112    /// Kill the tmux session and delete it from the DB entirely.
113    pub async fn remove_session(&self, session_id: &str) -> anyhow::Result<()> {
114        let _ = crate::tmux::kill_session(session_id).await;
115        if let Ok(Some(session)) = self.store.get_session(session_id) {
116            if let Some(ref wp) = session.workspace_path {
117                remove_worker_worktree(wp, session_id).await;
118            }
119        }
120        self.store.delete_session(session_id)?;
121        self.emit(Event::SessionDone(session_id.to_string()));
122        Ok(())
123    }
124
125    /// Send a text message to the agent running in a session's tmux window.
126    /// The message is injected as keyboard input — the agent sees it as typed text.
127    /// Returns Ok(()) if the tmux send succeeded; returns an error if the session
128    /// has no active tmux window or tmux is unavailable.
129    pub async fn send_to_session(&self, session_id: &str, message: &str) -> anyhow::Result<()> {
130        crate::tmux::send_keys(session_id, message).await
131    }
132
133    /// Kill the tmux session, mark it Terminated in the DB, and emit SessionUpdated.
134    pub async fn terminate_session(&self, session_id: &str) -> anyhow::Result<()> {
135        // Best-effort tmux kill (session may already be dead).
136        let _ = crate::tmux::kill_session(session_id).await;
137
138        if let Some(mut session) = self.store.get_session(session_id)? {
139            session.status = crate::types::SessionStatus::Terminated;
140            self.store.upsert_session(&session)?;
141            self.emit(Event::SessionUpdated(session));
142        }
143        Ok(())
144    }
145
146    /// Kill the tmux session (best-effort) and mark it Done in the DB.
147    /// Called automatically when a PR is merged. Emits SessionUpdated.
148    pub async fn cleanup_session(&self, session_id: &str) -> anyhow::Result<()> {
149        // Best-effort tmux kill — session may already be dead.
150        let _ = crate::tmux::kill_session(session_id).await;
151
152        if let Some(mut session) = self.store.get_session(session_id)? {
153            session.status = crate::types::SessionStatus::Done;
154            self.store.upsert_session(&session)?;
155            self.emit(Event::SessionUpdated(session));
156        }
157        Ok(())
158    }
159}
160
161/// Remove a Ninox-managed git worktree (best-effort, never propagates errors).
162///
163/// Only acts on paths that match the `.claude/worktrees/{session_id}` pattern
164/// so it never touches unrelated directories.
165async fn remove_worker_worktree(workspace_path: &str, session_id: &str) {
166    let suffix = format!("/.claude/worktrees/{session_id}");
167    if !workspace_path.ends_with(&suffix) {
168        return; // Not a Ninox worktree — leave it alone.
169    }
170    // Derive the repo root by stripping the worktree suffix.
171    let repo_root = &workspace_path[..workspace_path.len() - suffix.len()];
172    if repo_root.is_empty() {
173        return;
174    }
175    let _ = tokio::process::Command::new("git")
176        .args(["-C", repo_root, "worktree", "remove", "--force", workspace_path])
177        .output()
178        .await;
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::Store;
    use tempfile::{tempdir, TempDir};

    /// Opens a store backed by `t.db` in a fresh temp directory.
    ///
    /// The `TempDir` guard is returned so the directory is deleted when the
    /// test finishes, instead of being persisted on disk after every run.
    fn temp_store() -> (TempDir, Arc<Store>) {
        let dir = tempdir().unwrap();
        let store = Arc::new(Store::open(dir.path().join("t.db")).unwrap());
        (dir, store)
    }

    /// Builds the `s1` fixture session with the given status and no PR.
    fn sample_session(status: crate::types::SessionStatus) -> crate::types::Session {
        crate::types::Session {
            id: "s1".into(),
            orchestrator_id: None,
            name: "w".into(),
            repo: "r".into(),
            status,
            agent_type: "c".into(),
            cost_usd: 0.0,
            started_at: 0,
            pr_number: None,
            pr_id: None,
            workspace_path: None,
            pid: None,
        }
    }

    #[tokio::test]
    async fn emit_received_by_subscriber() {
        let (_dir, store) = temp_store();
        let engine = Engine::new(store);
        let mut rx = engine.subscribe();
        engine.emit(Event::SessionDone("s1".into()));
        let event = rx.recv().await.unwrap();
        assert!(matches!(event, Event::SessionDone(id) if id == "s1"));
    }

    #[tokio::test]
    async fn terminate_emits_session_updated() {
        let (_dir, store) = temp_store();
        let session = sample_session(crate::types::SessionStatus::Working);
        store.upsert_session(&session).unwrap();
        let engine = Engine::new(store);
        let mut rx = engine.subscribe();

        engine.terminate_session("s1").await.unwrap();

        let evt = rx.recv().await.unwrap();
        if let Event::SessionUpdated(s) = evt {
            assert!(matches!(s.status, crate::types::SessionStatus::Terminated));
        } else {
            panic!("expected SessionUpdated");
        }
    }

    #[tokio::test]
    async fn cleanup_session_sets_done_status() {
        let (_dir, store) = temp_store();
        let session = crate::types::Session {
            pr_number: Some(1),
            pr_id: Some(1),
            ..sample_session(crate::types::SessionStatus::PrOpen)
        };
        store.upsert_session(&session).unwrap();
        let engine = Engine::new(Arc::clone(&store));
        let mut rx = engine.subscribe();

        engine.cleanup_session("s1").await.unwrap();

        let evt = rx.recv().await.unwrap();
        if let Event::SessionUpdated(s) = evt {
            assert!(matches!(s.status, crate::types::SessionStatus::Done));
        } else {
            panic!("expected SessionUpdated");
        }
    }

    #[tokio::test]
    async fn remove_worker_worktree_ignores_non_ninox_paths() {
        // Should be a no-op for paths that don't match .claude/worktrees/{id}.
        // We just verify it doesn't panic or error.
        remove_worker_worktree("/some/random/path", "s1").await;
        remove_worker_worktree("/repo/.claude/worktrees/other-id", "s1").await;
        remove_worker_worktree("", "s1").await;
    }
}