harn-dap 0.7.37

Debug Adapter Protocol implementation for Harn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
use std::collections::BTreeMap;
use std::rc::Rc;

use harn_vm::llm::enable_tracing;
use harn_vm::{
    clear_host_call_bridge, register_checkpoint_builtins, register_http_builtins,
    register_llm_builtins, register_metadata_builtins, register_store_builtins, register_vm_stdlib,
    set_host_call_bridge, DebugAction, Vm, VmError,
};
use serde_json::json;

use super::breakpoints::BreakpointAction;
use super::state::{Debugger, ProgramState, StepMode};
use crate::protocol::*;

/// Parse the `kind:<name>:` prefix from a thrown exception's message.
///
/// Pipelines opt into #111 per-kind filtering by prefixing their
/// `throw` string with e.g. `kind:tool_error:disk is full`, for which
/// this returns `Some("tool_error")`.
///
/// Returns `None` (so callers fall back to the "all" filter behaviour
/// used for legacy throws) when:
/// - the message does not start with `kind:`,
/// - there is no second `:` terminating the name, or
/// - the name is empty (`kind::…`), since an empty kind cannot
///   identify any filter.
fn extract_exception_kind(msg: &str) -> Option<String> {
    let (kind, _detail) = msg.strip_prefix("kind:")?.split_once(':')?;
    // Treat an empty name as "untagged" instead of reporting `Some("")`.
    (!kind.is_empty()).then(|| kind.to_string())
}

impl Debugger {
    /// Compile `source` and stage a fresh VM for the debug session.
    ///
    /// The VM gets the stdlib/HTTP/LLM/store/metadata/checkpoint
    /// builtins registered, the host-call bridge (re)installed, the
    /// current line and function breakpoints mirrored onto it, and a
    /// debug hook that records the latest debug state. The initial
    /// frame is pushed via `Vm::start`, but nothing is executed here.
    ///
    /// # Errors
    /// Propagates the error from `harn_vm::compile_source` as a `String`.
    pub(crate) fn compile_program(&mut self, source: &str) -> Result<(), String> {
        let mut chunk = harn_vm::compile_source(source)?;
        // Stamp the entry chunk with its source path. Without this,
        // `current_source_file()` is `None` for the entry chunk and
        // `Vm::breakpoint_matches` only ever fires wildcard-keyed
        // breakpoints, not ones keyed by the absolute path the client
        // sent. Imported modules are already tagged by `compile_fn_body`.
        if let Some(path) = &self.source_path {
            chunk.source_file = Some(path.clone());
        }

        let mut vm = Vm::new();
        register_vm_stdlib(&mut vm);
        register_http_builtins(&mut vm);
        register_llm_builtins(&mut vm);

        // Anchor metadata/store/checkpoint state at the nearest
        // harn.toml, or at the source file's directory when there is
        // none, so store_get/store_set, metadata_* and checkpoint_*
        // builtins behave the same as under `harn run`.
        let source_parent = match &self.source_path {
            Some(path) => match std::path::Path::new(path).parent() {
                Some(dir) => dir.to_path_buf(),
                None => std::path::PathBuf::from("."),
            },
            None => std::path::PathBuf::from("."),
        };
        let project_root = harn_vm::stdlib::process::find_project_root(&source_parent);
        let store_base = project_root.as_deref().unwrap_or(&source_parent);
        register_store_builtins(&mut vm, store_base);
        register_metadata_builtins(&mut vm, store_base);
        // Checkpoints are namespaced by the script's file stem.
        let pipeline_name = self
            .source_path
            .as_ref()
            .and_then(|p| std::path::Path::new(p).file_stem())
            .and_then(|stem| stem.to_str())
            .unwrap_or("default");
        register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
        if let Some(root) = &project_root {
            vm.set_project_root(root);
        }
        // Turn on LLM trace collection so llm_call telemetry can be
        // surfaced to the IDE as DAP `output` events (category=telemetry).
        enable_tracing();

        // Drop whatever bridge a previous launch installed, then install
        // the current one (if any) so unhandled host_call ops go to the
        // DAP client as reverse requests instead of erroring out.
        clear_host_call_bridge();
        if let Some(bridge) = &self.host_bridge {
            set_host_call_bridge(Rc::new(bridge.clone()));
        }

        // Only set a source dir when the path has a non-empty parent.
        let source_dir = self
            .source_path
            .as_ref()
            .and_then(|p| std::path::Path::new(p).parent())
            .filter(|dir| !dir.as_os_str().is_empty());
        if let Some(dir) = source_dir {
            vm.set_source_dir(dir);
        }

        // Group breakpoint lines per source path so an import can't
        // accidentally match a line that belongs to the main script.
        // Breakpoints without a path land under the empty-string key.
        let mut lines_by_file: BTreeMap<String, Vec<usize>> = BTreeMap::new();
        for bp in &self.breakpoints {
            let file_key = bp
                .source
                .as_ref()
                .and_then(|s| s.path.clone())
                .unwrap_or_default();
            lines_by_file
                .entry(file_key)
                .or_default()
                .push(bp.line as usize);
        }
        for (file_key, lines) in lines_by_file {
            vm.set_breakpoints_for_file(&file_key, lines);
        }
        // Function breakpoints are copied onto the new VM as well, so
        // they apply to the first run without needing a re-edit.
        vm.set_function_breakpoints(
            self.function_breakpoints
                .iter()
                .map(|fb| fb.name.clone())
                .collect(),
        );

        // Forget the previous session's snapshot, then let the hook keep
        // the shared cell up to date with each state it is handed.
        *self.latest_debug_state.borrow_mut() = None;
        let hook_state = Rc::clone(&self.latest_debug_state);
        vm.set_debug_hook(move |state| {
            *hook_state.borrow_mut() = Some(state.clone());
            DebugAction::Continue
        });

        // Push the initial frame only; the first continue/step request
        // is what actually drives execution.
        vm.start(&chunk);
        *self.latest_debug_state.borrow_mut() = Some(vm.debug_state());
        self.vm = Some(vm);
        Ok(())
    }

    /// Handle DAP `configurationDone`: acknowledge the request, enter
    /// the running state, and announce a `progressStart` event.
    ///
    /// The progress id (`run-<seq>`) is stored in `active_progress_id`
    /// so the same id can be referenced for the lifetime of the run
    /// (DAP: progressStart/progressUpdate/progressEnd share one
    /// progressId). The IDE shows "Running script" until the progress
    /// is ended when the VM stops.
    pub(crate) fn handle_configuration_done(&mut self, msg: &DapMessage) -> Vec<DapResponse> {
        let ack_seq = self.next_seq();
        let ack = DapResponse::success(ack_seq, msg.seq, "configurationDone", None);
        self.enter_running();

        // The progress event's own seq doubles as the unique id suffix.
        let event_seq = self.next_seq();
        let progress_id = format!("run-{}", event_seq);
        self.active_progress_id = Some(progress_id.clone());
        let started = DapResponse::event(
            event_seq,
            "progressStart",
            Some(json!({
                "progressId": progress_id,
                "title": "Running script",
                "cancellable": false,
            })),
        );
        vec![ack, started]
    }

    /// Handle DAP `continue`: switch to `StepMode::Continue`, clear the
    /// stopped flag, acknowledge with `allThreadsContinued`, and enter
    /// the running state.
    ///
    /// NOTE(review): unlike the step handlers, this does not touch the
    /// VM's own step flags — presumably `enter_running` or the VM
    /// resets them; confirm.
    pub(crate) fn handle_continue(&mut self, msg: &DapMessage) -> Vec<DapResponse> {
        self.stopped = false;
        self.step_mode = StepMode::Continue;

        let ack_seq = self.next_seq();
        let body = json!({ "allThreadsContinued": true });
        let ack = DapResponse::success(ack_seq, msg.seq, "continue", Some(body));
        self.enter_running();
        vec![ack]
    }

    /// Handle DAP `next`: record `StepMode::StepOver`, arm the VM's
    /// step-over (when a VM exists), acknowledge, and enter the
    /// running state.
    pub(crate) fn handle_next(&mut self, msg: &DapMessage) -> Vec<DapResponse> {
        self.step_mode = StepMode::StepOver;
        if let Some(vm) = self.vm.as_mut() {
            vm.set_step_over();
        }

        let ack_seq = self.next_seq();
        let ack = DapResponse::success(ack_seq, msg.seq, "next", None);
        self.enter_running();
        vec![ack]
    }

    /// Handle DAP `stepIn`: record `StepMode::StepIn`, turn on the VM's
    /// step mode (when a VM exists), acknowledge, and enter the
    /// running state.
    pub(crate) fn handle_step_in(&mut self, msg: &DapMessage) -> Vec<DapResponse> {
        self.step_mode = StepMode::StepIn;
        if let Some(vm) = self.vm.as_mut() {
            vm.set_step_mode(true);
        }

        let ack_seq = self.next_seq();
        let ack = DapResponse::success(ack_seq, msg.seq, "stepIn", None);
        self.enter_running();
        vec![ack]
    }

    /// Handle DAP `restartFrame`: rewind one stack frame to its entry
    /// point so execution resumes from the function's first
    /// instruction with the original args re-bound. Together with
    /// `setVariable` this gives pipeline authors an "edit the prompt
    /// and rerun just this function" loop.
    ///
    /// Side effects the restarted frame already performed (tool calls,
    /// file writes, LLM round trips) are *not* rolled back — the IDE
    /// surfaces that caveat on the menu item.
    ///
    /// A missing `frameId` argument defaults to 1. Returns an error
    /// response when there is no VM, when the id is outside
    /// `1..=frame_count`, or when the VM refuses the restart; otherwise
    /// returns the success response followed by a `continued` event.
    pub(crate) fn handle_restart_frame(&mut self, msg: &DapMessage) -> Vec<DapResponse> {
        let requested_id = msg
            .arguments
            .as_ref()
            .and_then(|args| args.get("frameId"))
            .and_then(|id| id.as_i64())
            .unwrap_or(1);

        let Some(vm) = self.vm.as_mut() else {
            return vec![self.dap_error(msg, "restartFrame", "no active VM session")];
        };

        let depth = vm.frame_count();
        let id_in_range = requested_id >= 1 && (requested_id as usize) <= depth;
        if !id_in_range {
            let detail = format!("frameId {requested_id} out of range (have {depth} frames)");
            return vec![self.dap_error(msg, "restartFrame", &detail)];
        }

        // Our stackTrace ids start at 1 for the innermost frame, while
        // the VM's `restart_frame` wants a 0-based index into `frames`,
        // which is ordered caller-first (index 0 = outermost pipeline
        // frame). Flip the id to get the VM index.
        let vm_index = depth - requested_id as usize;
        if let Err(err) = vm.restart_frame(vm_index) {
            return vec![self.dap_error(msg, "restartFrame", &err.to_string())];
        }

        self.var_refs.clear();
        self.next_var_ref = 100;
        // A restart resumes execution exactly like `continue` does. The
        // IDE asks for a fresh stackTrace / scopes on the next stopped
        // event, so no state is pre-emitted here.
        self.step_mode = StepMode::Continue;
        self.stopped = false;

        let ack_seq = self.next_seq();
        let ack = DapResponse::success(ack_seq, msg.seq, "restartFrame", None);
        // Send `continued` right away so the IDE drops its "paused"
        // chrome instead of waiting for the next stop.
        let event_seq = self.next_seq();
        let continued = DapResponse::event(
            event_seq,
            "continued",
            Some(json!({
                "threadId": self.current_thread_id as i64,
                "allThreadsContinued": true,
            })),
        );
        self.enter_running();
        vec![ack, continued]
    }

    /// Handle DAP `stepOut`: record `StepMode::StepOut`, arm the VM's
    /// step-out (when a VM exists), acknowledge, and enter the
    /// running state.
    pub(crate) fn handle_step_out(&mut self, msg: &DapMessage) -> Vec<DapResponse> {
        self.step_mode = StepMode::StepOut;
        if let Some(vm) = self.vm.as_mut() {
            vm.set_step_out();
        }

        let ack_seq = self.next_seq();
        let ack = DapResponse::success(ack_seq, msg.seq, "stepOut", None);
        self.enter_running();
        vec![ack]
    }

    /// Handle DAP `pause`: break into the program at the next VM step.
    ///
    /// Because main.rs drains the message channel between VM steps, a
    /// pause request can arrive while the VM is in flight. In that case
    /// `pending_pause` is set so the next `step_running_vm` call stops
    /// without advancing the VM. When the debugger is already stopped,
    /// a `stopped` event (reason "pause") is emitted immediately so the
    /// IDE refreshes its UI.
    pub(crate) fn handle_pause(&mut self, msg: &DapMessage) -> Vec<DapResponse> {
        let ack_seq = self.next_seq();
        let ack = DapResponse::success(ack_seq, msg.seq, "pause", None);

        if !self.stopped {
            // Deferred: the next step_running_vm tick stops with
            // reason=pause.
            self.pending_pause = true;
            self.step_mode = StepMode::StepIn;
            if let Some(vm) = self.vm.as_mut() {
                vm.set_step_mode(true);
            }
            return vec![ack];
        }

        // Already stopped: nothing to interrupt, just re-announce it.
        let stop_seq = self.next_seq();
        let stopped_event = DapResponse::event(
            stop_seq,
            "stopped",
            Some(json!({
                "reason": "pause",
                "threadId": self.current_thread_id as i64,
                "allThreadsStopped": true,
            })),
        );
        vec![ack, stopped_event]
    }

    /// Carry out a deferred pause: stop on the current instruction
    /// without advancing the VM.
    ///
    /// Copies the current line and variables from the latest debug
    /// state, marks the debugger stopped, flushes pending output, ends
    /// the active progress, and finishes with a `stopped` event whose
    /// reason is "pause".
    fn handle_pause_stop(&mut self) -> Vec<DapResponse> {
        let snapshot = self.current_debug_state();
        self.current_line = snapshot.line as i64;
        self.variables = snapshot.variables;
        self.program_state = ProgramState::Stopped;
        self.running = false;
        self.stopped = true;

        let mut events = Vec::new();
        self.flush_output_into(&mut events);
        self.end_progress(&mut events);

        let stop_seq = self.next_seq();
        let stopped_event = DapResponse::event(
            stop_seq,
            "stopped",
            Some(json!({
                "reason": "pause",
                "threadId": self.current_thread_id as i64,
                "allThreadsStopped": true,
            })),
        );
        events.push(stopped_event);
        events
    }

    /// Take ONE VM step and return any DAP events the step produced.
    /// Stops the run (sets `running = false`) when the program hits a
    /// breakpoint, terminates, or errors. Designed to be called in a
    /// tight loop by main, interleaved with `request_rx.try_recv()` so
    /// pause/disconnect/setBreakpoints get handled mid-run instead of
    /// being starved by a blocking inner loop.
    ///
    /// Returns an empty vector when no run is in flight or no VM exists.
    /// Otherwise the returned events are, in order: telemetry drained
    /// after the step, any progress update, then whatever the step
    /// outcome adds (`output`, `stopped`, `terminated`). When a
    /// breakpoint is skipped or is a logpoint, the function returns
    /// with `running` still set so the caller keeps stepping.
    pub fn step_running_vm(&mut self) -> Vec<DapResponse> {
        // Nothing to do unless a run is in flight and a VM exists.
        if !self.running || self.vm.is_none() {
            return Vec::new();
        }

        // Honor a pending pause request before taking the step -- we
        // don't actually advance the VM, just stop with reason="pause".
        if self.pending_pause {
            self.pending_pause = false;
            return self.handle_pause_stop();
        }

        let mut responses = Vec::new();
        let step_result = {
            // `self.vm` was checked to be `Some` at the top of the function.
            let vm = self.vm.as_mut().unwrap();
            self.runtime.block_on(async { vm.step_execute().await })
        };

        // Telemetry and progress are forwarded regardless of how the
        // step turned out.
        for tele in self.drain_telemetry_events() {
            responses.push(tele);
        }
        self.maybe_progress_update(&mut responses);

        match step_result {
            Ok(Some((_val, stopped))) if stopped => {
                let state = self.current_debug_state();
                let current_line = state.line as i64;
                let vars = state.variables;
                // Check for a function-breakpoint latch first. The VM
                // sets pending_function_bp in push_closure_frame when
                // the callee's name matches, and the step loop surfaces
                // the stop on the next cycle. Drain the latch here so
                // it fires exactly once per entry, and emit a stopped
                // event with reason="function breakpoint".
                let function_bp_name = self
                    .vm
                    .as_mut()
                    .and_then(|vm| vm.take_pending_function_bp());
                if let Some(fn_name) = function_bp_name {
                    // Honor condition / hitCondition the same way
                    // classify_breakpoint_hit does for line BPs.
                    let fb_idx = self
                        .function_breakpoints
                        .iter()
                        .position(|fb| fb.name == fn_name);
                    // If no registered function breakpoint carries this
                    // name, the stop is taken unconditionally.
                    let mut should_stop = true;
                    if let Some(idx) = fb_idx {
                        // Clone so `self` can be borrowed mutably below.
                        let fb = self.function_breakpoints[idx].clone();
                        // The hit counter is bumped on every entry, even
                        // when the stop ends up suppressed.
                        let counter = self.bp_hit_counts.entry(fb.id).or_insert(0);
                        *counter += 1;
                        let hits = *counter;
                        if let Some(ref hc) = fb.hit_condition {
                            // Only an explicit `Some(false)` suppresses the
                            // stop; any other result leaves it in place.
                            if super::breakpoints::hit_condition_matches(hc, hits) == Some(false) {
                                should_stop = false;
                            }
                        }
                        if should_stop {
                            if let Some(ref cond) = fb.condition {
                                if let Some(vm) = self.vm.as_mut() {
                                    // The condition is evaluated in frame 0
                                    // inside a LocalSet. An evaluation error
                                    // counts as truthy, so the breakpoint
                                    // still stops.
                                    let truthy = self
                                        .runtime
                                        .block_on(async {
                                            let local = tokio::task::LocalSet::new();
                                            local.run_until(vm.evaluate_in_frame(cond, 0)).await
                                        })
                                        .map(|v| v.is_truthy())
                                        .unwrap_or(true);
                                    if !truthy {
                                        should_stop = false;
                                    }
                                }
                            }
                        }
                    }
                    if !should_stop {
                        // `running` stays set, so the caller keeps stepping.
                        return responses;
                    }
                    self.step_mode = StepMode::Continue;
                    self.stopped = true;
                    self.running = false;
                    self.current_line = current_line;
                    self.variables = vars;
                    self.program_state = ProgramState::Stopped;
                    self.flush_output_into(&mut responses);
                    self.end_progress(&mut responses);
                    let seq = self.next_seq();
                    responses.push(DapResponse::event(
                        seq,
                        "stopped",
                        Some(json!({
                            "reason": "function breakpoint",
                            "description": fn_name,
                            "threadId": self.current_thread_id as i64,
                            "allThreadsStopped": true,
                        })),
                    ));
                    return responses;
                }
                // Defer the real stop decision to classify_breakpoint_hit,
                // which evaluates hit-count → condition → logpoint in
                // the order VS Code uses. Steps and exceptions bypass
                // this path entirely because the VM only reports
                // stopped=true on real breakpoint hits.
                // NOTE(review): this matches on line number only; the
                // breakpoint's source file is not compared here — confirm
                // that is acceptable for multi-file sessions.
                let bp_touches_line = self.breakpoints.iter().any(|bp| bp.line == current_line);
                if bp_touches_line {
                    match self.classify_breakpoint_hit(current_line, &vars) {
                        BreakpointAction::Stop => {}
                        // Skip / logpoint / diagnostic all return without
                        // clearing `running`, so execution carries on.
                        BreakpointAction::Skip => return responses,
                        BreakpointAction::LogAndContinue(rendered) => {
                            let seq = self.next_seq();
                            responses.push(DapResponse::event(
                                seq,
                                "output",
                                Some(json!({
                                    "category": "console",
                                    "output": format!("{rendered}\n"),
                                })),
                            ));
                            return responses;
                        }
                        BreakpointAction::Diagnostic(msg) => {
                            let seq = self.next_seq();
                            responses.push(DapResponse::event(
                                seq,
                                "output",
                                Some(json!({
                                    "category": "console",
                                    "output": format!("{msg}\n"),
                                })),
                            ));
                            return responses;
                        }
                    }
                }
                // Pick a DAP stop reason that matches what actually stopped
                // execution. Step requests complete by landing the VM on
                // the next line while no breakpoint matches; conflating
                // those with real breakpoint hits (all labeled "breakpoint")
                // made the IDE status bar show "Paused on breakpoint" even
                // after a step-out far past the last breakpoint. When a
                // step is in flight and the current line isn't a registered
                // breakpoint, emit reason="step" instead.
                let step_in_flight = self.step_mode != StepMode::Continue;
                // Same line-only scan as `bp_touches_line` above.
                let line_is_bp = self.breakpoints.iter().any(|bp| bp.line == current_line);
                let reason = if step_in_flight && !line_is_bp {
                    "step"
                } else {
                    "breakpoint"
                };
                // Stepping always completes at the next stop — reset so
                // the following continue/breakpoint is classified cleanly.
                self.step_mode = StepMode::Continue;
                self.stopped = true;
                self.running = false;
                self.current_line = current_line;
                self.variables = vars;
                self.program_state = ProgramState::Stopped;
                self.flush_output_into(&mut responses);
                self.end_progress(&mut responses);
                let seq = self.next_seq();
                responses.push(DapResponse::event(
                    seq,
                    "stopped",
                    Some(json!({
                        "reason": reason,
                        "threadId": self.current_thread_id as i64,
                        "allThreadsStopped": true,
                    })),
                ));
            }
            Ok(Some((_val, _))) => {
                // Program reached its natural end.
                self.flush_output_into(&mut responses);
                self.program_state = ProgramState::Terminated;
                self.running = false;
                self.end_progress(&mut responses);
                let seq = self.next_seq();
                responses.push(DapResponse::event(seq, "terminated", None));
            }
            Ok(None) => {
                // Mid-instruction continuation; just keep stepping.
            }
            Err(e) => {
                // Any error ends the current run and its progress.
                // NOTE(review): unlike the stop/terminate paths above,
                // none of the error paths call `flush_output_into` —
                // confirm that buffered output is not lost here.
                self.running = false;
                self.end_progress(&mut responses);
                if self.break_on_exceptions && matches!(&e, VmError::Thrown(_)) {
                    // #111 per-kind filter gate: a VmError::Thrown
                    // whose message starts with "kind:<name>:" lets
                    // the exception filter narrow to a specific
                    // AgentEvent kind. When the active filter set
                    // doesn't select this kind (and the generic
                    // "all" filter isn't active), skip the stop and
                    // let the error propagate naturally. Legacy
                    // throws with no kind tag stop under "all" only.
                    let error_msg = e.to_string();
                    let kind = extract_exception_kind(&error_msg);
                    let selected = match kind.as_deref() {
                        Some(k) if self.exception_filters.contains_key(k) => {
                            self.exception_filter_matches(k)
                        }
                        // Untagged throws, and tagged throws whose kind has
                        // no filter entry, are governed by "all".
                        _ => self.exception_filters.contains_key("all"),
                    };
                    if !selected {
                        // Not selected: report the error and terminate,
                        // same as the non-exception error path below.
                        let seq = self.next_seq();
                        responses.push(DapResponse::event(
                            seq,
                            "output",
                            Some(json!({
                                "category": "stderr",
                                "output": format!("Error: {e}\n"),
                            })),
                        ));
                        self.program_state = ProgramState::Terminated;
                        let seq = self.next_seq();
                        responses.push(DapResponse::event(seq, "terminated", None));
                        return responses;
                    }
                    // Selected: stop on the exception so the IDE can
                    // inspect the state at the throw site.
                    let state = self.current_debug_state();
                    self.stopped = true;
                    self.current_line = state.line as i64;
                    self.variables = state.variables;
                    self.program_state = ProgramState::Stopped;
                    let seq = self.next_seq();
                    responses.push(DapResponse::event(
                        seq,
                        "output",
                        Some(json!({
                            "category": "stderr",
                            "output": format!("Exception: {error_msg}\n"),
                        })),
                    ));
                    let seq = self.next_seq();
                    // NOTE(review): `hitBreakpointIds` carries the kind
                    // name as a string here, whereas the DAP spec types
                    // that field as numbers — presumably the client
                    // expects this; confirm.
                    responses.push(DapResponse::event(
                        seq,
                        "stopped",
                        Some(json!({
                            "reason": "exception",
                            "description": error_msg,
                            "hitBreakpointIds": kind
                                .as_deref()
                                .map(|k| vec![k.to_string()])
                                .unwrap_or_default(),
                            "threadId": self.current_thread_id as i64,
                            "allThreadsStopped": true,
                        })),
                    ));
                    return responses;
                }
                // Exception breaking is off, or this isn't a thrown
                // error: report on stderr and terminate the session.
                let seq = self.next_seq();
                responses.push(DapResponse::event(
                    seq,
                    "output",
                    Some(json!({
                        "category": "stderr",
                        "output": format!("Error: {e}\n"),
                    })),
                ));
                self.program_state = ProgramState::Terminated;
                let seq = self.next_seq();
                responses.push(DapResponse::event(seq, "terminated", None));
            }
        }

        responses
    }
}