dirge-agent 0.12.3

Minimalistic coding agent written in Rust, optimized for memory footprint and performance
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
//! High-level LSP client.
//!
//! Layered on [`crate::lsp::rpc`]. Tracks per-file synchronization state
//! (versions, last text sent) and accumulates push/pull diagnostics into
//! merged, deduplicated views.
//!
//! What this module DOES NOT do:
//! - Spawn LSP server processes (that's Phase 4's orchestrator).
//! - Pick which server to attach to a file (Phase 4).
//! - Handle cancellation on shutdown (covered when the orchestrator owns clients).
//!
//! Construction: call [`LspClient::new`] with an already-initialized
//! [`RpcClient`]. The constructor installs the `textDocument/publishDiagnostics`
//! handler so push diagnostics are accumulated from that moment on.

#[allow(unused_imports)]
use crate::sync_util::LockExt;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use lsp_types::Diagnostic;
use serde_json::{Value, json};
use tokio::sync::watch;

use crate::lsp::language;
use crate::lsp::rpc::{RpcClient, RpcError};
use crate::lsp::uri::{path_to_file_uri_string, uri_to_path};

/// Per-document synchronization state kept for every file that has been
/// announced to the server.
#[derive(Debug, Default)]
struct FileState {
    /// Last version we sent to the server. LSP wants a monotonically
    /// increasing per-document version on each didChange. `notify_open`
    /// sends 0 with the initial `didOpen` and increments this before
    /// every subsequent `didChange`.
    version: i32,
}

/// Mutable state shared between the client handle(s) and the
/// `textDocument/publishDiagnostics` notification handler. Always accessed
/// through the surrounding `Arc<Mutex<_>>`.
#[derive(Debug, Default)]
struct Inner {
    /// Files currently considered open on the server, keyed by the path
    /// `notify_open` resolved (canonicalized when possible).
    files: HashMap<PathBuf, FileState>,
    /// Latest diagnostics stored per file. Written by the
    /// `publishDiagnostics` handler and by `try_pull_diagnostics`; each
    /// write replaces the previous list for that path.
    push: HashMap<PathBuf, Vec<Diagnostic>>,
    /// Diagnostics obtained via pull requests, merged with `push` when
    /// reading. NOTE(review): nothing in the visible code inserts into this
    /// map (pull results are stored in `push`) — confirm whether it is
    /// still needed.
    pull: HashMap<PathBuf, Vec<Diagnostic>>,
    /// When the entry in `push` for each file was last written. Compared
    /// against a caller-supplied `Instant` to decide whether a push is fresh.
    last_push_at: HashMap<PathBuf, Instant>,
    /// EXT-3: per-path async mutex so concurrent `notify_open` for the
    /// same file serialize the read+version-bump+notify sequence.
    /// Without this, two interleaved opens could ship version 0
    /// content with v1's actual bytes (or vice versa) because the
    /// file read happens outside the version-bump critical section.
    /// Each entry is an `Arc<tokio::sync::Mutex<()>>` so the per-path
    /// guard can outlive a brief grab of the surrounding sync mutex.
    per_path_locks: HashMap<PathBuf, Arc<tokio::sync::Mutex<()>>>,
}

/// High-level client for a connected LSP server. Cheap to clone.
///
/// Clones share the same file-tracking and diagnostics state through the
/// inner `Arc`.
#[derive(Clone)]
pub struct LspClient {
    /// Underlying JSON-RPC connection used for every notification and
    /// request this client sends.
    rpc: RpcClient,
    /// Shared mutable state: tracked files, stored diagnostics, and the
    /// per-path locks used by `notify_open`.
    inner: Arc<Mutex<Inner>>,
    /// Watch channel bumped on every incoming push. Waiters subscribe and
    /// poll the inner state to detect fresh data for their file of interest.
    push_signal: watch::Sender<u64>,
}

/// Errors surfaced by [`LspClient`] operations.
#[derive(Debug, thiserror::Error)]
pub enum LspError {
    /// Failure reported by the underlying RPC layer; displayed transparently.
    #[error(transparent)]
    Rpc(#[from] RpcError),
    /// Filesystem failure, e.g. reading the file contents in `notify_open`.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// No fresh diagnostics for `path` arrived before the wait deadline.
    #[error("timed out waiting for diagnostics on {path}")]
    DiagnosticsTimeout { path: PathBuf },
    /// The push-signal channel was closed while waiting for diagnostics.
    #[error("server closed before diagnostics arrived")]
    ServerClosed,
}

impl LspClient {
    /// Wrap an initialized [`RpcClient`] in a higher-level LSP client.
    ///
    /// Before returning, registers a `textDocument/publishDiagnostics`
    /// handler on `rpc`. For every accepted notification the handler
    /// replaces the stored diagnostics for the file, stamps the arrival
    /// time, and bumps the push signal so `wait_for_push` callers wake up.
    ///
    /// A notification is ignored when:
    /// - its `uri` is missing or cannot be mapped to a filesystem path, or
    /// - it carries a `version` older than the last version we sent for
    ///   that file (EXT-4).
    ///
    /// A missing or undeserializable `diagnostics` array is treated as an
    /// empty list.
    pub async fn new(rpc: RpcClient) -> Self {
        let (signal_tx, _signal_rx) = watch::channel(0u64);
        let inner = Arc::new(Mutex::new(Inner::default()));

        // Register the push handler. Held by the RpcClient internally; it
        // captures clones of `inner` and `signal_tx`.
        let inner_for_handler = Arc::clone(&inner);
        let signal_for_handler = signal_tx.clone();
        rpc.on_notification(
            "textDocument/publishDiagnostics",
            Box::new(move |params: Value| {
                let Some(path) = params
                    .get("uri")
                    .and_then(|v| v.as_str())
                    .and_then(uri_to_path)
                else {
                    return;
                };
                let diagnostics: Vec<Diagnostic> = params
                    .get("diagnostics")
                    .and_then(|v| serde_json::from_value(v.clone()).ok())
                    .unwrap_or_default();

                // EXT-4: servers may include the document `version` the
                // diagnostics were computed for. Keep it as i64 rather than
                // narrowing with `as i32`: a truncating cast would wrap an
                // out-of-range value and could make a newer report look
                // stale (or the reverse).
                let server_version: Option<i64> =
                    params.get("version").and_then(|v| v.as_i64());

                {
                    let mut state = inner_for_handler.lock_ignore_poison();
                    // Stale only when the server named a version AND we
                    // track the file AND that version predates the last one
                    // we sent. Stale "clean" diagnostics would otherwise
                    // hide real errors from the current version.
                    let is_stale = match (server_version, state.files.get(&path)) {
                        (Some(sv), Some(file)) => sv < i64::from(file.version),
                        _ => false,
                    };
                    if is_stale {
                        return;
                    }
                    state.push.insert(path.clone(), diagnostics);
                    state.last_push_at.insert(path, Instant::now());
                }
                // Bump the watch (outside the state lock) to wake any waiters.
                signal_for_handler.send_modify(|v| *v = v.wrapping_add(1));
            }),
        )
        .await;

        Self {
            rpc,
            inner,
            push_signal: signal_tx,
        }
    }

    /// Open or update a file with the server. Reads the current file
    /// contents, sends `textDocument/didOpen` on first contact or
    /// `textDocument/didChange` thereafter (with a bumped version).
    /// Returns the version sent.
    ///
    /// File I/O goes through `tokio::fs` so the orchestrator can fan
    /// `touch_file` out across multiple clients without blocking the runtime
    /// thread.
    pub async fn notify_open(&self, path: &Path) -> Result<i32, LspError> {
        let abs = match tokio::fs::canonicalize(path).await {
            Ok(p) => p,
            Err(_) => path.to_path_buf(),
        };

        // EXT-3: serialize concurrent opens of the same path. Grab the
        // per-path async mutex (creating one if first contact) under
        // the sync mutex briefly, then hold the async mutex across the
        // entire read → version bump → rpc.notify sequence. Without
        // this, two parallel `notify_open` calls can interleave:
        //
        //   A: read v0 content
        //                     B: read v1 content   (file changed)
        //   A: bump → version 0
        //                     B: bump → version 1
        //   A: notify(version=0, text=<B's v1 bytes>)
        //                     B: notify(version=1, text=<B's v1 bytes>)
        //
        // The server then sees version=0 paired with v1 content, which
        // either confuses incremental sync (rust-analyzer rejects the
        // mismatch) or, worse, silently de-syncs the in-server text.
        let path_lock = {
            let mut state = self.inner.lock_ignore_poison();
            Arc::clone(
                state
                    .per_path_locks
                    .entry(abs.clone())
                    .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
            )
        };
        let _guard = path_lock.lock().await;

        let text = tokio::fs::read_to_string(&abs).await?;
        let uri = path_to_file_uri_string(&abs);

        let is_first_open;
        let version;
        {
            let mut state = self.inner.lock_ignore_poison();
            is_first_open = !state.files.contains_key(&abs);
            let entry = state.files.entry(abs.clone()).or_default();
            if is_first_open {
                entry.version = 0;
                version = 0;
            } else {
                entry.version += 1;
                version = entry.version;
            }
        }

        if is_first_open {
            let language_id = language::language_for_path(&abs);
            self.rpc
                .notify(
                    "textDocument/didOpen",
                    json!({
                        "textDocument": {
                            "uri": uri,
                            "languageId": language_id,
                            "version": version,
                            "text": text,
                        }
                    }),
                )
                .await?;
        } else {
            self.rpc
                .notify(
                    "textDocument/didChange",
                    json!({
                        "textDocument": {
                            "uri": uri,
                            "version": version,
                        },
                        // Always send full-document changes for simplicity.
                        // Per the LSP spec, full-sync (sync=1) servers expect
                        // this shape; incremental-sync (sync=2) servers also
                        // accept it. We may switch on initialize.sync later.
                        "contentChanges": [{ "text": text }],
                    }),
                )
                .await?;
        }
        Ok(version)
    }

    /// Stop tracking one file and tell the server it is closed.
    ///
    /// Drops everything we hold for the path — version, stored push and
    /// pull diagnostics, last-push timestamp, and the per-path lock entry —
    /// and, if the file was tracked as open, sends `textDocument/didClose`
    /// so the server can release its own per-document state instead of
    /// retaining every file opened during a long session.
    ///
    /// Returns `Ok(())` without sending anything when the file was not open.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the notification.
    pub async fn notify_close(&self, path: &Path) -> Result<(), LspError> {
        let abs = tokio::fs::canonicalize(path)
            .await
            .unwrap_or_else(|_| path.to_path_buf());

        let was_open = {
            let mut state = self.inner.lock_ignore_poison();
            let tracked = state.files.remove(&abs).is_some();
            state.push.remove(&abs);
            state.pull.remove(&abs);
            state.last_push_at.remove(&abs);
            // EXT-3: forget the per-path mutex as well. An in-flight
            // `notify_open` that already cloned the Arc keeps it alive
            // until it finishes.
            state.per_path_locks.remove(&abs);
            tracked
        };
        if !was_open {
            return Ok(());
        }

        let uri = path_to_file_uri_string(&abs);
        let params = json!({ "textDocument": { "uri": uri } });
        self.rpc.notify("textDocument/didClose", params).await?;
        Ok(())
    }

    /// Close every file this client currently tracks.
    ///
    /// Intended for session shutdown, to release server-side per-file state
    /// before the process exits. Best-effort: errors from individual closes
    /// are discarded.
    pub async fn close_all(&self) {
        // Snapshot the keys first so the state lock is not held while the
        // individual close notifications are awaited.
        let tracked: Vec<PathBuf> = {
            let state = self.inner.lock_ignore_poison();
            state.files.keys().cloned().collect()
        };
        for path in &tracked {
            let _ = self.notify_close(path).await;
        }
    }

    /// Diagnostics currently known for `path`, merged and deduplicated.
    ///
    /// Push entries come first, followed by pull entries; duplicates are
    /// removed by `dedupe`. Returns an empty vector when nothing is stored
    /// for the path.
    pub fn diagnostics_for(&self, path: &Path) -> Vec<Diagnostic> {
        let state = self.inner.lock_ignore_poison();
        let pushed = state.push.get(path).into_iter().flatten();
        let pulled = state.pull.get(path).into_iter().flatten();
        dedupe(pushed.chain(pulled).cloned())
    }

    /// All merged + deduplicated diagnostics across every tracked file.
    /// Empty entries are pruned. Useful for the write/edit tool's
    /// project-wide diagnostic block.
    pub fn all_diagnostics(&self) -> HashMap<PathBuf, Vec<Diagnostic>> {
        let state = self.inner.lock_ignore_poison();
        let mut paths: HashSet<PathBuf> = state.push.keys().cloned().collect();
        paths.extend(state.pull.keys().cloned());
        drop(state);

        let mut result = HashMap::new();
        for p in paths {
            let merged = self.diagnostics_for(&p);
            if !merged.is_empty() {
                result.insert(p, merged);
            }
        }
        result
    }

    /// Wait until diagnostics for `path` are stored with a timestamp strictly
    /// later than `after`, or until `timeout` elapses.
    ///
    /// Data that was already stored at or before `after` does not satisfy the
    /// wait; only a fresh arrival does.
    ///
    /// # Errors
    ///
    /// - [`LspError::DiagnosticsTimeout`] when the deadline passes first.
    /// - [`LspError::ServerClosed`] when the push-signal channel is closed.
    pub async fn wait_for_push(
        &self,
        path: &Path,
        after: Instant,
        timeout: Duration,
    ) -> Result<(), LspError> {
        let deadline = Instant::now() + timeout;
        // Subscribe before the first state check so a push landing between
        // the check and the wait still wakes us.
        let mut rx = self.push_signal.subscribe();
        loop {
            if self.has_fresh_push(path, after) {
                return Ok(());
            }
            let remaining = deadline.saturating_duration_since(Instant::now());
            // `None` means the budget is spent, either already or while
            // waiting for the next signal bump.
            let woke = if remaining.is_zero() {
                None
            } else {
                tokio::time::timeout(remaining, rx.changed()).await.ok()
            };
            match woke {
                // Something was pushed; re-check whether it was for our file.
                Some(Ok(())) => {}
                Some(Err(_)) => return Err(LspError::ServerClosed),
                None => {
                    return Err(LspError::DiagnosticsTimeout {
                        path: path.to_path_buf(),
                    });
                }
            }
        }
    }

    /// B3-10 (audit fix): obtain diagnostics for `path` by first trying a
    /// `textDocument/diagnostic` pull request (LSP 3.17) and then, if that
    /// yields nothing, waiting for a push.
    ///
    /// Some servers do not push diagnostics promptly (the original audit
    /// cites clojure-lsp, jdtls during cold start, and clangd before
    /// background indexing); with a push-only wait those time out and the
    /// agent reports a clean file even though errors exist.
    ///
    /// The two steps run sequentially, not concurrently:
    /// 1. Pull, bounded by `min(timeout, 3s)`. On success the result is
    ///    already stored where `diagnostics_for` reads it and we return.
    /// 2. Otherwise fall back to `wait_for_push` with whatever remains of
    ///    `timeout`, but never less than 100ms.
    ///
    /// # Errors
    ///
    /// Returns whatever `wait_for_push` returns when the fallback is taken.
    pub async fn wait_for_push_or_pull(
        &self,
        path: &Path,
        after: Instant,
        timeout: Duration,
    ) -> Result<(), LspError> {
        let started = Instant::now();
        let pull_budget = std::cmp::min(timeout, Duration::from_secs(3));
        let pulled = matches!(
            self.try_pull_diagnostics(path, pull_budget).await,
            Ok(true)
        );
        if pulled {
            return Ok(());
        }

        // The pull failed or was uninformative. Spend the rest of the budget
        // on the push wait; even if the pull consumed all of it, grant a
        // short grace window rather than failing instantly.
        let grace = Duration::from_millis(100);
        let leftover = timeout.saturating_sub(started.elapsed());
        let push_budget = std::cmp::max(leftover, grace);
        self.wait_for_push(path, after, push_budget).await
    }

    /// Send a `textDocument/diagnostic` (LSP 3.17) pull request and
    /// inject any returned items into the client's push state so
    /// downstream callers see them via `diagnostics_for`. Returns
    /// `Ok(true)` if the server responded with diagnostics (full or
    /// unchanged), `Ok(false)` if the server responded with no items
    /// or doesn't support pull. `Err` only for transport-level
    /// failures the caller should surface.
    async fn try_pull_diagnostics(&self, path: &Path, timeout: Duration) -> Result<bool, LspError> {
        use serde_json::json;
        let uri = crate::lsp::uri::path_to_file_uri_string(path);
        let params = json!({
            "textDocument": { "uri": uri },
        });
        // The reply shape can be one of several Document Diagnostic
        // Report kinds; just deserialize to Value and pick `items`.
        let resp: Result<serde_json::Value, _> = self
            .rpc
            .request("textDocument/diagnostic", params, timeout)
            .await;
        let value = match resp {
            Ok(v) => v,
            Err(_) => return Ok(false), // unsupported / errored — let push handle it
        };
        // Full report: { kind: "full", items: [...] }.
        // Unchanged report: { kind: "unchanged", resultId: ... } —
        // no new diagnostics, but successful response means "no
        // errors right now" → return true so the caller doesn't
        // block on push.
        let kind = value.get("kind").and_then(|k| k.as_str()).unwrap_or("");
        if kind == "unchanged" {
            return Ok(true);
        }
        let items: Vec<lsp_types::Diagnostic> = value
            .get("items")
            .and_then(|i| serde_json::from_value(i.clone()).ok())
            .unwrap_or_default();
        let abs = path.to_path_buf();
        {
            let mut state = self.inner.lock_ignore_poison();
            state.push.insert(abs.clone(), items);
            state.last_push_at.insert(abs, Instant::now());
        }
        // Wake any concurrent wait_for_push waiters since we just
        // populated state — they'll detect a fresh push.
        self.push_signal.send_modify(|v| *v = v.wrapping_add(1));
        Ok(true)
    }

    /// Whether diagnostics for `path` were stored strictly after `after`.
    /// Returns `false` when no timestamp is recorded for the path.
    fn has_fresh_push(&self, path: &Path, after: Instant) -> bool {
        let state = self.inner.lock_ignore_poison();
        matches!(state.last_push_at.get(path), Some(stamp) if *stamp > after)
    }

    /// Borrow the underlying [`RpcClient`].
    ///
    /// For Phase 4: gives the orchestrator a handle to the transport so it
    /// can fan out requests directly to `rpc`.
    pub fn rpc(&self) -> &RpcClient {
        &self.rpc
    }
}

/// Removes repeated diagnostics, keeping the first occurrence of each in
/// input order.
///
/// Two diagnostics count as the same when the JSON serialization of
/// (range, severity, code, source, message) is identical — this matches
/// opencode's dedupe key. `code` has to stay in the key: two clippy lints
/// on the same line with the same message but different codes (e.g.
/// `needless_clone` vs `redundant_clone`) are separate findings and must
/// both survive. A diagnostic whose key fails to serialize is left out of
/// the result.
fn dedupe<I: IntoIterator<Item = Diagnostic>>(items: I) -> Vec<Diagnostic> {
    let mut known_keys: HashSet<String> = HashSet::new();
    items
        .into_iter()
        .filter(|candidate| {
            let identity = (
                &candidate.range,
                candidate.severity,
                &candidate.code,
                candidate.source.as_deref(),
                &candidate.message,
            );
            match serde_json::to_string(&identity) {
                // `insert` is true only the first time a key is seen.
                Ok(key) => known_keys.insert(key),
                Err(_) => false,
            }
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::jsonrpc_framing::{decode_frame, encode_frame};
    use lsp_types::{DiagnosticSeverity, NumberOrString, Position, Range};
    use serde_json::json;
    use tokio::io::BufReader;
    use tokio::sync::mpsc;

    /// Build an LspClient wired to an in-memory peer over two
    /// `tokio::io::duplex` pipes, one per direction. Returns the client and
    /// the server-side channels:
    /// - `from_client`: requests/notifications the client sent to the server,
    ///   decoded frame by frame and parsed as JSON
    /// - `to_client_tx`: send JSON values toward the client; each value is
    ///   serialized and framed before being written
    ///
    /// The second value returned by `RpcClient::new` is bound to `_task` and
    /// is NOT handed back to the caller; it goes out of scope when `pair`
    /// returns.
    async fn pair() -> (
        LspClient,
        mpsc::UnboundedReceiver<Value>,
        mpsc::UnboundedSender<Value>,
    ) {
        // Server writes to `server_out`, client reads from `client_in`.
        let (client_in, server_out) = tokio::io::duplex(8192);
        // Client writes to `client_out`, server reads from `server_in`.
        let (server_in, client_out) = tokio::io::duplex(8192);
        // Each pipe end is split and only the half needed for its direction
        // is kept; the other half is discarded.
        let (client_reader, _) = tokio::io::split(client_in);
        let (_, client_writer) = tokio::io::split(client_out);
        let (server_reader, _) = tokio::io::split(server_in);
        let (_, mut server_writer) = tokio::io::split(server_out);
        let (rpc, _task) = RpcClient::new(BufReader::new(client_reader), client_writer);
        let client = LspClient::new(rpc).await;

        // Server-side reader: forwards every frame the client emits into
        // `from_client`. Stops at the first frame that fails to decode or
        // once the receiving end has been dropped; panics (inside the
        // spawned task) if a frame is not valid JSON.
        let (from_client_tx, from_client) = mpsc::unbounded_channel();
        tokio::spawn(async move {
            let mut reader = BufReader::new(server_reader);
            loop {
                let frame = match decode_frame(&mut reader).await {
                    Ok(b) => b,
                    Err(_) => break,
                };
                let v: Value = serde_json::from_slice(&frame).unwrap();
                if from_client_tx.send(v).is_err() {
                    break;
                }
            }
        });

        // Server-side writer: frames each queued value and writes it toward
        // the client. Ends when every sender is dropped or a write fails.
        let (to_client_tx, mut to_client_rx) = mpsc::unbounded_channel::<Value>();
        tokio::spawn(async move {
            while let Some(msg) = to_client_rx.recv().await {
                let bytes = serde_json::to_vec(&msg).unwrap();
                if encode_frame(&mut server_writer, &bytes).await.is_err() {
                    break;
                }
            }
        });

        (client, from_client, to_client_tx)
    }

    /// Writes `contents` to a new file in the system temp directory and
    /// returns its path. The file is named
    /// `dirge-lsp-client-test-<pid>-<nanos>-<suffix>.rs`; the `.rs`
    /// extension is there so language detection picks up the right
    /// languageId. Panics if the file cannot be written.
    fn tempfile_with(suffix: &str, contents: &str) -> PathBuf {
        let pid = std::process::id();
        let nanos = crate::time_util::now_unix_nanos();
        let file_name = format!("dirge-lsp-client-test-{}-{}-{suffix}.rs", pid, nanos);

        let mut target = std::env::temp_dir();
        target.push(file_name);
        std::fs::write(&target, contents).unwrap();
        target
    }

    /// Test fixture: an ERROR-severity diagnostic from source `rustc` with
    /// code `E0001`, carrying `msg` and a zero-width range at column 0 of
    /// `line`. Every remaining optional field is `None`.
    fn diag(line: u32, msg: &str) -> Diagnostic {
        // Start and end share one position, giving an empty range.
        let anchor = Position { line, character: 0 };
        let code = NumberOrString::String("E0001".to_string());
        Diagnostic {
            message: msg.to_string(),
            severity: Some(DiagnosticSeverity::ERROR),
            source: Some("rustc".to_string()),
            code: Some(code),
            range: Range {
                start: anchor,
                end: anchor,
            },
            code_description: None,
            related_information: None,
            tags: None,
            data: None,
        }
    }

    /// The first `notify_open` for a path returns version 0 and emits a
    /// `textDocument/didOpen` carrying that version, the `rust` languageId
    /// and the file's text.
    #[tokio::test]
    async fn notify_open_first_call_sends_did_open_with_version_zero() {
        let (client, mut from_client, _to) = pair().await;
        let source = "fn main() {}\n";
        let path = tempfile_with("first-open", source);

        let version = client.notify_open(&path).await.unwrap();
        assert_eq!(version, 0);

        let did_open = from_client.recv().await.unwrap();
        assert_eq!(did_open["method"], "textDocument/didOpen");
        let document = &did_open["params"]["textDocument"];
        assert_eq!(document["version"], 0);
        assert_eq!(document["languageId"], "rust");
        assert_eq!(document["text"], source);
        std::fs::remove_file(&path).ok();
    }

    // Regression: re-notifying an already-open path must produce a
    // textDocument/didChange with the next version number, never a second
    // didOpen. rust-analyzer errors out when it receives didOpen twice for
    // the same uri.
    #[tokio::test]
    async fn regression_subsequent_notify_open_sends_did_change_with_bumped_version() {
        let (client, mut from_client, _to) = pair().await;
        let path = tempfile_with("subsequent-open", "fn main() {}\n");

        let first_version = client.notify_open(&path).await.unwrap();
        let _did_open = from_client.recv().await.unwrap();

        // Rewrite the file so the second notification carries new text.
        let updated = "fn main() { let _ = 1; }\n";
        std::fs::write(&path, updated).unwrap();
        let second_version = client.notify_open(&path).await.unwrap();
        assert_eq!(first_version, 0);
        assert_eq!(second_version, 1);

        let did_change = from_client.recv().await.unwrap();
        assert_eq!(did_change["method"], "textDocument/didChange");
        let params = &did_change["params"];
        assert_eq!(params["textDocument"]["version"], 1);
        assert_eq!(params["contentChanges"][0]["text"], updated);
        std::fs::remove_file(&path).ok();
    }

    /// The text shipped by `notify_open` is what the file holds on disk.
    #[tokio::test]
    async fn notify_open_reads_current_file_contents() {
        let (client, mut from_client, _to) = pair().await;
        let on_disk = "hello world\n";
        let path = tempfile_with("read-contents", on_disk);
        client.notify_open(&path).await.unwrap();

        let sent = from_client.recv().await.unwrap();
        let text = &sent["params"]["textDocument"]["text"];
        assert_eq!(*text, on_disk);
        std::fs::remove_file(&path).ok();
    }

    /// Regression: `notify_close` must send `textDocument/didClose` and
    /// forget the file, so that the next `notify_open` for the same path
    /// begins again with a didOpen at version 0 instead of a didChange.
    #[tokio::test]
    async fn notify_close_emits_did_close_and_clears_state() {
        let (client, mut from_client, _to) = pair().await;
        let path = tempfile_with("close-flow", "fn main() {}\n");

        client.notify_open(&path).await.unwrap();
        let _did_open = from_client.recv().await.unwrap();

        client.notify_close(&path).await.unwrap();
        let did_close = from_client.recv().await.unwrap();
        assert_eq!(did_close["method"], "textDocument/didClose");

        // With the per-file state gone, opening again is a brand-new
        // didOpen rather than a didChange.
        client.notify_open(&path).await.unwrap();
        let reopened = from_client.recv().await.unwrap();
        assert_eq!(reopened["method"], "textDocument/didOpen");
        assert_eq!(reopened["params"]["textDocument"]["version"], 0);
        std::fs::remove_file(&path).ok();
    }

    /// `close_all` sends one didClose per file that is currently tracked.
    #[tokio::test]
    async fn close_all_emits_did_close_for_every_open_file() {
        let (client, mut from_client, _to) = pair().await;
        let paths = [
            tempfile_with("close-all-a", "a\n"),
            tempfile_with("close-all-b", "b\n"),
        ];
        for opened in &paths {
            client.notify_open(opened).await.unwrap();
        }
        // Discard the didOpen frames, one per file.
        for _ in &paths {
            let _ = from_client.recv().await.unwrap();
        }

        client.close_all().await;
        let first = from_client.recv().await.unwrap();
        let second = from_client.recv().await.unwrap();
        assert_eq!(first["method"], "textDocument/didClose");
        assert_eq!(second["method"], "textDocument/didClose");
        for opened in &paths {
            std::fs::remove_file(opened).ok();
        }
    }

    // Push flow: the server sends textDocument/publishDiagnostics, LspClient
    // stores the payload, and diagnostics_for hands it back.
    // The URI we receive is decoded back to the same path string we'd send,
    // so the lookup uses that same path form (no canonicalize() — that could
    // cross a symlink boundary like macOS's /tmp → /private/tmp).
    #[tokio::test]
    async fn push_diagnostic_lands_in_state() {
        let (client, _from, to_client) = pair().await;
        let path = tempfile_with("push-state", "");
        let target_uri = path_to_file_uri_string(&path);
        let published = diag(3, "unused variable");

        let notification = json!({
            "jsonrpc": "2.0",
            "method": "textDocument/publishDiagnostics",
            "params": {
                "uri": target_uri,
                "diagnostics": [published]
            }
        });
        to_client.send(notification).unwrap();

        // Let the notification handler run before inspecting state.
        tokio::time::sleep(Duration::from_millis(50)).await;
        let got = client.diagnostics_for(&path);
        assert_eq!(got.len(), 1, "got: {got:?}");
        assert_eq!(got[0].message, "unused variable");
        std::fs::remove_file(&path).ok();
    }

    /// A path the client has never seen has no diagnostics.
    #[tokio::test]
    async fn diagnostics_for_unknown_file_returns_empty() {
        let (client, _from, _to) = pair().await;
        let untouched = Path::new("/tmp/never-touched.rs");
        assert!(client.diagnostics_for(untouched).is_empty());
    }

    // Regression: diagnostics that agree on range, severity, message and
    // source but carry DIFFERENT codes are distinct and must both be kept.
    // The agent needs to see both lints when (say) two clippy rules fire on
    // the same expression.
    #[tokio::test]
    async fn regression_dedupe_preserves_diagnostics_with_distinct_codes() {
        let (client, _from, _to) = pair().await;
        let path = PathBuf::from("/tmp/dedupe-codes-test.rs");

        // Builds diagnostics that are identical apart from `code`.
        let with_code = |code: &str| {
            let mut lint = diag(5, "this is suspicious");
            lint.code = Some(NumberOrString::String(code.to_string()));
            lint
        };
        let pushed = vec![
            with_code("clippy::needless_clone"),
            with_code("clippy::redundant_clone"),
        ];

        // The lock guard is a temporary, released at the end of this statement.
        client
            .inner
            .lock()
            .unwrap()
            .push
            .insert(path.clone(), pushed);

        let merged = client.diagnostics_for(&path);
        assert_eq!(
            merged.len(),
            2,
            "different `code` must keep both: {merged:?}"
        );
    }

    // Regression: identical diagnostics from push + pull (or from two
    // overlapping push notifications) must dedupe so the UI doesn't show
    // every error twice.
    #[tokio::test]
    async fn regression_merged_diagnostics_dedupe_identical_entries() {
        let (client, _from, _to) = pair().await;
        let path = PathBuf::from("/tmp/dedupe-test.rs");
        let same = diag(1, "same error");
        let different = diag(2, "different error");

        {
            let mut state = client.inner.lock().unwrap();
            state
                .push
                .insert(path.clone(), vec![same.clone(), different.clone()]);
            state.pull.insert(path.clone(), vec![same.clone()]);
        }

        let merged = client.diagnostics_for(&path);
        assert_eq!(merged.len(), 2, "duplicates should collapse: {merged:?}");
        // Both unique items preserved.
        assert!(merged.iter().any(|d| d.message == "same error"));
        assert!(merged.iter().any(|d| d.message == "different error"));
    }

    /// `all_diagnostics` reports files known only through push as well as
    /// files known only through pull.
    #[tokio::test]
    async fn all_diagnostics_aggregates_every_tracked_file() {
        let (client, _from, _to) = pair().await;
        let pushed_file = PathBuf::from("/tmp/a.rs");
        let pulled_file = PathBuf::from("/tmp/b.rs");
        {
            let mut guard = client.inner.lock().unwrap();
            guard
                .push
                .insert(pushed_file.clone(), vec![diag(1, "a-err")]);
            guard
                .pull
                .insert(pulled_file.clone(), vec![diag(2, "b-err")]);
        }

        let all = client.all_diagnostics();
        assert_eq!(all.len(), 2);
        assert!(all.contains_key(&pushed_file));
        assert!(all.contains_key(&pulled_file));
    }

    /// A file whose recorded diagnostic list is empty does not show up in
    /// `all_diagnostics`.
    #[tokio::test]
    async fn all_diagnostics_prunes_empty_entries() {
        let (client, _from, _to) = pair().await;
        // The lock guard is a temporary, released at the end of this statement.
        client
            .inner
            .lock()
            .unwrap()
            .push
            .insert(PathBuf::from("/tmp/empty.rs"), Vec::new());

        let all = client.all_diagnostics();
        assert!(all.is_empty());
    }

    // Regression: wait_for_push must only be satisfied by a push that lands
    // after `after`, never by one that was already recorded. That freshness
    // check is the entire point of the API — without it, the agent's
    // wait_for_diagnostics call after a write would resolve immediately on
    // the previous turn's stale diagnostics.
    #[tokio::test]
    async fn regression_wait_for_push_ignores_stale_arrivals() {
        let (client, _from, to_client) = pair().await;
        let path = PathBuf::from("/tmp/wait-stale.rs");
        let target_uri = path_to_file_uri_string(&path);

        // Deliver a push and give it time to land BEFORE `after` is taken.
        let stale_push = json!({
            "jsonrpc": "2.0",
            "method": "textDocument/publishDiagnostics",
            "params": { "uri": target_uri, "diagnostics": [diag(1, "stale")] }
        });
        to_client.send(stale_push).unwrap();
        tokio::time::sleep(Duration::from_millis(30)).await;

        let after = Instant::now();
        // Nothing is pushed past this point, so the wait has to time out.
        let outcome = client
            .wait_for_push(&path, after, Duration::from_millis(80))
            .await;
        assert!(matches!(outcome, Err(LspError::DiagnosticsTimeout { .. })));
    }

    /// A push that arrives after `after` satisfies `wait_for_push`, and the
    /// pushed diagnostic is then visible through `diagnostics_for`.
    #[tokio::test]
    async fn wait_for_push_resolves_on_fresh_arrival() {
        let (client, _from, to_client) = pair().await;
        let path = PathBuf::from("/tmp/wait-fresh.rs");
        let fresh_push = json!({
            "jsonrpc": "2.0",
            "method": "textDocument/publishDiagnostics",
            "params": {
                "uri": path_to_file_uri_string(&path),
                "diagnostics": [diag(2, "fresh")]
            }
        });
        let after = Instant::now();

        // Deliver the push 50ms from now, from a background task. A clone
        // of the sender goes into the task; `to_client` itself stays alive
        // in this scope until the test ends.
        let delayed_sender = to_client.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            delayed_sender.send(fresh_push).unwrap();
        });

        let res = client
            .wait_for_push(&path, after, Duration::from_secs(1))
            .await;
        assert!(res.is_ok(), "expected push to satisfy wait: {res:?}");

        let got = client.diagnostics_for(&path);
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].message, "fresh");
    }

    /// EXT-3 regression: two concurrent `notify_open` calls for the
    /// same path must serialize cleanly. Each rpc frame the client
    /// emits must carry consistent (version, text) — i.e. the text
    /// sent with version N reflects the file contents the call
    /// actually read.
    ///
    /// We can't reproduce the OS-level read race deterministically,
    /// but we can verify:
    ///   1. An initial, uncontended open emits didOpen at version 0
    ///      with the original contents.
    ///   2. After the file is rewritten, two racing re-opens return
    ///      versions 1 and 2 (in either order) — never the same
    ///      version twice.
    ///   3. Both racing calls emit didChange (never a second didOpen),
    ///      both frames carry the rewritten contents, and between them
    ///      the frames carry exactly versions 1 and 2.
    #[tokio::test]
    async fn ext3_concurrent_notify_open_serializes_per_path() {
        let (client, mut from_client, _to) = pair().await;
        let path = tempfile_with("ext3-race", "first contents\n");

        // Baseline: a single uncontended open establishes version 0 with
        // the original contents.
        let v0 = client.notify_open(&path).await.unwrap();
        assert_eq!(v0, 0);
        let open_frame = from_client.recv().await.unwrap();
        assert_eq!(open_frame["method"], "textDocument/didOpen");
        assert_eq!(open_frame["params"]["textDocument"]["version"], 0);
        assert_eq!(
            open_frame["params"]["textDocument"]["text"],
            "first contents\n"
        );

        // Mutate, then race two concurrent re-opens.
        std::fs::write(&path, "second contents\n").unwrap();

        let (client_for_a, path_for_a) = (client.clone(), path.clone());
        let (client_for_b, path_for_b) = (client.clone(), path.clone());
        let t_a = tokio::spawn(async move { client_for_a.notify_open(&path_for_a).await.unwrap() });
        let t_b = tokio::spawn(async move { client_for_b.notify_open(&path_for_b).await.unwrap() });

        let (va, vb) = tokio::join!(t_a, t_b);
        let va = va.unwrap();
        let vb = vb.unwrap();

        // The two returned versions must be 1 and 2 in some order
        // (never both 1 — that would mean A and B both bumped from 0
        // and one of the rpc frames carried stale content).
        let mut versions = [va, vb];
        versions.sort();
        assert_eq!(
            versions,
            [1, 2],
            "concurrent opens must produce distinct, sequential versions"
        );

        // Drain the two didChange frames. Both must carry the post-
        // mutation contents — neither can have shipped "first
        // contents\n" with a bumped version, which is exactly the
        // bug EXT-3 fixes.
        let f1 = from_client.recv().await.unwrap();
        let f2 = from_client.recv().await.unwrap();
        let mut frame_versions = Vec::with_capacity(2);
        for f in [&f1, &f2] {
            assert_eq!(f["method"], "textDocument/didChange");
            assert_eq!(
                f["params"]["contentChanges"][0]["text"], "second contents\n",
                "frame must reflect on-disk contents at call time: {f}"
            );
            frame_versions.push(f["params"]["textDocument"]["version"].as_i64());
        }

        // The versions on the wire must be the same distinct pair the calls
        // returned. Sorted before comparing, so no assumption is made about
        // which racing call's frame arrives first.
        frame_versions.sort();
        assert_eq!(
            frame_versions,
            [Some(1), Some(2)],
            "didChange frames must carry the distinct versions 1 and 2"
        );
        std::fs::remove_file(&path).ok();
    }

    /// With nothing ever pushed for the path, `wait_for_push` reports
    /// `DiagnosticsTimeout`.
    #[tokio::test]
    async fn wait_for_push_times_out_on_no_arrival() {
        let (client, _from, _to_client) = pair().await;
        let path = PathBuf::from("/tmp/wait-never.rs");
        let outcome = client
            .wait_for_push(&path, Instant::now(), Duration::from_millis(80))
            .await;
        assert!(matches!(outcome, Err(LspError::DiagnosticsTimeout { .. })));
    }
}