dirge-agent 0.7.4

Minimalistic coding agent written in Rust, optimized for memory footprint and performance
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
//! Background-shell registry — the Claude-Code-style model for detached
//! `bash` commands: a command started with `background: true` runs
//! UNBOUNDED (no timeout kill), its output streams into a per-shell
//! buffer the model pulls incrementally via the `bash_output` tool, and
//! it is stopped explicitly via the `kill_shell` tool (or killed en masse
//! when the session ends).
//!
//! This is distinct from the background-SUBAGENT store
//! (`background.rs`), which uses a push-once completion notification —
//! the wrong fit for long-lived processes (dev servers, watchers) that
//! never "complete" and whose output must be readable while running.
//!
//! Memory model: each entry holds only the UNREAD output. `read_new`
//! returns and clears it, so a model that polls regularly keeps the
//! buffer small; an unread buffer is hard-capped so a never-read flood
//! can't OOM.

#[allow(unused_imports)]
use crate::sync_util::LockExt;
use indexmap::IndexMap;
use rig::completion::ToolDefinition;
use rig::tool::Tool;
use serde::Deserialize;
use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;

use crate::agent::tools::ToolError;

/// Process-global background-shell registry, constructed lazily on first
/// access (`LazyLock`). There is exactly one interactive session per
/// process, so the bash tool (which spawns shells), the
/// `bash_output`/`kill_shell` tools, the status bar, and session-end
/// cleanup all share this one instance — avoiding threading it through
/// every builder/UI signature. (Same pattern dirge already uses for the
/// subagent `/kill` abort registry.) Tests inject their own store instead
/// of touching the global, so they stay isolated.
static GLOBAL: std::sync::LazyLock<BackgroundShellStore> =
    std::sync::LazyLock::new(BackgroundShellStore::new);

/// A clone (cheap — `Arc` inside) of the process-global shell registry.
pub fn global() -> BackgroundShellStore {
    GLOBAL.clone()
}

/// Hard cap, in bytes (1 MiB), on a single shell's UNREAD output buffer.
/// Past this the buffer stops growing and a one-time truncation marker is
/// appended — so the stored text can exceed the cap by the marker's
/// length. The model should `bash_output` regularly to drain it.
const MAX_UNREAD_BYTES: usize = 1024 * 1024;

/// Max number of shells retained (running + finished). When registering a
/// new id would exceed this, the oldest finished shell (by insertion
/// order) is evicted; only if every entry is still running is the oldest
/// entry overall evicted. Generous for any session.
const STORE_CAPACITY: usize = 32;

/// Max concurrently RUNNING background shells, enforced by
/// `BackgroundShellStore::try_register`. A runaway model shouldn't be
/// able to spawn unbounded detached processes.
const MAX_CONCURRENT_SHELLS: usize = 8;

/// Lifecycle state of a background shell. `Running` is the only
/// non-terminal state; every other variant is final.
#[derive(Debug, Clone, PartialEq)]
pub enum ShellStatus {
    /// The shell has been registered and has not yet reached a terminal state.
    Running,
    /// The shell exited on its own; carries the reported exit code.
    Exited(i32),
    /// Killed via `kill_shell` / session end.
    Killed,
    /// Failed to spawn or drain (carries the error).
    Failed(String),
}

impl ShellStatus {
    /// `true` only for [`ShellStatus::Running`].
    pub fn is_running(&self) -> bool {
        *self == Self::Running
    }

    /// Short human-readable label for the model-facing output / `/tasks`
    /// listing, e.g. `running`, `exited(0)`, `killed`, `failed: <error>`.
    pub fn label(&self) -> String {
        match self {
            Self::Running => String::from("running"),
            Self::Killed => String::from("killed"),
            Self::Exited(code) => format!("exited({})", code),
            Self::Failed(reason) => format!("failed: {}", reason),
        }
    }
}

/// Per-shell bookkeeping stored in the registry map, keyed by shell id.
struct ShellEntry {
    /// The command line, for the `/tasks` listing and tool feedback.
    command: String,
    /// Output produced since the last `read_new` (drained on read).
    unread: String,
    /// True once `unread` hit the cap and we stopped appending. Reset to
    /// `false` by `read_new` when the buffer is drained.
    truncated: bool,
    /// Current lifecycle state; starts as `Running` on registration.
    status: ShellStatus,
    /// Drain task handle. Aborting it drops the child + its
    /// `PgKillGuard`, SIGKILLing the whole process group. `None` once the
    /// shell has reached a terminal state (and also before
    /// `attach_handle` has been called for a freshly registered shell).
    handle: Option<JoinHandle<()>>,
}

/// One row of the `/tasks` listing / model-facing status: an owned
/// snapshot of a registry entry, produced by `BackgroundShellStore::list`.
#[derive(Debug, Clone, PartialEq)]
pub struct ShellInfo {
    /// Registry key of the shell.
    pub id: String,
    /// Command line the shell was registered with.
    pub command: String,
    /// Status at the moment the snapshot was taken.
    pub status: ShellStatus,
}

/// Thread-safe registry of background shells. Cloneable (`Arc` inside) so
/// the bash tool, the `bash_output`/`kill_shell` tools, and the UI all
/// share one instance; every clone observes the same underlying map.
#[derive(Debug, Clone, Default)]
pub struct BackgroundShellStore {
    /// Shells keyed by id. `IndexMap` keeps insertion order, which the
    /// eviction policy ("oldest first") and `list` ("newest last") rely on.
    inner: Arc<Mutex<IndexMap<String, ShellEntry>>>,
}

// Hand-written Debug for ShellEntry: reports the unread buffer's length
// and whether a drain handle is attached instead of printing the buffer
// contents or the JoinHandle itself.
impl std::fmt::Debug for ShellEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            command,
            unread,
            truncated,
            status,
            handle,
        } = self;
        let mut out = f.debug_struct("ShellEntry");
        out.field("command", command);
        out.field("unread_len", &unread.len());
        out.field("truncated", truncated);
        out.field("status", status);
        out.field("has_handle", &handle.is_some());
        out.finish()
    }
}

impl BackgroundShellStore {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Compile-time cap on concurrently running shells.
    pub fn max_concurrent() -> usize {
        MAX_CONCURRENT_SHELLS
    }

    /// Acquire the registry lock, ignoring mutex poisoning.
    fn lock(&self) -> std::sync::MutexGuard<'_, IndexMap<String, ShellEntry>> {
        self.inner.lock_ignore_poison()
    }

    /// Uncapped register — test-only helper for setting up stores past the
    /// concurrent cap. Inserts the shell in `Running` state and applies
    /// the `STORE_CAPACITY` eviction policy, but does NOT check
    /// `MAX_CONCURRENT_SHELLS`. Production code uses
    /// [`Self::try_register`], which enforces that cap atomically
    /// (dirge-jyng).
    #[cfg(test)]
    pub fn register(&self, id: String, command: String) {
        Self::insert_locked(&mut self.lock(), id, command);
    }

    /// Register a shell ONLY while fewer than [`MAX_CONCURRENT_SHELLS`]
    /// are running. Counting and inserting share one lock acquisition, so
    /// two concurrent launches cannot both slip past the cap (dirge-jyng:
    /// the bash tool used to call `running_count()` and then `register()`
    /// under separate locks). Returns `false`, registering nothing, when
    /// the cap is already reached.
    pub fn try_register(&self, id: String, command: String) -> bool {
        let mut shells = self.lock();
        let live = shells
            .values()
            .filter(|entry| entry.status.is_running())
            .count();
        let has_room = live < MAX_CONCURRENT_SHELLS;
        if has_room {
            Self::insert_locked(&mut shells, id, command);
        }
        has_room
    }

    /// Insert a fresh `Running` entry under an already-held lock, evicting
    /// one entry first when a NEW id would push the map past
    /// `STORE_CAPACITY`. The oldest terminal entry is preferred; a
    /// still-running one is evicted only as a last resort, and its drain
    /// task is aborted so the process group is killed rather than
    /// silently detached (merely dropping a `JoinHandle` would leak it).
    fn insert_locked(map: &mut IndexMap<String, ShellEntry>, id: String, command: String) {
        let must_evict = map.len() >= STORE_CAPACITY && !map.contains_key(&id);
        if must_evict {
            // Oldest finished shell, if there is one...
            let oldest_terminal = map
                .iter()
                .find_map(|(key, entry)| (!entry.status.is_running()).then(|| key.clone()));
            // ...otherwise fall back to the oldest entry overall.
            let victim = match oldest_terminal {
                Some(key) => Some(key),
                None => map.keys().next().cloned(),
            };
            if let Some(key) = victim {
                let evicted_task = map.shift_remove(&key).and_then(|entry| entry.handle);
                if let Some(task) = evicted_task {
                    // Last-resort eviction of a running shell: abort its
                    // drain task so the process group is SIGKILLed.
                    task.abort();
                }
            }
        }

        let fresh = ShellEntry {
            command,
            unread: String::new(),
            truncated: false,
            status: ShellStatus::Running,
            handle: None,
        };
        map.insert(id, fresh);
    }

    /// Attach the drain-task handle so `kill`/`kill_all` can abort it.
    ///
    /// The handle is stored only while the shell is still tracked AND
    /// running. Otherwise (unknown id, or the shell already reached a
    /// terminal state) the handle is aborted and dropped, leaving any
    /// recorded terminal status untouched.
    pub fn attach_handle(&self, id: &str, handle: JoinHandle<()>) {
        let mut shells = self.lock();
        if let Some(entry) = shells.get_mut(id) {
            if entry.status.is_running() {
                entry.handle = Some(handle);
                return;
            }
        }
        handle.abort();
    }

    /// Append a chunk of streamed output. No-op for an unknown id.
    ///
    /// Bounded: a chunk that would push the unread buffer past
    /// `MAX_UNREAD_BYTES` is cut at a UTF-8 boundary to fit, followed by a
    /// one-time marker; later chunks are dropped until the buffer is
    /// drained (the model should poll to drain).
    pub fn append(&self, id: &str, chunk: &str) {
        let mut shells = self.lock();
        let entry = match shells.get_mut(id) {
            Some(entry) => entry,
            None => return,
        };

        let fits = entry.unread.len() + chunk.len() <= MAX_UNREAD_BYTES;
        if fits {
            entry.unread.push_str(chunk);
            return;
        }
        if entry.truncated {
            // Marker already emitted for this unread window; drop the rest.
            return;
        }

        entry.truncated = true;
        let room = MAX_UNREAD_BYTES.saturating_sub(entry.unread.len());
        // Largest char boundary not exceeding the remaining room (index 0
        // is always a boundary, so the search cannot fail).
        let cut = (0..=room.min(chunk.len()))
            .rev()
            .find(|&at| chunk.is_char_boundary(at))
            .unwrap_or(0);
        entry.unread.push_str(&chunk[..cut]);
        entry.unread.push_str(
            "\n…[background shell output exceeded the unread-buffer cap; call bash_output more often to drain it]",
        );
    }

    /// Record a terminal status and drop the drain handle. No-op if the
    /// id is unknown or the shell is already terminal — the first
    /// terminal status wins, so a `kill` racing a natural exit doesn't
    /// clobber the real exit code.
    pub fn finish(&self, id: &str, status: ShellStatus) {
        let mut shells = self.lock();
        match shells.get_mut(id) {
            Some(entry) if entry.status.is_running() => {
                entry.status = status;
                entry.handle = None;
            }
            _ => {}
        }
    }

    /// Take the output produced since the last read (leaving the buffer
    /// empty and clearing the truncation flag) together with the current
    /// status. `None` if the id is unknown.
    pub fn read_new(&self, id: &str) -> Option<(String, ShellStatus)> {
        let mut shells = self.lock();
        shells.get_mut(id).map(|entry| {
            let drained = std::mem::take(&mut entry.unread);
            entry.truncated = false;
            (drained, entry.status.clone())
        })
    }

    /// Kill a running shell by id: abort its drain task (which drops the
    /// child and SIGKILLs the process group) and mark it `Killed`.
    /// Returns true if a running shell was found and killed; false for an
    /// unknown id or a shell that is already terminal.
    pub fn kill(&self, id: &str) -> bool {
        let mut shells = self.lock();
        match shells.get_mut(id) {
            Some(entry) if entry.status.is_running() => {
                if let Some(task) = entry.handle.take() {
                    task.abort();
                }
                entry.status = ShellStatus::Killed;
                true
            }
            _ => false,
        }
    }

    /// Number of shells currently running (drives the status bar).
    pub fn running_count(&self) -> usize {
        let shells = self.lock();
        shells
            .values()
            .filter(|entry| entry.status.is_running())
            .count()
    }

    /// Snapshot of every tracked shell in insertion order, newest last.
    pub fn list(&self) -> Vec<ShellInfo> {
        let shells = self.lock();
        let mut rows = Vec::with_capacity(shells.len());
        for (id, entry) in shells.iter() {
            rows.push(ShellInfo {
                id: id.clone(),
                command: entry.command.clone(),
                status: entry.status.clone(),
            });
        }
        rows
    }

    /// Abort every running shell and mark it `Killed` — called on session
    /// swap / shutdown so detached processes don't outlive the session
    /// that started them. Terminal entries are left as they are.
    pub fn kill_all(&self) {
        let mut shells = self.lock();
        let running = shells
            .values_mut()
            .filter(|entry| entry.status.is_running());
        for entry in running {
            if let Some(task) = entry.handle.take() {
                task.abort();
            }
            entry.status = ShellStatus::Killed;
        }
    }
}

// ── Model-facing tools ──────────────────────────────────────────────

/// Arguments accepted by the `bash_output` tool, deserialized from the
/// model's tool call.
#[derive(Deserialize)]
pub struct BashOutputArgs {
    /// Id of the background shell whose new output should be read.
    pub id: String,
}

/// `bash_output` — read output produced by a background shell since the
/// last read, plus its current status. Mirrors Claude Code's BashOutput.
pub struct BashOutputTool {
    /// Registry handle the tool reads shell output and status from.
    store: BackgroundShellStore,
}

impl BashOutputTool {
    /// Build the tool around the given shell registry handle.
    pub fn new(store: BackgroundShellStore) -> Self {
        Self { store }
    }
}

impl Tool for BashOutputTool {
    const NAME: &'static str = "bash_output";
    type Error = ToolError;
    type Args = BashOutputArgs;
    type Output = String;

    /// Describe the tool to the model: its name, what it returns, and the
    /// single required `id` string parameter.
    async fn definition(&self, _prompt: String) -> ToolDefinition {
        ToolDefinition {
            name: Self::NAME.to_string(),
            description: "Read new output from a background shell (one started with bash(background=true)). Returns the output produced since your last call plus the shell's status (running / exited(code) / killed / failed). Poll this to follow a long-running command; call kill_shell to stop it.".to_string(),
            parameters: serde_json::json!({
                "type": "object",
                "properties": {
                    "id": { "type": "string", "description": "The background shell id returned by bash(background=true)." }
                },
                "required": ["id"]
            }),
        }
    }

    /// Drain the shell's unread output and report it under a
    /// `[shell <id> — <status>]` header line.
    ///
    /// Returns `(no new output)` as the body when nothing was produced
    /// since the previous call, and an error when the id is not (or no
    /// longer) tracked by the registry.
    async fn call(&self, args: BashOutputArgs) -> Result<String, ToolError> {
        match self.store.read_new(&args.id) {
            Some((out, status)) => {
                let body = if out.is_empty() {
                    "(no new output)".to_string()
                } else {
                    out
                };
                // The id and the status label need a visible separator;
                // without one the header reads e.g. `[shell abcrunning]`
                // and the model cannot tell where the id ends.
                Ok(format!(
                    "[shell {} — {}]\n{}",
                    args.id,
                    status.label(),
                    body
                ))
            }
            None => Err(ToolError::Msg(format!(
                "no background shell with id {:?} (it may have been evicted)",
                args.id
            ))),
        }
    }
}

/// Arguments accepted by the `kill_shell` tool, deserialized from the
/// model's tool call.
#[derive(Deserialize)]
pub struct KillShellArgs {
    /// Id of the background shell to stop.
    pub id: String,
}

/// `kill_shell` — stop a running background shell by id (SIGKILLs its
/// process group). Mirrors Claude Code's KillShell / TaskStop.
pub struct KillShellTool {
    /// Registry handle the tool issues kill requests against.
    store: BackgroundShellStore,
}

impl KillShellTool {
    /// Build the tool around the given shell registry handle.
    pub fn new(store: BackgroundShellStore) -> Self {
        Self { store }
    }
}

impl Tool for KillShellTool {
    const NAME: &'static str = "kill_shell";
    type Error = ToolError;
    type Args = KillShellArgs;
    type Output = String;

    async fn definition(&self, _prompt: String) -> ToolDefinition {
        ToolDefinition {
            name: "kill_shell".to_string(),
            description: "Stop a running background shell (one started with bash(background=true)) by id. Kills the whole process group. No-op if it already exited.".to_string(),
            parameters: serde_json::json!({
                "type": "object",
                "properties": {
                    "id": { "type": "string", "description": "The background shell id to kill." }
                },
                "required": ["id"]
            }),
        }
    }

    async fn call(&self, args: KillShellArgs) -> Result<String, ToolError> {
        if self.store.kill(&args.id) {
            Ok(format!("killed background shell {}", args.id))
        } else {
            Ok(format!(
                "no running background shell with id {:?} (already exited or unknown)",
                args.id
            ))
        }
    }
}

/// Unit tests for the registry. Each test builds its own
/// `BackgroundShellStore` rather than touching the process-global one.
#[cfg(test)]
mod tests {
    use super::*;

    /// `read_new` returns everything appended since the previous read,
    /// empties the buffer, and reports the current status.
    #[test]
    fn read_new_drains_unread_and_reports_status() {
        let s = BackgroundShellStore::new();
        s.register("a".into(), "sleep 1".into());
        s.append("a", "line1\n");
        s.append("a", "line2\n");
        let (out, st) = s.read_new("a").unwrap();
        assert_eq!(out, "line1\nline2\n");
        assert_eq!(st, ShellStatus::Running);
        // Second read sees only new output.
        s.append("a", "line3\n");
        let (out2, _) = s.read_new("a").unwrap();
        assert_eq!(out2, "line3\n");
        // Nothing new.
        let (out3, _) = s.read_new("a").unwrap();
        assert_eq!(out3, "");
    }

    /// Reading an id that was never registered yields `None`.
    #[test]
    fn read_new_unknown_id_is_none() {
        let s = BackgroundShellStore::new();
        assert!(s.read_new("nope").is_none());
    }

    /// dirge-jyng: try_register enforces MAX_CONCURRENT_SHELLS atomically
    /// (rejects once the running cap is hit) and frees a slot when a shell
    /// finishes.
    #[test]
    fn try_register_enforces_running_cap() {
        let s = BackgroundShellStore::new();
        for n in 0..MAX_CONCURRENT_SHELLS {
            assert!(
                s.try_register(format!("s{n}"), "x".into()),
                "registration {n} within cap must succeed"
            );
        }
        assert_eq!(s.running_count(), MAX_CONCURRENT_SHELLS);
        // One past the cap is rejected and registers nothing.
        assert!(!s.try_register("over".into(), "x".into()));
        assert!(s.read_new("over").is_none());
        // Finishing one frees a running slot → next registration succeeds.
        s.finish("s0", ShellStatus::Exited(0));
        assert!(s.try_register("after".into(), "x".into()));
    }

    /// `finish` moves a running shell to a terminal state, and a second
    /// `finish` cannot overwrite the first terminal status.
    #[test]
    fn finish_sets_terminal_and_first_terminal_wins() {
        let s = BackgroundShellStore::new();
        s.register("a".into(), "x".into());
        assert_eq!(s.running_count(), 1);
        s.finish("a", ShellStatus::Exited(0));
        let (_, st) = s.read_new("a").unwrap();
        assert_eq!(st, ShellStatus::Exited(0));
        assert_eq!(s.running_count(), 0);
        // A late kill/finish does not clobber the recorded exit.
        s.finish("a", ShellStatus::Killed);
        assert_eq!(s.read_new("a").unwrap().1, ShellStatus::Exited(0));
    }

    /// `kill` succeeds once on a running shell (even with no handle
    /// attached) and is a no-op for terminal or unknown ids.
    #[test]
    fn kill_marks_killed_only_when_running() {
        let s = BackgroundShellStore::new();
        s.register("a".into(), "x".into());
        assert!(s.kill("a"));
        assert_eq!(s.read_new("a").unwrap().1, ShellStatus::Killed);
        // Already terminal → kill is a no-op.
        assert!(!s.kill("a"));
        assert!(!s.kill("unknown"));
    }

    /// Appending ~2 MB without reading keeps the buffer near the 1 MiB
    /// cap (cap plus the truncation marker) and includes the marker text.
    #[test]
    fn unread_buffer_is_capped() {
        let s = BackgroundShellStore::new();
        s.register("a".into(), "flood".into());
        let chunk = "x".repeat(100_000);
        for _ in 0..20 {
            s.append("a", &chunk);
        }
        let (out, _) = s.read_new("a").unwrap();
        assert!(out.len() <= MAX_UNREAD_BYTES + 200, "len was {}", out.len());
        assert!(out.contains("exceeded the unread-buffer cap"));
    }

    /// When every entry is running and the store is full, registering one
    /// more evicts the oldest entry AND aborts its drain task.
    #[tokio::test]
    async fn eviction_aborts_an_evicted_running_shell() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let store = BackgroundShellStore::new();
        let dropped = Arc::new(AtomicBool::new(false));

        // Fill to capacity with running shells; the oldest (s0) carries a
        // task whose Drop flips a flag, so we can prove eviction ABORTS it
        // (drops its future) rather than silently detaching the handle.
        for n in 0..STORE_CAPACITY {
            let id = format!("s{n}");
            store.register(id.clone(), "x".to_string());
            if n == 0 {
                let flag = dropped.clone();
                let h = tokio::spawn(async move {
                    struct G(Arc<AtomicBool>);
                    impl Drop for G {
                        fn drop(&mut self) {
                            self.0.store(true, Ordering::SeqCst);
                        }
                    }
                    let _g = G(flag);
                    std::future::pending::<()>().await;
                });
                store.attach_handle(&id, h);
            }
        }

        // Let s0's task actually start so its drop-guard is constructed
        // (on a current-thread runtime a freshly-spawned task isn't polled
        // until we yield).
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        // One more → must evict s0 (all entries running, none terminal).
        store.register("overflow".to_string(), "y".to_string());

        // The evicted task's future should be dropped (aborted) soon.
        // Poll for up to ~500 ms rather than asserting immediately, since
        // the abort takes effect asynchronously.
        for _ in 0..100 {
            if dropped.load(Ordering::SeqCst) {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }
        assert!(
            dropped.load(Ordering::SeqCst),
            "evicted running shell's drain task must be aborted, not detached"
        );
        assert!(
            store.list().iter().all(|s| s.id != "s0"),
            "evicted shell must be gone from the registry"
        );
    }

    /// With a terminal entry present, eviction removes it and leaves an
    /// older still-running entry in place.
    #[tokio::test]
    async fn eviction_prefers_terminal_over_running() {
        let store = BackgroundShellStore::new();
        // s0 running (oldest), then a terminal one, then fill the rest.
        store.register("s0".to_string(), "x".to_string());
        store.register("term".to_string(), "x".to_string());
        store.finish("term", ShellStatus::Exited(0));
        for n in 1..(STORE_CAPACITY - 1) {
            store.register(format!("s{n}"), "x".to_string());
        }
        // At capacity; overflow should evict the terminal entry, not s0.
        store.register("overflow".to_string(), "x".to_string());
        let ids: Vec<_> = store.list().into_iter().map(|s| s.id).collect();
        assert!(ids.contains(&"s0".to_string()), "running s0 must survive");
        assert!(!ids.contains(&"term".to_string()), "terminal entry evicted");
    }

    /// `list` returns one row per shell in insertion order, carrying each
    /// shell's command and current status.
    #[test]
    fn list_reports_all_shells() {
        let s = BackgroundShellStore::new();
        s.register("a".into(), "cmd-a".into());
        s.register("b".into(), "cmd-b".into());
        s.finish("b", ShellStatus::Exited(1));
        let rows = s.list();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].command, "cmd-a");
        assert_eq!(rows[1].status, ShellStatus::Exited(1));
    }
}