a3s-code-core 3.3.0

A3S Code Core - Embeddable AI agent library with tool execution
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
//! Cross-module integration tests for the session/agent close lifecycle.
//!
//! Unit tests in `core/src/agent_api/tests.rs` cover the isolated APIs.
//! This file exercises the *interaction* between session close, the
//! subagent task tracker, and the parent agent's session registry —
//! crossings that single-module unit tests cannot reach.
//!
//! Run with:
//!   cargo test --test test_session_close_lifecycle -- --nocapture

use a3s_code_core::config::{CodeConfig, ModelConfig, ModelModalities, ProviderConfig};
use a3s_code_core::llm::Message;
use a3s_code_core::mcp::{McpServerConfig, McpTransportConfig};
use a3s_code_core::subagent_task_tracker::SubagentStatus;
use a3s_code_core::{Agent, AgentEvent, SessionOptions};
use tokio_util::sync::CancellationToken;

/// Minimal offline config — no real provider is contacted because every
/// test below avoids `send`/`stream`.
fn offline_test_config() -> CodeConfig {
    CodeConfig {
        default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
        providers: vec![ProviderConfig {
            name: "anthropic".to_string(),
            api_key: Some("offline-key".to_string()),
            base_url: None,
            headers: std::collections::HashMap::new(),
            session_id_header: None,
            models: vec![ModelConfig {
                id: "claude-sonnet-4-20250514".to_string(),
                name: "Claude Sonnet 4".to_string(),
                family: "claude-sonnet".to_string(),
                api_key: None,
                base_url: None,
                headers: std::collections::HashMap::new(),
                session_id_header: None,
                attachment: false,
                reasoning: false,
                tool_call: true,
                temperature: true,
                release_date: None,
                modalities: ModelModalities::default(),
                cost: Default::default(),
                limit: Default::default(),
            }],
        }],
        ..Default::default()
    }
}

/// IT-1: closing a session while a delegated subagent task is still running.
///
/// Expected contract, checked in order:
/// 1. Before `close()`, the task is listed as pending with status `Running`
///    and its cancellation token is untouched.
/// 2. `close()` marks the session closed and fires the registered token.
/// 3. The task snapshot is still queryable, reports `Cancelled`, and is no
///    longer listed as pending.
/// 4. A late `SubagentEnd` with `success: true` recorded after the close
///    leaves the status at `Cancelled` instead of flipping it to `Completed`.
///
/// The in-flight state is simulated by feeding events straight into the
/// session's subagent tracker rather than by running a real subagent.
#[tokio::test]
async fn close_with_subagent_in_flight_marks_task_cancelled_and_resists_regression() {
    const TASK_ID: &str = "task-abc";
    const CHILD_SESSION_ID: &str = "child-xyz";

    let agent = Agent::from_config(offline_test_config()).await.unwrap();
    let session = agent
        .session(
            "/tmp/it1-close-subagent-workspace",
            Some(SessionOptions::new().with_session_id("it1-close-subagent")),
        )
        .expect("session");
    let tracker = session.subagent_tracker();
    let cancel_token = CancellationToken::new();

    // Seed the tracker with a started task and attach a cancellation token,
    // mirroring what a delegated task would register.
    let started = AgentEvent::SubagentStart {
        task_id: TASK_ID.to_owned(),
        session_id: CHILD_SESSION_ID.to_owned(),
        parent_session_id: session.id().to_string(),
        agent: "general".to_owned(),
        description: "long-running synthetic task".to_owned(),
    };
    tracker.record_event(&started).await;
    tracker
        .register_canceller(TASK_ID, cancel_token.clone())
        .await;

    // Pre-close sanity checks.
    let before_close = session.pending_subagent_tasks().await;
    assert_eq!(before_close.len(), 1, "pre-close pending list");
    let only_task = &before_close[0];
    assert_eq!(only_task.task_id, TASK_ID);
    assert_eq!(only_task.status, SubagentStatus::Running);
    assert!(
        !cancel_token.is_cancelled(),
        "canceller must not be fired before close"
    );

    // The action under test.
    session.close().await;
    assert!(session.is_closed(), "session must report closed");
    assert!(
        cancel_token.is_cancelled(),
        "subagent canceller must be fired by close()"
    );

    // The task stays queryable but is terminal and no longer pending.
    let after_close = session
        .subagent_task(TASK_ID)
        .await
        .expect("snapshot still queryable after close");
    assert_eq!(after_close.status, SubagentStatus::Cancelled);
    assert!(session.pending_subagent_tasks().await.is_empty());

    // A successful end event arriving after cancellation is the worst case
    // for a status regression; it must be ignored for status purposes.
    let late_end = AgentEvent::SubagentEnd {
        task_id: TASK_ID.to_owned(),
        session_id: CHILD_SESSION_ID.to_owned(),
        agent: "general".to_owned(),
        output: "would-have-succeeded".to_owned(),
        success: true,
    };
    tracker.record_event(&late_end).await;

    let after_late_end = session
        .subagent_task(TASK_ID)
        .await
        .expect("snapshot remains queryable");
    assert_eq!(
        after_late_end.status,
        SubagentStatus::Cancelled,
        "late SubagentEnd(success=true) must not regress Cancelled status"
    );
}

/// Builds an MCP server entry named `name` that is present in the config but
/// switched off (`enabled: false`).
///
/// The entry uses a stdio transport pointing at `/bin/true` with no
/// arguments, an empty environment, no OAuth settings, and a 60-second tool
/// timeout. Per the original author's notes, a non-empty `mcp_servers` list
/// is enough for agent bootstrap to create an MCP manager even though a
/// disabled server is never connected — that is what lets the tests reach the
/// MCP branch of `Agent::close()`.
fn disabled_mcp_server(name: &str) -> McpServerConfig {
    let transport = McpTransportConfig::Stdio {
        command: String::from("/bin/true"),
        args: Vec::new(),
    };

    McpServerConfig {
        name: name.to_owned(),
        transport,
        enabled: false,
        env: std::collections::HashMap::new(),
        oauth: None,
        tool_timeout_secs: 60,
    }
}

/// IT-2: `Agent::close()` can be called repeatedly and works both with and
/// without MCP servers in the config.
///
/// Two scenarios are exercised:
/// * **No MCP servers** — the agent closes, reports `is_closed()`, and a
///   second `close()` is a harmless no-op.
/// * **One disabled MCP server** — the agent closes, reports `is_closed()`,
///   and a subsequent `session()` call fails with an error whose text
///   mentions "closed".
#[tokio::test]
async fn agent_close_handles_global_mcp_branch_and_is_idempotent() {
    // Scenario A: config without any MCP servers.
    {
        let plain_agent = Agent::from_config(offline_test_config()).await.unwrap();
        assert!(!plain_agent.is_closed());

        plain_agent.close().await;
        assert!(plain_agent.is_closed());

        // Closing twice must neither panic nor change the closed state.
        plain_agent.close().await;
        assert!(plain_agent.is_closed());
    }

    // Scenario B: config with a single disabled MCP server, so no real
    // connection is ever opened.
    {
        let mut config = offline_test_config();
        config.mcp_servers = vec![disabled_mcp_server("offline-server")];
        let mcp_agent = Agent::from_config(config).await.unwrap();

        mcp_agent.close().await;
        assert!(mcp_agent.is_closed());

        // A closed agent must refuse to hand out new sessions.
        let rejection = match mcp_agent.session("/tmp/it2-post-close", None) {
            Err(error) => error,
            Ok(_) => panic!("session() after close must error"),
        };
        let msg = rejection.to_string();
        let mentions_closed = ["closed", "Closed"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(
            mentions_closed,
            "post-close session() error must mention 'closed', got: {msg}"
        );
    }
}

/// IT-3: under concurrent creation + drop traffic, the agent session
/// registry must converge to *exactly* the IDs of sessions still held
/// by the caller. Single-threaded unit tests can't observe the
/// `std::sync::Mutex<HashMap<...>>` insert / drop / lazy-prune dance
/// under real parallelism.
///
/// Strategy:
/// 1. From N concurrent tasks on a multi-thread runtime, create one
///    session each.
/// 2. Drop half the sessions immediately; hold the other half.
/// 3. Wait for all tasks to settle.
/// 4. Assert `agent.list_sessions()` returns exactly the held IDs
///    (sorted, deduped).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn session_drop_prunes_registry_under_concurrency() {
    let agent = std::sync::Arc::new(Agent::from_config(offline_test_config()).await.unwrap());

    const N: usize = 32;

    let mut handles = Vec::with_capacity(N);
    for i in 0..N {
        let agent = std::sync::Arc::clone(&agent);
        handles.push(tokio::spawn(async move {
            let id = format!("it3-session-{i:02}");
            let opts = SessionOptions::new().with_session_id(&id);
            let session = agent
                .session(format!("/tmp/it3-ws-{i:02}"), Some(opts))
                .expect("session");

            // Drop the even-indexed sessions immediately so the registry
            // has to prune their Weak entries; hold the odd ones.
            if i % 2 == 0 {
                drop(session);
                None
            } else {
                Some((id, session))
            }
        }));
    }

    // Collect every held session so they outlive the assertion below.
    let mut held = Vec::new();
    for h in handles {
        if let Some(kept) = h.await.expect("task should not panic") {
            held.push(kept);
        }
    }

    let mut expected: Vec<String> = held.iter().map(|(id, _)| id.clone()).collect();
    expected.sort();

    let observed = agent.list_sessions().await;
    assert_eq!(
        observed, expected,
        "registry must contain exactly the IDs of still-held sessions"
    );

    // Now drop the held set and verify the registry collapses to empty
    // on the next access (lazy prune).
    drop(held);
    let after_drop = agent.list_sessions().await;
    assert!(
        after_drop.is_empty(),
        "after dropping all sessions the registry must prune to empty, got: {after_drop:?}"
    );
}

/// IT-4 (Pillar 1): the subagent task view survives `save()` followed by
/// `resume_session()` on a different `Agent`.
///
/// Phase A seeds three tasks through the tracker — one ended with
/// `success: true`, one ended with `success: false`, one cancelled through
/// the session API — then saves into a shared in-memory store and drops the
/// session and agent. Phase B resumes the same session id on a fresh agent
/// and checks that the `(task_id, status)` pairs match what phase A saw.
/// Finally it verifies that cancelling the restored, already-completed task
/// returns `false` and leaves its status at `Completed`.
///
/// Runs on a multi-thread runtime; per the original author's notes, session
/// restoration bridges sync and async code via `block_in_place`, which needs
/// one.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subagent_tasks_persist_across_save_and_resume() {
    use a3s_code_core::store::{MemorySessionStore, SessionStore};
    use std::sync::Arc;

    let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());

    // ----- Phase A: populate and persist -----
    let writer_agent = Agent::from_config(offline_test_config()).await.unwrap();
    let writer_opts = SessionOptions::new()
        .with_session_id("pillar1-subagent-persist")
        .with_session_store(Arc::clone(&store))
        .with_auto_save(true);
    let writer_session = writer_agent
        .session("/tmp/pillar1-subagent-persist", Some(writer_opts))
        .expect("phase A session");
    let writer_tracker = writer_session.subagent_tracker();

    // Event factories shared by the three seeded tasks.
    let parent_id = writer_session.id().to_string();
    let started = |task_id: &str, child_id: &str| AgentEvent::SubagentStart {
        task_id: task_id.to_string(),
        session_id: child_id.to_string(),
        parent_session_id: parent_id.clone(),
        agent: "general".to_string(),
        description: format!("seed {task_id}"),
    };
    let ended = |task_id: &str, child_id: &str, output: &str, success: bool| {
        AgentEvent::SubagentEnd {
            task_id: task_id.to_string(),
            session_id: child_id.to_string(),
            agent: "general".to_string(),
            output: output.to_string(),
            success,
        }
    };

    // Task 1: ends successfully.
    writer_tracker
        .record_event(&started("p1-done", "child-1"))
        .await;
    writer_tracker
        .record_event(&ended("p1-done", "child-1", "ok", true))
        .await;

    // Task 2: ends unsuccessfully.
    writer_tracker
        .record_event(&started("p1-fail", "child-2"))
        .await;
    writer_tracker
        .record_event(&ended("p1-fail", "child-2", "boom", false))
        .await;

    // Task 3: started, given a canceller, then cancelled via the session.
    writer_tracker
        .record_event(&started("p1-cancel", "child-3"))
        .await;
    writer_tracker
        .register_canceller("p1-cancel", CancellationToken::new())
        .await;
    let _ = writer_session.cancel_subagent_task("p1-cancel").await;

    writer_session.save().await.expect("phase A save");

    // Reference view taken from the writer side.
    let mut expected: Vec<(String, SubagentStatus)> = writer_session
        .subagent_tasks()
        .await
        .into_iter()
        .map(|task| (task.task_id, task.status))
        .collect();
    assert_eq!(expected.len(), 3);

    // Nothing from phase A may stay alive.
    drop(writer_session);
    drop(writer_agent);

    // ----- Phase B: resume and compare -----
    let reader_agent = Agent::from_config(offline_test_config()).await.unwrap();
    let reader_session = reader_agent
        .resume_session(
            "pillar1-subagent-persist",
            SessionOptions::new().with_session_store(Arc::clone(&store)),
        )
        .expect("phase B resume");

    let mut post_resume: Vec<(String, SubagentStatus)> = reader_session
        .subagent_tasks()
        .await
        .into_iter()
        .map(|task| (task.task_id, task.status))
        .collect();

    // Compare independently of listing order.
    let by_task_id =
        |lhs: &(String, SubagentStatus), rhs: &(String, SubagentStatus)| lhs.0.cmp(&rhs.0);
    post_resume.sort_by(by_task_id);
    expected.sort_by(by_task_id);
    assert_eq!(
        post_resume, expected,
        "resumed session must observe the same subagent task set & statuses"
    );

    // Cancelling a restored terminal task reports `false`, does not panic,
    // and does not disturb the stored status.
    let cancel_attempt = reader_session.cancel_subagent_task("p1-done").await;
    assert!(
        !cancel_attempt,
        "cancel on a restored terminal task must return false (no live canceller)"
    );
    let still_done = reader_session
        .subagent_task("p1-done")
        .await
        .expect("snapshot still present");
    assert_eq!(still_done.status, SubagentStatus::Completed);
}

/// IT-5 (Pillar 5): identity labels — tenant id, principal, agent template
/// id, correlation id — round-trip through `save()` / `resume_session()`.
///
/// * Phase A creates a labelled session, saves it, and drops it.
/// * Phase B resumes with only a store configured and expects all four labels
///   to come back unchanged.
/// * Phase C resumes again while supplying a new correlation id and expects
///   that value to win, with the tenant id still coming from the snapshot.
///
/// Per the original notes, the host platform (书安OS) uses these strings for
/// multi-tenancy, accounting, and tracing.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn identity_labels_persist_across_save_and_resume() {
    use a3s_code_core::store::{MemorySessionStore, SessionStore};
    use std::sync::Arc;

    let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());

    // ----- Phase A: create a labelled session and persist it -----
    let writer_agent = Agent::from_config(offline_test_config()).await.unwrap();
    let labelled_opts = SessionOptions::new()
        .with_session_id("pillar5-labels")
        .with_session_store(Arc::clone(&store))
        .with_auto_save(true)
        .with_tenant_id("acme-prod")
        .with_principal("svc-deploy-bot")
        .with_agent_template_id("ci-runner-v7")
        .with_correlation_id("trace-1234abcd");
    let writer_session = writer_agent
        .session("/tmp/pillar5-labels", Some(labelled_opts))
        .expect("phase A session");

    writer_session.save().await.expect("phase A save");

    assert_eq!(writer_session.tenant_id(), Some("acme-prod"));
    assert_eq!(writer_session.correlation_id(), Some("trace-1234abcd"));

    drop(writer_session);
    drop(writer_agent);

    // ----- Phase B: resume with no labels supplied -----
    let reader_agent = Agent::from_config(offline_test_config()).await.unwrap();
    let restored = reader_agent
        .resume_session(
            "pillar5-labels",
            SessionOptions::new().with_session_store(Arc::clone(&store)),
        )
        .expect("phase B resume");

    assert_eq!(restored.tenant_id(), Some("acme-prod"));
    assert_eq!(restored.principal(), Some("svc-deploy-bot"));
    assert_eq!(restored.agent_template_id(), Some("ci-runner-v7"));
    assert_eq!(restored.correlation_id(), Some("trace-1234abcd"));

    // ----- Phase C: resume while overriding one label -----
    drop(restored);
    let relabelled = reader_agent
        .resume_session(
            "pillar5-labels",
            SessionOptions::new()
                .with_session_store(Arc::clone(&store))
                .with_correlation_id("trace-followup"),
        )
        .expect("phase C resume");
    assert_eq!(
        relabelled.correlation_id(),
        Some("trace-followup"),
        "caller-supplied correlation_id must override persisted one"
    );
    // Labels that were not overridden still come from the snapshot.
    assert_eq!(relabelled.tenant_id(), Some("acme-prod"));
}

/// IT-CONSOLIDATED (cluster ops): exercise the full cluster-grade
/// API surface in one realistic two-node lifecycle. This is the
/// reference flow 书安OS-side scheduling code targets.
///
/// Two **separate** Agents share one MemorySessionStore (simulating
/// two cluster nodes mounting the same persistent store):
///   Node A: builds a session with identity labels + retention caps,
///           seeds a loop checkpoint, then drops everything.
///   Node B: loads the session by id, rehydrates labels + subagent
///           tracker, picks up the checkpointed run via resume_run.
///
/// The host-supplied identity labels, retention caps, and persisted
/// subagent task snapshots must all survive the cross-node hop —
/// these are exactly the invariants 书安OS relies on for billing,
/// audit, and memory safety in a long-lived fleet.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cluster_ops_consolidated_session_lifecycle() {
    use a3s_code_core::loop_checkpoint::{LoopCheckpoint, LOOP_CHECKPOINT_SCHEMA_VERSION};
    use a3s_code_core::retention::SessionRetentionLimits;
    use a3s_code_core::store::MemorySessionStore;

    let store: std::sync::Arc<dyn a3s_code_core::store::SessionStore> =
        std::sync::Arc::new(MemorySessionStore::new());

    // -------------------------------------------------------------------
    // Node A: create session, seed in-flight state, persist, then drop.
    // -------------------------------------------------------------------
    let agent_a = Agent::from_config(offline_test_config()).await.unwrap();
    let limits_a = SessionRetentionLimits::new()
        .with_max_runs(50)
        .with_max_terminal_subagent_tasks(20);
    let opts_a = SessionOptions::new()
        .with_session_id("cluster-ops-target")
        .with_session_store(std::sync::Arc::clone(&store))
        .with_auto_save(true)
        .with_tenant_id("acme-prod")
        .with_principal("svc-deploy-bot")
        .with_agent_template_id("planner-v3")
        .with_correlation_id("trace-cluster-ops")
        .with_retention_limits(limits_a);
    let session_a = agent_a
        .session("/tmp/cluster-ops-node-a", Some(opts_a))
        .expect("node A session");

    // Inject a completed subagent task — represents work that
    // happened on node A and should survive migration.
    let tracker_a = session_a.subagent_tracker();
    tracker_a
        .record_event(&AgentEvent::SubagentStart {
            task_id: "explore-1".to_string(),
            session_id: "child-1".to_string(),
            parent_session_id: session_a.id().to_string(),
            agent: "explore".to_string(),
            description: "find auth callsites".to_string(),
        })
        .await;
    tracker_a
        .record_event(&AgentEvent::SubagentEnd {
            task_id: "explore-1".to_string(),
            session_id: "child-1".to_string(),
            agent: "explore".to_string(),
            output: "found 3 callsites".to_string(),
            success: true,
        })
        .await;

    session_a.save().await.expect("node A save");

    // Seed a checkpoint as if a run was mid-tool-round when node A died.
    let seeded_run_id = "in-flight-run-x";
    let cp = LoopCheckpoint {
        schema_version: LOOP_CHECKPOINT_SCHEMA_VERSION,
        run_id: seeded_run_id.to_string(),
        session_id: session_a.id().to_string(),
        turn: 2,
        messages: vec![
            Message::user("refactor the auth module"),
            Message {
                role: "assistant".to_string(),
                content: vec![a3s_code_core::llm::ContentBlock::Text {
                    text: "scanned callsites, planning edits".to_string(),
                }],
                reasoning_content: None,
            },
        ],
        total_usage: a3s_code_core::llm::TokenUsage {
            prompt_tokens: 800,
            completion_tokens: 200,
            total_tokens: 1000,
            cache_read_tokens: None,
            cache_write_tokens: None,
        },
        tool_calls_count: 1,
        verification_reports: Vec::new(),
        checkpoint_ms: 1_700_000_000_000,
    };
    store
        .save_loop_checkpoint(seeded_run_id, &cp)
        .await
        .expect("seed checkpoint");

    // Node A goes down.
    drop(session_a);
    drop(agent_a);

    // -------------------------------------------------------------------
    // Node B: a different Agent picks up the session from the store.
    // -------------------------------------------------------------------
    let agent_b = Agent::from_config(offline_test_config()).await.unwrap();
    let resume_opts = SessionOptions::new().with_session_store(std::sync::Arc::clone(&store));
    let session_b = agent_b
        .resume_session("cluster-ops-target", resume_opts)
        .expect("node B resume");

    // Identity labels survive.
    assert_eq!(session_b.tenant_id(), Some("acme-prod"));
    assert_eq!(session_b.principal(), Some("svc-deploy-bot"));
    assert_eq!(session_b.agent_template_id(), Some("planner-v3"));
    assert_eq!(session_b.correlation_id(), Some("trace-cluster-ops"));

    // Subagent task history survives.
    let restored_tasks = session_b.subagent_tasks().await;
    assert_eq!(restored_tasks.len(), 1);
    assert_eq!(restored_tasks[0].task_id, "explore-1");
    assert_eq!(
        restored_tasks[0].status,
        a3s_code_core::subagent_task_tracker::SubagentStatus::Completed
    );

    // Crashed run can be resumed from the persisted checkpoint via the
    // session API. (Note: we don't actually call resume_run here
    // because the test config has no real LLM credentials — that's
    // covered by test_resume_run_picks_up_from_persisted_checkpoint
    // which uses build_session with a mock client. We assert the
    // *checkpoint contract* — what the run-resumption code reads —
    // is intact across the migration.)
    let cp_after = {
        let s: std::sync::Arc<dyn a3s_code_core::store::SessionStore> =
            std::sync::Arc::clone(&store);
        s.load_loop_checkpoint(seeded_run_id)
            .await
            .expect("load checkpoint after migration")
            .expect("checkpoint preserved")
    };
    assert_eq!(cp_after.run_id, seeded_run_id);
    assert_eq!(cp_after.turn, 2);
    assert_eq!(cp_after.messages.len(), 2);
    assert_eq!(cp_after.total_usage.total_tokens, 1000);

    // Node B can decide to clean up the old run id once it's done with
    // resumption — the host (书安OS) tracks the old→new run mapping.
    // The framework does not auto-delete checkpoints; that's the
    // host's call.
}

/// IT-9 (Retention): limits passed via `SessionOptions::with_retention_limits`
/// take effect in the session's subagent task tracker.
///
/// With a cap of two terminal subagent tasks, three tasks are started and
/// completed in order (`t-a`, `t-b`, `t-c`). The session must afterwards
/// report exactly two tasks, namely `t-b` and `t-c`; the oldest one, `t-a`,
/// must be gone.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retention_limits_are_plumbed_into_subagent_tracker() {
    use a3s_code_core::retention::SessionRetentionLimits;

    let agent = Agent::from_config(offline_test_config()).await.unwrap();
    let capped_opts = SessionOptions::new()
        .with_session_id("it9-retention")
        .with_retention_limits(SessionRetentionLimits::new().with_max_terminal_subagent_tasks(2));
    let session = agent
        .session("/tmp/it9-retention-ws", Some(capped_opts))
        .expect("session");
    let tracker = session.subagent_tracker();
    let parent = session.id().to_string();

    // Run three tasks to completion, one after another, so the cap of two
    // forces the first one out.
    for task_id in ["t-a", "t-b", "t-c"] {
        let child_id = format!("{task_id}-child");

        let started = AgentEvent::SubagentStart {
            task_id: task_id.to_string(),
            session_id: child_id.clone(),
            parent_session_id: parent.clone(),
            agent: "general".to_string(),
            description: "seed".to_string(),
        };
        tracker.record_event(&started).await;

        let finished = AgentEvent::SubagentEnd {
            task_id: task_id.to_string(),
            session_id: child_id,
            agent: "general".to_string(),
            output: "ok".to_string(),
            success: true,
        };
        tracker.record_event(&finished).await;
    }

    let surviving: Vec<String> = session
        .subagent_tasks()
        .await
        .into_iter()
        .map(|task| task.task_id)
        .collect();
    let survives = |wanted: &str| surviving.iter().any(|id| id.as_str() == wanted);

    assert_eq!(surviving.len(), 2, "cap must be enforced");
    assert!(survives("t-b"));
    assert!(survives("t-c"));
    assert!(
        !survives("t-a"),
        "oldest terminal entry must be evicted by SessionRetentionLimits"
    );
}

/// IT-6 (Pillar 3 cut 1): a `LoopCheckpoint` round-trips through the
/// `SessionStore` — this is the data contract 书安OS will sit on to
/// migrate / replay a run on another node.
///
/// Cut 1 lands the data + persistence path. The actual in-loop
/// `persist_loop_checkpoint` call site is wired but exercising it
/// end-to-end needs a tool-using mock; the next cut will add that
/// integration coverage alongside the resume API.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn loop_checkpoint_round_trips_through_session_store() {
    use a3s_code_core::llm::TokenUsage;
    use a3s_code_core::loop_checkpoint::{LoopCheckpoint, LOOP_CHECKPOINT_SCHEMA_VERSION};
    use a3s_code_core::store::{MemorySessionStore, SessionStore};

    let store: std::sync::Arc<dyn SessionStore> = std::sync::Arc::new(MemorySessionStore::new());

    let run_id = "run-pillar3-roundtrip";
    let checkpoint = LoopCheckpoint {
        schema_version: LOOP_CHECKPOINT_SCHEMA_VERSION,
        run_id: run_id.to_string(),
        session_id: "session-pillar3".to_string(),
        turn: 4,
        messages: vec![
            a3s_code_core::llm::Message::user("seed prompt"),
            a3s_code_core::llm::Message {
                role: "assistant".to_string(),
                content: vec![a3s_code_core::llm::ContentBlock::Text {
                    text: "ack".to_string(),
                }],
                reasoning_content: None,
            },
        ],
        total_usage: TokenUsage {
            prompt_tokens: 120,
            completion_tokens: 30,
            total_tokens: 150,
            cache_read_tokens: None,
            cache_write_tokens: None,
        },
        tool_calls_count: 3,
        verification_reports: Vec::new(),
        checkpoint_ms: 1_700_000_000_000,
    };

    store
        .save_loop_checkpoint(run_id, &checkpoint)
        .await
        .expect("save");

    let loaded = store
        .load_loop_checkpoint(run_id)
        .await
        .expect("load")
        .expect("checkpoint present");

    assert_eq!(loaded.run_id, run_id);
    assert_eq!(loaded.session_id, "session-pillar3");
    assert_eq!(loaded.turn, 4);
    assert_eq!(loaded.tool_calls_count, 3);
    assert_eq!(loaded.messages.len(), 2);
    assert_eq!(loaded.total_usage.total_tokens, 150);
    assert_eq!(loaded.schema_version, LOOP_CHECKPOINT_SCHEMA_VERSION);

    // Overwrite semantics: a second save for the same run_id replaces
    // the previous checkpoint (the loop only ever needs the latest).
    let mut newer = loaded.clone();
    newer.turn = 5;
    newer.tool_calls_count = 4;
    store
        .save_loop_checkpoint(run_id, &newer)
        .await
        .expect("save second");
    let again = store
        .load_loop_checkpoint(run_id)
        .await
        .expect("load again")
        .expect("checkpoint still present");
    assert_eq!(again.turn, 5);
    assert_eq!(again.tool_calls_count, 4);

    // Unknown run id -> None.
    let absent = store
        .load_loop_checkpoint("does-not-exist")
        .await
        .expect("load missing");
    assert!(absent.is_none());
}

/// IT-7 (Pillar 3 cut 1): a store-backed session that has not yet
/// executed a tool round has **no** loop checkpoint.
///
/// The contract this guards is that a `send()` whose LLM response
/// carries no tool calls exits at the no-tool boundary, before the
/// per-tool-round persist point, so purely conversational turns never
/// pollute the store with checkpoints.
///
/// This test does **not** drive `send()`: the default `session()`
/// routes through the real LLM and no mock client is injected here.
/// It therefore pins only the baseline half of the contract — merely
/// creating an auto-saving session on top of a store leaves the store
/// without a loop checkpoint for an arbitrary run id.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_without_tool_calls_does_not_emit_loop_checkpoint() {
    use a3s_code_core::store::{MemorySessionStore, SessionStore};

    // A single `MemorySessionStore`, shared between the session (via a
    // cloned `Arc`) and the probe at the end of the test.
    let store: std::sync::Arc<dyn SessionStore> = std::sync::Arc::new(MemorySessionStore::new());

    let agent = Agent::from_config(offline_test_config()).await.unwrap();
    let opts = SessionOptions::new()
        .with_session_id("pillar3-no-tool-call")
        .with_session_store(std::sync::Arc::clone(&store))
        .with_auto_save(true);
    let session = agent
        .session("/tmp/pillar3-no-tools", Some(opts))
        .expect("session");

    // No run has been executed on this session, so no checkpoint may
    // exist for any run id we choose to query.
    //
    // This also documents the contract for 书安OS-side tooling: a
    // session that hasn't completed a tool round has no checkpoint.
    let probe = store
        .load_loop_checkpoint("any-fake-run-id")
        .await
        .expect("probe");
    assert!(
        probe.is_none(),
        "a session with no executed run must not have a loop checkpoint"
    );

    // Sanity: the session was created successfully and is still open,
    // i.e. the absence of a checkpoint is not caused by a dead session.
    assert!(
        !session.is_closed(),
        "freshly created session must not be closed"
    );
}

/// IT-8 (Pillar 3 cut 2): the two ways `AgentSession::resume_run` can
/// refuse to resume are told apart by their error text.
///
/// * No `SessionStore` configured on the session: the error mentions
///   `session_store`.
/// * Store configured but holding no checkpoint for the run id: the
///   error mentions both the run id and `no loop checkpoint`.
///
/// 书安OS-side scheduling code relies on this distinction to choose
/// between "retry later" and "fall back to a fresh session".
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resume_run_error_paths_are_distinguishable() {
    use a3s_code_core::store::MemorySessionStore;
    use std::sync::Arc;

    // Case 1 — session created without any store. `resume_run` has to
    // bail out immediately and name the capability that is missing.
    {
        let agent = Agent::from_config(offline_test_config()).await.unwrap();
        let storeless = agent
            .session("/tmp/it8-no-store", None)
            .expect("session no store");

        let msg = storeless
            .resume_run("any-id")
            .await
            .unwrap_err()
            .to_string();
        let mentions_store = msg.contains("session_store");
        assert!(mentions_store, "expected store-missing error, got: {msg}");
    }

    // Case 2 — a store is attached, but nothing was ever checkpointed
    // under the requested run id. The error must identify that run id.
    {
        let empty_store: Arc<dyn a3s_code_core::store::SessionStore> =
            Arc::new(MemorySessionStore::new());
        let agent = Agent::from_config(offline_test_config()).await.unwrap();
        let backed = agent
            .session(
                "/tmp/it8-no-checkpoint",
                Some(
                    SessionOptions::new()
                        .with_session_id("it8-no-checkpoint")
                        .with_session_store(Arc::clone(&empty_store)),
                ),
            )
            .expect("session with store");

        let msg = backed
            .resume_run("does-not-exist")
            .await
            .unwrap_err()
            .to_string();
        let names_run_id = msg.contains("does-not-exist");
        let names_missing_checkpoint = msg.contains("no loop checkpoint");
        assert!(
            names_run_id && names_missing_checkpoint,
            "expected checkpoint-missing error naming the run id, got: {msg}"
        );
    }
}