cargo-brief 0.10.0

Visibility-aware Rust API extractor — pseudo-Rust output for AI agent consumption
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
//! LSP daemon process: spawns rust-analyzer, accepts IPC clients, handles idle timeout.

use std::collections::HashSet;
use std::io::BufRead;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};

use anyhow::{Context, Result, bail};

use super::ipc;
use super::protocol::{DaemonRequest, DaemonResponse, RaStatus};
use super::query;
use super::transport::RaTransport;
use super::watcher::{self, DebounceBuffer};

/// Default idle timeout: 10 minutes.
const IDLE_TIMEOUT_SECS: u64 = 600;

/// Entry point for the re-exec'd daemon process. Parses args manually.
pub fn run_daemon_from_args() -> Result<()> {
    let args: Vec<String> = std::env::args().collect();

    let mut workspace_root = None;
    let mut daemon_dir = None;

    let mut i = 2; // skip binary name and "__lsp-daemon"
    while i < args.len() {
        match args[i].as_str() {
            "--workspace-root" | "--daemon-dir" => {
                let flag = &args[i];
                i += 1;
                let value = args
                    .get(i)
                    .with_context(|| format!("Missing value for {flag}"))?;
                match flag.as_str() {
                    "--workspace-root" => workspace_root = Some(PathBuf::from(value)),
                    "--daemon-dir" => daemon_dir = Some(PathBuf::from(value)),
                    _ => unreachable!(),
                }
            }
            other => bail!("Unknown daemon argument: {other}"),
        }
        i += 1;
    }

    let workspace_root = workspace_root.context("Missing --workspace-root")?;
    let daemon_dir = daemon_dir.context("Missing --daemon-dir")?;

    run_daemon(&workspace_root, &daemon_dir)
}

/// Discover the rust-analyzer binary path.
fn discover_ra_binary() -> Result<PathBuf> {
    // Try rustup first
    if let Ok(output) = Command::new("rustup")
        .args(["which", "rust-analyzer"])
        .output()
        && output.status.success()
    {
        let path = String::from_utf8_lossy(&output.stdout).trim().to_string();
        if !path.is_empty() {
            return Ok(PathBuf::from(path));
        }
    }

    // Fall back to platform-specific PATH lookup
    if let Some(path) = super::process::find_binary_on_path("rust-analyzer") {
        return Ok(path);
    }

    bail!(
        "rust-analyzer not found.\n\
         Install via: rustup component add rust-analyzer\n\
         Or ensure rust-analyzer is available on PATH."
    )
}

/// Send LSP `initialize` request and `initialized` notification.
fn send_initialize(transport: &mut RaTransport, workspace_root: &Path) -> Result<()> {
    let path_str = workspace_root.to_str().context("Non-UTF8 workspace root")?;
    // file:// URIs require three slashes for absolute paths: file:///path
    let root_uri = if path_str.starts_with('/') {
        format!("file://{path_str}")
    } else {
        format!("file:///{path_str}")
    };

    let params = serde_json::json!({
        "processId": std::process::id(),
        "rootUri": root_uri,
        "capabilities": {
            "window": { "workDoneProgress": true }
        },
        "initializationOptions": {}
    });

    let response = transport.send_request_and_wait("initialize", params)?;

    // Verify we got a result
    if response.get("result").is_none() {
        bail!("LSP initialize response missing 'result' field");
    }

    // Send initialized notification
    transport.send_notification("initialized", serde_json::json!({}))?;

    Ok(())
}

/// Handle a single request and produce a response.
fn handle_request(
    request: &DaemonRequest,
    ra_status: RaStatus,
    start_time: Instant,
    shutdown: &mut bool,
    transport: &mut RaTransport,
    workspace_root: &Path,
) -> DaemonResponse {
    match request {
        DaemonRequest::Stop => {
            *shutdown = true;
            DaemonResponse::Ok {
                message: "stopping".to_string(),
            }
        }
        DaemonRequest::Status => DaemonResponse::Status {
            pid: std::process::id(),
            ra_status,
            uptime_secs: start_time.elapsed().as_secs(),
        },
        DaemonRequest::References { symbol, quiet } => {
            match query::handle_references(transport, workspace_root, symbol, *quiet) {
                Ok(output) => DaemonResponse::QueryResult { output },
                Err(e) => DaemonResponse::Error {
                    message: format!("{e}"),
                },
            }
        }
        DaemonRequest::BlastRadius {
            symbol,
            depth,
            quiet,
        } => match query::handle_blast_radius(transport, workspace_root, symbol, *depth, *quiet) {
            Ok(output) => DaemonResponse::QueryResult { output },
            Err(e) => DaemonResponse::Error {
                message: format!("{e}"),
            },
        },
        DaemonRequest::CallHierarchy {
            symbol,
            outgoing,
            quiet,
        } => {
            match query::handle_call_hierarchy(transport, workspace_root, symbol, *outgoing, *quiet)
            {
                Ok(output) => DaemonResponse::QueryResult { output },
                Err(e) => DaemonResponse::Error {
                    message: format!("{e}"),
                },
            }
        }
        DaemonRequest::WaitForReady => DaemonResponse::Ok {
            message: "rust-analyzer ready".to_string(),
        },
    }
}

/// Shutdown rust-analyzer gracefully via LSP shutdown/exit.
/// Fire-and-forget: send shutdown request + exit notification without reading
/// responses. The reader thread has responses in its channel; we don't need to
/// process them during shutdown. The thread will exit when ra closes stdout
/// after receiving exit.
fn shutdown_ra(transport: &mut RaTransport) {
    let _ = transport.send_request("shutdown", serde_json::Value::Null);
    let _ = transport.send_notification("exit", serde_json::Value::Null);
}

/// Process a single ra notification for `$/progress` tracking.
/// Returns `Some(new_status)` when the indexing state changes.
fn process_ra_notification(
    msg: &serde_json::Value,
    active_progress: &mut HashSet<String>,
    had_progress: &mut bool,
) -> Option<RaStatus> {
    let method = msg.get("method")?.as_str()?;
    if method != "$/progress" {
        return None;
    }

    let params = msg.get("params")?;
    let token = params.get("token")?;
    let kind = params.get("value")?.get("kind")?.as_str()?;

    let token_key = token.to_string();

    match kind {
        "begin" => {
            active_progress.insert(token_key);
            *had_progress = true;
            Some(RaStatus::Indexing)
        }
        "end" => {
            active_progress.remove(&token_key);
            if *had_progress && active_progress.is_empty() {
                Some(RaStatus::Ready)
            } else {
                None
            }
        }
        _ => None, // "report" or unknown
    }
}

/// Uptime threshold (seconds) before assuming Ready if no `$/progress` seen.
const NO_PROGRESS_FALLBACK_SECS: u64 = 10;

/// Process one ra message: update progress state and reply to server requests.
fn handle_ra_message(
    msg: &serde_json::Value,
    transport: &mut RaTransport,
    ra_status: &mut RaStatus,
    active_progress: &mut HashSet<String>,
    had_progress: &mut bool,
) {
    if let Some(new_status) = process_ra_notification(msg, active_progress, had_progress) {
        if *ra_status != new_status {
            eprintln!("[lsp-daemon] ra status: {new_status}");
            *ra_status = new_status;
        }
    }
    // Reply to server-initiated requests (e.g. window/workDoneProgress/create)
    if msg.get("id").is_some() && msg.get("method").is_some() {
        if let Some(id) = msg.get("id").cloned() {
            let _ = transport.send_raw_response(id, serde_json::json!(null));
        }
    }
}

/// Check fallback: no progress tokens ever seen, uptime > threshold → assume Ready.
fn check_no_progress_fallback(ra_status: &mut RaStatus, had_progress: bool, start_time: Instant) {
    if !had_progress
        && *ra_status == RaStatus::Initializing
        && start_time.elapsed().as_secs() > NO_PROGRESS_FALLBACK_SECS
    {
        eprintln!("[lsp-daemon] ra status: ready (no progress reported, fallback)");
        *ra_status = RaStatus::Ready;
    }
}

/// Drain all available ra stdout messages. Updates ra_status and replies
/// to server-initiated requests. Returns true if any messages were read.
fn drain_ra_messages(
    transport: &mut RaTransport,
    ra_status: &mut RaStatus,
    active_progress: &mut HashSet<String>,
    had_progress: &mut bool,
    start_time: Instant,
) -> bool {
    let mut any_read = false;
    loop {
        match transport.try_read_message() {
            Ok(Some(msg)) => {
                any_read = true;
                handle_ra_message(&msg, transport, ra_status, active_progress, had_progress);
            }
            Ok(None) => break,
            Err(e) => {
                eprintln!("[lsp-daemon] ra stdout read error: {e}");
                break;
            }
        }
    }
    check_no_progress_fallback(ra_status, *had_progress, start_time);
    any_read
}

/// Default timeout for waiting for ra to finish indexing before a query.
const READY_TIMEOUT_SECS: u64 = 60;

/// After reaching Ready, keep draining for this long to catch re-indexing cycles.
/// ra goes through multiple begin/end phases (config load, workspace load,
/// proc-macro expansion, symbol indexing). Each time status returns to Ready,
/// the settle timer resets. Only when Ready persists for the full settle period
/// without any new progress begin do we consider indexing truly complete.
const SETTLE_MS: u64 = 5000;

/// Block until ra finishes indexing (status becomes Ready) or timeout.
/// After each Ready transition, continues draining for SETTLE_MS to catch
/// ra re-entering Indexing (common during multi-phase startup).
fn wait_for_ready(
    transport: &mut RaTransport,
    ra_status: &mut RaStatus,
    active_progress: &mut HashSet<String>,
    had_progress: &mut bool,
    start_time: Instant,
    timeout: Duration,
) -> Result<()> {
    if *ra_status == RaStatus::Ready {
        return Ok(());
    }

    let wait_start = Instant::now();
    // Tracks when we last entered Ready. None = not Ready yet.
    let mut ready_since: Option<Instant> = None;
    eprintln!("[lsp-daemon] waiting for rust-analyzer to finish indexing...");

    loop {
        if wait_start.elapsed() > timeout {
            bail!(
                "rust-analyzer is still indexing (waited {}s). Try again shortly.",
                timeout.as_secs()
            );
        }

        // Track Ready/Indexing transitions
        if *ra_status == RaStatus::Ready {
            if ready_since.is_none() {
                ready_since = Some(Instant::now());
            }
            // Settle period elapsed — indexing is truly done
            if ready_since.is_some_and(|t| t.elapsed().as_millis() >= SETTLE_MS as u128) {
                return Ok(());
            }
        } else if ready_since.is_some() {
            // Back to Indexing — reset settle timer
            ready_since = None;
        }

        // Poll ra stdout — shorter interval during settle for responsiveness
        let poll_timeout = if ready_since.is_some() { 100 } else { 500 };
        match transport.read_message_timeout(Duration::from_millis(poll_timeout))? {
            Some(msg) => {
                handle_ra_message(&msg, transport, ra_status, active_progress, had_progress);
            }
            None => {
                check_no_progress_fallback(ra_status, *had_progress, start_time);
            }
        }

        // Note: ra crash during indexing is primarily detected by the transport
        // returning Err (reader thread sees stdout close), which propagates via `?`.
        // The Stopped check below is a forward-looking guard in case a future
        // code path sets ra_status to Stopped within this function's scope.
        if *ra_status == RaStatus::Stopped {
            bail!("rust-analyzer stopped unexpectedly during indexing");
        }
    }
}

/// Main daemon loop.
pub fn run_daemon(workspace_root: &Path, daemon_dir: &Path) -> Result<()> {
    let start_time = Instant::now();
    let pid_path = daemon_dir.join("lsp.pid");

    // 1. Write PID file early to prevent double-spawn race
    std::fs::write(&pid_path, std::process::id().to_string())
        .context("Failed to write PID file")?;

    // 2. Discover ra binary
    let ra_bin = discover_ra_binary()?;
    eprintln!("[lsp-daemon] using rust-analyzer: {}", ra_bin.display());

    // 3. Spawn ra subprocess
    let mut ra_child = Command::new(&ra_bin)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .current_dir(workspace_root)
        .spawn()
        .with_context(|| format!("Failed to spawn rust-analyzer: {}", ra_bin.display()))?;

    let ra_stdin = ra_child.stdin.take().context("No stdin on ra process")?;
    let ra_stdout = ra_child.stdout.take().context("No stdout on ra process")?;

    // Drain ra stderr in a background thread to prevent pipe blocking
    if let Some(stderr) = ra_child.stderr.take() {
        std::thread::spawn(move || {
            let reader = std::io::BufReader::new(stderr);
            for line in reader.lines().map_while(Result::ok) {
                eprintln!("[ra-stderr] {line}");
            }
        });
    }

    let mut transport = RaTransport::new(ra_stdin, ra_stdout);

    // 4. LSP initialize
    let mut ra_status = RaStatus::Initializing;
    let mut active_progress: HashSet<String> = HashSet::new();
    let mut had_progress = false;

    eprintln!("[lsp-daemon] sending LSP initialize...");
    match send_initialize(&mut transport, workspace_root) {
        Ok(()) => {
            eprintln!("[lsp-daemon] rust-analyzer initialized, waiting for indexing...");
        }
        Err(e) => {
            eprintln!("[lsp-daemon] initialize failed: {e}");
            // Continue running — ra might still become ready
        }
    }

    // Spawn background reader thread for ra stdout (replaces libc::poll)
    transport.spawn_reader_thread();

    // 5. Start file watcher
    let (fs_rx, _watcher) = match watcher::start_watcher(workspace_root) {
        Ok((watcher, rx)) => {
            eprintln!("[lsp-daemon] file watcher started");
            (Some(rx), Some(watcher))
        }
        Err(e) => {
            eprintln!("[lsp-daemon] file watcher failed: {e}, continuing without");
            (None, None)
        }
    };
    let mut debounce_buf = DebounceBuffer::new();

    // 6. Create IPC endpoints (AFTER ra init — preserves readiness invariant)
    let mut ipc_handle = ipc::DaemonIpc::setup(daemon_dir)?;

    eprintln!("[lsp-daemon] listening on IPC in {}", daemon_dir.display());

    // 7. Main loop
    let idle_timeout = Duration::from_secs(
        std::env::var("CARGO_BRIEF_LSP_TIMEOUT")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(IDLE_TIMEOUT_SECS),
    );
    let ready_timeout = Duration::from_secs(
        std::env::var("CARGO_BRIEF_LSP_READY_TIMEOUT")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(READY_TIMEOUT_SECS),
    );
    let mut last_activity = Instant::now();
    let mut shutdown = false;

    loop {
        // Poll for incoming client request
        let poll_result = match ipc_handle.poll_request(100) {
            Ok(r) => r,
            Err(e) => {
                eprintln!("[lsp-daemon] failed to read request: {e}");
                continue;
            }
        };
        match poll_result {
            Some(request) => {
                // Wait for ra to finish indexing before dispatch
                let wait_timeout = match &request {
                    DaemonRequest::WaitForReady => Some(Duration::MAX),
                    DaemonRequest::References { .. }
                    | DaemonRequest::BlastRadius { .. }
                    | DaemonRequest::CallHierarchy { .. } => Some(ready_timeout),
                    _ => None,
                };
                if let Some(timeout) = wait_timeout
                    && ra_status != RaStatus::Ready
                    && let Err(e) = wait_for_ready(
                        &mut transport,
                        &mut ra_status,
                        &mut active_progress,
                        &mut had_progress,
                        start_time,
                        timeout,
                    )
                {
                    let response = DaemonResponse::Error {
                        message: format!("{e}"),
                    };
                    if let Err(we) = ipc_handle.send_response(&response) {
                        eprintln!("[lsp-daemon] failed to write error response: {we}");
                    }
                    continue;
                }

                // Process request
                let response = handle_request(
                    &request,
                    ra_status,
                    start_time,
                    &mut shutdown,
                    &mut transport,
                    workspace_root,
                );

                // Write response
                if let Err(e) = ipc_handle.send_response(&response) {
                    eprintln!("[lsp-daemon] failed to write response: {e}");
                }

                last_activity = Instant::now();
                if shutdown {
                    break;
                }
            }
            None => {
                // Timeout — check idle
                if last_activity.elapsed() > idle_timeout {
                    eprintln!("[lsp-daemon] idle timeout, shutting down");
                    break;
                }
            }
        }

        // Drain ra stdout (progress notifications, server requests)
        if ra_status != RaStatus::Stopped {
            drain_ra_messages(
                &mut transport,
                &mut ra_status,
                &mut active_progress,
                &mut had_progress,
                start_time,
            );
        }

        // Drain FS events
        if let Some(rx) = &fs_rx {
            while let Ok(event) = rx.try_recv() {
                debounce_buf.push(event);
            }
            if debounce_buf.should_flush() {
                let events = debounce_buf.drain();
                let params = watcher::build_did_change_notification(&events);
                if let Err(e) =
                    transport.send_notification("workspace/didChangeWatchedFiles", params)
                {
                    eprintln!("[lsp-daemon] failed to notify ra of file changes: {e}");
                }
            }
        }

        // Check if ra is still alive
        match ra_child.try_wait() {
            Ok(Some(status)) => {
                eprintln!("[lsp-daemon] rust-analyzer exited: {status}");
                ra_status = RaStatus::Stopped;
                break;
            }
            Ok(None) => {} // still running
            Err(e) => {
                eprintln!("[lsp-daemon] failed to check ra status: {e}");
            }
        }
    }

    // 8. Cleanup
    if ra_status != RaStatus::Stopped {
        shutdown_ra(&mut transport);
    }
    // Wait for ra to exit
    let _ = ra_child.wait();

    // Remove IPC files + non-IPC files
    ipc::cleanup_ipc_files(daemon_dir);
    std::fs::remove_file(daemon_dir.join("lsp.pid")).ok();
    std::fs::remove_file(daemon_dir.join("lsp.log")).ok();
    // Try to remove the parent directory (only succeeds if empty)
    std::fs::remove_dir(daemon_dir).ok();

    eprintln!("[lsp-daemon] shut down cleanly");
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    fn progress_begin(token: serde_json::Value) -> serde_json::Value {
        json!({
            "jsonrpc": "2.0",
            "method": "$/progress",
            "params": { "token": token, "value": { "kind": "begin", "title": "Indexing" } }
        })
    }

    fn progress_end(token: serde_json::Value) -> serde_json::Value {
        json!({
            "jsonrpc": "2.0",
            "method": "$/progress",
            "params": { "token": token, "value": { "kind": "end" } }
        })
    }

    fn progress_report(token: serde_json::Value) -> serde_json::Value {
        json!({
            "jsonrpc": "2.0",
            "method": "$/progress",
            "params": { "token": token, "value": { "kind": "report", "percentage": 50 } }
        })
    }

    #[test]
    fn progress_begin_returns_indexing() {
        let mut set = HashSet::new();
        let mut had = false;
        let result = process_ra_notification(&progress_begin(json!("tok1")), &mut set, &mut had);
        assert_eq!(result, Some(RaStatus::Indexing));
        assert!(had);
        assert!(set.contains(&json!("tok1").to_string()));
    }

    #[test]
    fn progress_end_last_token_returns_ready() {
        let mut set = HashSet::new();
        let mut had = false;
        process_ra_notification(&progress_begin(json!("tok1")), &mut set, &mut had);
        let result = process_ra_notification(&progress_end(json!("tok1")), &mut set, &mut had);
        assert_eq!(result, Some(RaStatus::Ready));
        assert!(set.is_empty());
    }

    #[test]
    fn progress_end_not_last_returns_none() {
        let mut set = HashSet::new();
        let mut had = false;
        process_ra_notification(&progress_begin(json!("a")), &mut set, &mut had);
        process_ra_notification(&progress_begin(json!("b")), &mut set, &mut had);
        let result = process_ra_notification(&progress_end(json!("a")), &mut set, &mut had);
        assert_eq!(result, None); // "b" still active
        assert_eq!(set.len(), 1);
    }

    #[test]
    fn progress_report_returns_none() {
        let mut set = HashSet::new();
        let mut had = false;
        process_ra_notification(&progress_begin(json!("tok1")), &mut set, &mut had);
        let result = process_ra_notification(&progress_report(json!("tok1")), &mut set, &mut had);
        assert_eq!(result, None);
    }

    #[test]
    fn non_progress_notification_returns_none() {
        let mut set = HashSet::new();
        let mut had = false;
        let msg = json!({
            "jsonrpc": "2.0",
            "method": "textDocument/publishDiagnostics",
            "params": {}
        });
        assert_eq!(process_ra_notification(&msg, &mut set, &mut had), None);
    }

    #[test]
    fn progress_begin_when_had_progress_already_true() {
        let mut set = HashSet::new();
        let mut had = true; // already true from prior cycle
        let result = process_ra_notification(&progress_begin(json!("tok2")), &mut set, &mut had);
        assert_eq!(result, Some(RaStatus::Indexing));
    }

    #[test]
    fn two_full_begin_end_cycles() {
        let mut set = HashSet::new();
        let mut had = false;
        // Cycle 1
        process_ra_notification(&progress_begin(json!("a")), &mut set, &mut had);
        let r = process_ra_notification(&progress_end(json!("a")), &mut set, &mut had);
        assert_eq!(r, Some(RaStatus::Ready));
        // Cycle 2
        let r = process_ra_notification(&progress_begin(json!("b")), &mut set, &mut had);
        assert_eq!(r, Some(RaStatus::Indexing));
        let r = process_ra_notification(&progress_end(json!("b")), &mut set, &mut had);
        assert_eq!(r, Some(RaStatus::Ready));
    }

    #[test]
    fn unknown_token_in_end_is_noop() {
        let mut set = HashSet::new();
        let mut had = false;
        // End without prior begin — had_progress is false, so no Ready
        let r = process_ra_notification(&progress_end(json!("unknown")), &mut set, &mut had);
        assert_eq!(r, None);
    }

    #[test]
    fn integer_token_normalized() {
        let mut set = HashSet::new();
        let mut had = false;
        process_ra_notification(&progress_begin(json!(1)), &mut set, &mut had);
        assert!(set.contains("1"));
        let r = process_ra_notification(&progress_end(json!(1)), &mut set, &mut had);
        assert_eq!(r, Some(RaStatus::Ready));
    }

    #[test]
    fn string_token_normalized() {
        let mut set = HashSet::new();
        let mut had = false;
        process_ra_notification(&progress_begin(json!("hello")), &mut set, &mut had);
        // String tokens include quotes in to_string()
        assert!(set.contains(&json!("hello").to_string()));
        let r = process_ra_notification(&progress_end(json!("hello")), &mut set, &mut had);
        assert_eq!(r, Some(RaStatus::Ready));
    }

    #[test]
    fn mixed_int_string_tokens() {
        let mut set = HashSet::new();
        let mut had = false;
        process_ra_notification(&progress_begin(json!(42)), &mut set, &mut had);
        process_ra_notification(&progress_begin(json!("foo")), &mut set, &mut had);
        assert_eq!(set.len(), 2);
        process_ra_notification(&progress_end(json!(42)), &mut set, &mut had);
        assert_eq!(set.len(), 1);
        let r = process_ra_notification(&progress_end(json!("foo")), &mut set, &mut had);
        assert_eq!(r, Some(RaStatus::Ready));
    }
}