Skip to main content

zagens_core/engine/
turn_machine.rs

1//! TurnMachine trait + Effect enum — Phase 3b skeleton.
2//!
3//! This module defines the **pure-function state machine** interface that will
4//! replace the `TurnLoopHost` / `op_loop` command-pattern in Phase 3b. During
5//! Phase 3a / 3b-batch-1 it is **not yet wired into production**; the existing
6//! `handle_deepseek_turn` path continues to run. The shadow-mode comparison
7//! (batch 1 exit gate) feeds this interface.
8//!
9//! ## Layering
10//! - `TurnKernelProjection` — rebuilt solely from [`KernelEvent`] log; no IO.
11//! - `Effect` — the machine's only way to request IO from the host.
12//! - `TurnMachine::step` — pure function: same inputs → same outputs.
13//! - `KernelEventSink` — lightweight channel for double-write during Phase 3a/3b.
14
15use std::collections::HashSet;
16
17use tokio::sync::mpsc;
18
19use crate::chat::{ContentBlock, Message};
20use crate::compaction::CompactionArtifact;
21use crate::engine::kernel_event::{CapacityAction, KernelEvent, TurnOutcome};
22use crate::models::Usage;
23use crate::turn::{TurnLoopMode, TurnOutcomeStatus};
24
25// ── KernelEventSink ───────────────────────────────────────────────────────────
26
/// Lightweight fire-and-forget sender used for Phase 3a double-write.
///
/// This is a plain alias for `mpsc::UnboundedSender<KernelEvent>`; a
/// background task (Phase 3b: `KernelEventLog` writer) drains the receiving
/// end.  `send` never blocks.
///
/// Cloning is cheap (`Arc` under the hood).
pub type KernelEventSink = mpsc::UnboundedSender<KernelEvent>;
34
35/// Emit a [`KernelEvent`] to an optional sink, ignoring send errors.
36///
37/// Used in `run.rs`, `streaming_phase.rs`, and `tool_phase.rs` at each
38/// observation point.  A `None` sink is a no-op (all non-L2 hosts).
39#[inline]
40pub fn emit_kernel(sink: Option<&KernelEventSink>, event: KernelEvent) {
41    if let Some(tx) = sink {
42        let _ = tx.send(event);
43    }
44}
45
46/// Emit through the optional sink **and** the host shadow accumulator (L2).
47#[inline]
48pub fn emit_kernel_event<H: crate::engine::kernel_turn_host::KernelTurnHost>(
49    host: &mut H,
50    event: KernelEvent,
51) {
52    host.record_kernel_event(&event);
53    emit_kernel(host.kernel_event_sink(), event);
54}
55
56// ── TurnKernelProjection ─────────────────────────────────────────────────────
57
/// Snapshot of host state rebuildable purely from a `KernelEvent` log.
///
/// In Phase 3b this replaces the host-trait accessors (A-class fields in
/// the Phase 3a completeness inventory). All fields must be derivable from
/// the event log alone — see `kernel_event_completeness` tests.
///
/// Unless a field comment says otherwise, fields are maintained by
/// [`TurnKernelProjection::apply`].
#[derive(Debug, Default, Clone)]
pub struct TurnKernelProjection {
    // ── Turn metadata ────────────────────────────────────────────────────────
    /// Turn identifier copied from `TurnStarted`.
    pub turn_id: String,
    /// Loop mode copied from `TurnStarted`; `None` until that event is applied.
    pub mode: Option<TurnLoopMode>,
    /// Current step: zeroed by `TurnStarted`, then set from each `ModelRequestIssued`.
    pub step_idx: u32,
    /// Step budget copied from `TurnStarted`.
    pub max_steps: u32,

    // ── Message counters (rebuilt from ModelMessage events) ──────────────────
    /// Number of `ModelMessage` events applied.
    pub model_message_count: u32,
    /// Running input/output token totals from `ModelMessage` usage (saturating adds).
    pub total_usage: Usage,

    // ── Tool catalog ─────────────────────────────────────────────────────────
    /// Tools currently active (initial set + all `DeferredToolActivated` events).
    ///
    /// NOTE(review): `apply` only inserts on `DeferredToolActivated`; seeding
    /// of the initial set is not visible in this module — confirm where it happens.
    pub active_tool_names: HashSet<String>,
    /// Call-ids of all planned tool calls in the **current step**.
    ///
    /// Pushed on `ToolCallPlanned`, removed on the matching `ToolCallFinished`,
    /// and cleared on `ModelRequestIssued`.
    pub pending_call_ids: Vec<String>,

    // ── Context flags ────────────────────────────────────────────────────────
    /// Set once any `ScratchpadSummaryInjected` event has been applied.
    pub scratchpad_summary_injected: bool,
    /// Count of `ScratchpadReminderInjected` events.
    pub scratchpad_reminder_count: u32,
    /// Count of `CompactionArtifactCreated` events.
    pub compaction_artifact_count: u32,
    /// Count of `CycleBriefingInjected` events.
    pub cycle_briefing_count: u32,
    /// Count of `SteerInjected` events seen this turn.
    pub steer_injection_count: u32,

    // ── Scratchpad step counters (reset at ModelRequestIssued) ───────────────
    /// Successful tool calls that did not write state; zeroed at `ModelRequestIssued`.
    pub readonly_tool_successes: u32,
    /// Successful state-writing `scratchpad_*` tool calls; zeroed at `ModelRequestIssued`.
    pub scratchpad_writes_this_step: u32,
    /// Successful tool calls whose planned input carried path-like candidates (turn cumulative).
    ///
    /// Not touched by `apply`. `from_events` hands the projection to
    /// `record_working_set_path_touch`, which presumably maintains this
    /// counter — confirm against that helper.
    pub working_set_path_touch_count: u32,
    /// Count of `MemoryPlaneQueried` events logged this turn.
    pub memory_plane_query_count: u32,
    /// Step index of the latest refresh `user_memory` query.
    ///
    /// `apply` resets this to 0 on every `ModelRequestIssued`, so 0 means
    /// "none since the last model request", not "none this turn".
    pub refresh_user_memory_query_step: u32,
    /// Query keys already logged at the current step (cleared at `ModelRequestIssued`).
    pub memory_plane_queried_keys_this_step: HashSet<String>,
    /// Whether `ModelRequestIssued` was applied for the current step (log order anchor).
    ///
    /// `apply` sets this to `true` on `ModelRequestIssued` and never clears it,
    /// so in practice it reads "at least one model request has been applied".
    pub model_request_seen_this_step: bool,
    /// Count of `TopicMemoryInjected` events logged this turn (episodic layer).
    pub topic_memory_injection_count: u32,
    /// Count of `LayeredContextSeamInjected` events logged this turn.
    pub layered_context_seam_count: u32,

    // ── Continuation counters ────────────────────────────────────────────────
    /// Count of `StepLimitContinuation` events.
    pub step_limit_continuations: u32,
    /// Count of `LoopGuardContinuation` events.
    pub loop_guard_continuations: u32,
    /// Count of `LoopGuardTriggered` events.
    pub loop_guard_triggered_count: u32,
    /// Count of `ContextOverflowRecovered` events whose strategy is `CycleHandoff`.
    pub cycle_handoff_attempts: u32,
    /// Clean in-turn cycle advances (`CycleAdvanced` events only).
    pub in_turn_cycle_advances: u32,

    // ── Capacity ─────────────────────────────────────────────────────────────
    /// Action carried by the most recent `CapacityCheckpoint`; `None` if none was applied.
    pub last_capacity_action: Option<CapacityAction>,
    /// Count of `CapacityCheckpoint` events.
    pub capacity_checkpoint_count: u32,

    // ── Termination ─────────────────────────────────────────────────────────
    /// Outcome copied from `TurnEnded`; `None` while the turn is still open.
    pub outcome: Option<TurnOutcome>,
}
122
123impl TurnKernelProjection {
124    /// Apply a single event and update the projection in place.
125    ///
126    /// This is the Phase 3b "projection function"; its correctness is covered
127    /// by the Phase 3a completeness tests in `kernel_event.rs`.
128    pub fn apply(&mut self, event: &KernelEvent) {
129        match event {
130            KernelEvent::TurnStarted {
131                turn_id,
132                mode,
133                max_steps,
134                ..
135            } => {
136                self.turn_id = turn_id.clone();
137                self.mode = Some(*mode);
138                self.max_steps = *max_steps;
139                self.step_idx = 0;
140            }
141
142            KernelEvent::TurnEnded { outcome, .. } => {
143                self.outcome = Some(outcome.clone());
144            }
145
146            KernelEvent::ModelRequestIssued { step_idx, .. } => {
147                // Reset per-step scratchpad counters.
148                self.step_idx = *step_idx;
149                self.readonly_tool_successes = 0;
150                self.scratchpad_writes_this_step = 0;
151                self.pending_call_ids.clear();
152                self.refresh_user_memory_query_step = 0;
153                self.memory_plane_queried_keys_this_step.clear();
154                self.model_request_seen_this_step = true;
155            }
156
157            KernelEvent::ModelMessage { usage, .. } => {
158                self.model_message_count += 1;
159                self.total_usage.input_tokens = self
160                    .total_usage
161                    .input_tokens
162                    .saturating_add(usage.input_tokens);
163                self.total_usage.output_tokens = self
164                    .total_usage
165                    .output_tokens
166                    .saturating_add(usage.output_tokens);
167            }
168
169            KernelEvent::ToolCallPlanned { call_id, .. } => {
170                self.pending_call_ids.push(call_id.clone());
171            }
172
173            KernelEvent::ToolCallFinished {
174                call_id,
175                tool_name,
176                outcome,
177                wrote_state,
178                ..
179            } => {
180                self.pending_call_ids.retain(|id| id != call_id);
181                if matches!(outcome, crate::engine::kernel_event::ToolOutcome::Success) {
182                    if *wrote_state && tool_name.starts_with("scratchpad_") {
183                        self.scratchpad_writes_this_step += 1;
184                    } else if !wrote_state {
185                        self.readonly_tool_successes += 1;
186                    }
187                }
188            }
189
190            KernelEvent::DeferredToolActivated { tool_name, .. } => {
191                self.active_tool_names.insert(tool_name.clone());
192            }
193
194            KernelEvent::ScratchpadSummaryInjected { .. } => {
195                self.scratchpad_summary_injected = true;
196            }
197
198            KernelEvent::ScratchpadReminderInjected { .. } => {
199                self.scratchpad_reminder_count += 1;
200            }
201
202            KernelEvent::CompactionArtifactCreated { .. } => {
203                self.compaction_artifact_count += 1;
204            }
205
206            KernelEvent::CycleBriefingInjected { .. } => {
207                self.cycle_briefing_count += 1;
208            }
209
210            KernelEvent::MemoryPlaneQueried {
211                query_key,
212                step_idx,
213                ..
214            } => {
215                self.memory_plane_query_count += 1;
216                self.memory_plane_queried_keys_this_step
217                    .insert(query_key.clone());
218                if query_key
219                    == crate::engine::turn_loop::memory_plane_query_policy::QUERY_USER_MEMORY
220                {
221                    self.refresh_user_memory_query_step = *step_idx;
222                }
223            }
224
225            KernelEvent::TopicMemoryInjected { .. } => {
226                self.topic_memory_injection_count += 1;
227            }
228
229            KernelEvent::LayeredContextSeamInjected { .. } => {
230                self.layered_context_seam_count += 1;
231            }
232
233            KernelEvent::SteerInjected { .. } => {
234                self.steer_injection_count += 1;
235            }
236
237            KernelEvent::LoopGuardTriggered { .. } => {
238                self.loop_guard_triggered_count += 1;
239            }
240
241            KernelEvent::CapacityCheckpoint { action, .. } => {
242                self.last_capacity_action = Some(action.clone());
243                self.capacity_checkpoint_count += 1;
244            }
245
246            KernelEvent::StepLimitContinuation { .. } => {
247                self.step_limit_continuations += 1;
248            }
249
250            KernelEvent::LoopGuardContinuation { .. } => {
251                self.loop_guard_continuations += 1;
252            }
253
254            KernelEvent::CycleAdvanced { .. } => {
255                self.in_turn_cycle_advances += 1;
256            }
257
258            KernelEvent::ContextOverflowRecovered {
259                strategy: crate::engine::kernel_event::OverflowStrategy::CycleHandoff,
260                ..
261            } => {
262                self.cycle_handoff_attempts += 1;
263            }
264
265            _ => {}
266        }
267    }
268
269    /// Rebuild a projection from a sequence of events (for replay / testing).
270    pub fn from_events(events: &[KernelEvent]) -> Self {
271        let mut p = Self::default();
272        let mut planned: std::collections::HashMap<String, (String, String)> =
273            std::collections::HashMap::new();
274        for ev in events {
275            if let KernelEvent::ToolCallPlanned {
276                call_id,
277                tool_name,
278                input_json,
279                ..
280            } = ev
281            {
282                planned.insert(call_id.clone(), (tool_name.clone(), input_json.clone()));
283            }
284            if let KernelEvent::ToolCallFinished {
285                call_id, outcome, ..
286            } = ev
287            {
288                crate::engine::turn_loop::memory_plane_working_policy::record_working_set_path_touch(
289                    &mut p, &planned, call_id, outcome,
290                );
291                planned.remove(call_id);
292            }
293            p.apply(ev);
294        }
295        p
296    }
297}
298
299// ── Live snapshot (shadow compare) ───────────────────────────────────────────
300
/// Host-visible turn fields sampled at turn end for shadow projection diff.
///
/// Built by `run.rs` from `TurnContext` + loop-local counters + host flags.
/// Compared against [`TurnKernelProjection`] rebuilt from the emitted event log;
/// every field here is checked against the projection field of the same name
/// by [`compare_projection_to_live`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LiveTurnSnapshot {
    /// Turn identifier as seen by the live host.
    pub turn_id: String,
    /// Step index as seen by the live host.
    pub step_idx: u32,
    /// Step budget as seen by the live host.
    pub max_steps: u32,
    /// Live-side scratchpad summary flag.
    pub scratchpad_summary_injected: bool,
    /// Live-side step-limit continuation count.
    pub step_limit_continuations: u32,
    /// Live-side loop-guard continuation count.
    pub loop_guard_continuations: u32,
    /// Live-side cycle-handoff attempt count.
    pub cycle_handoff_attempts: u32,
    /// Live-side in-turn cycle advance count.
    pub in_turn_cycle_advances: u32,
}
316
317/// Compare projection rebuilt from events against the live host snapshot.
318///
319/// Returns `None` when equivalent; otherwise a human-readable diff summary.
320#[must_use]
321pub fn compare_projection_to_live(
322    live: &LiveTurnSnapshot,
323    proj: &TurnKernelProjection,
324) -> Option<String> {
325    let mut diffs = Vec::new();
326    if live.turn_id != proj.turn_id {
327        diffs.push(format!(
328            "turn_id live={} proj={}",
329            live.turn_id, proj.turn_id
330        ));
331    }
332    if live.step_idx != proj.step_idx {
333        diffs.push(format!(
334            "step_idx live={} proj={}",
335            live.step_idx, proj.step_idx
336        ));
337    }
338    if live.max_steps != proj.max_steps {
339        diffs.push(format!(
340            "max_steps live={} proj={}",
341            live.max_steps, proj.max_steps
342        ));
343    }
344    if live.scratchpad_summary_injected != proj.scratchpad_summary_injected {
345        diffs.push(format!(
346            "scratchpad_summary_injected live={} proj={}",
347            live.scratchpad_summary_injected, proj.scratchpad_summary_injected
348        ));
349    }
350    if live.step_limit_continuations != proj.step_limit_continuations {
351        diffs.push(format!(
352            "step_limit_continuations live={} proj={}",
353            live.step_limit_continuations, proj.step_limit_continuations
354        ));
355    }
356    if live.loop_guard_continuations != proj.loop_guard_continuations {
357        diffs.push(format!(
358            "loop_guard_continuations live={} proj={}",
359            live.loop_guard_continuations, proj.loop_guard_continuations
360        ));
361    }
362    if live.cycle_handoff_attempts != proj.cycle_handoff_attempts {
363        diffs.push(format!(
364            "cycle_handoff_attempts live={} proj={}",
365            live.cycle_handoff_attempts, proj.cycle_handoff_attempts
366        ));
367    }
368    if live.in_turn_cycle_advances != proj.in_turn_cycle_advances {
369        diffs.push(format!(
370            "in_turn_cycle_advances live={} proj={}",
371            live.in_turn_cycle_advances, proj.in_turn_cycle_advances
372        ));
373    }
374    if diffs.is_empty() {
375        None
376    } else {
377        Some(diffs.join("; "))
378    }
379}
380
381// ── Effect ────────────────────────────────────────────────────────────────────
382
/// IO intent emitted by `TurnMachine::step`.
///
/// The host's `EffectInterpreter` matches on this and performs the actual IO.
/// In Phase 3b batch 2 the interpreter replaces `run_streaming_phase` /
/// `run_tool_execution_phase` calls.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum Effect {
    /// Issue an LLM request and stream the response.
    CallModel { token_budget: u32 },
    /// Execute a batch of approved tool calls (DAG-scheduled).
    ExecuteBatch { call_ids: Vec<String> },
    /// Request user approval for a planned tool call.
    RequestApproval {
        call_id: String,
        description: String,
    },
    /// Inject a steer/system message into the session.
    InjectSteer { text: String },
    /// Trigger in-turn auto-compaction or capacity trim/handoff (replay anchor).
    RunCompaction,
    /// Produce layered context seam (`#159`) before model call (v3 pre-step).
    RunLayeredContextCheckpoint,
    /// Notify LSP after an edit-generating tool.
    NotifyLsp { tool_name: String },
    /// Sleep until a deadline (capacity back-off).
    Sleep { millis: u64 },
    /// Read from the memory plane before model context assembly (batch 4).
    QueryMemory {
        /// Memory-plane layer the query targets.
        layer: crate::engine::turn_loop::memory_plane_query_policy::MemoryPlaneQueryLayer,
        /// Key identifying the query.
        query_key: String,
    },
    /// Rebuild session system prompt from memory-plane reads (v3 refresh tail).
    RefreshSystemPrompt,
    /// Emit a memory-plane artifact (scratchpad snapshot / reminder; Phase D).
    EmitArtifact {
        /// Which kind of artifact to emit.
        kind: crate::engine::turn_loop::memory_artifact_policy::MemoryArtifactKind,
        /// Optional log hint (e.g. scratchpad reminder `area_path` on replay).
        area_hint: Option<String>,
    },
}
424
425// ── StepOutput ────────────────────────────────────────────────────────────────
426
427/// Output of a single `TurnMachine::step` call.
428#[derive(Debug, Default)]
429pub struct StepOutput {
430    /// Events emitted by this step (written to the KernelEvent log).
431    pub emitted_events: Vec<KernelEvent>,
432    /// IO effects the host must execute.
433    pub effects: Vec<Effect>,
434    /// If `Some`, the turn has ended.
435    pub halt: Option<TurnOutcome>,
436}
437
438impl StepOutput {
439    pub fn halt(outcome: TurnOutcome) -> Self {
440        Self {
441            halt: Some(outcome),
442            ..Default::default()
443        }
444    }
445}
446
447// ── TurnMachine trait ─────────────────────────────────────────────────────────
448
/// Pure-function state machine for a single agent turn.
///
/// ### Invariants
/// - `step` must not perform IO.
/// - `step` must be deterministic given (`projection`, `event`).
/// - All state visible to `step` must be in `projection` (rebuilt from the log).
///
/// Phase 3b wires this into an `EffectInterpreter` loop that replaces
/// `handle_deepseek_turn`'s direct host calls.
pub trait TurnMachine: Send + Sync {
    /// Process one `event` against the current `projection` and return the
    /// emitted events, requested effects, and (optionally) the turn outcome.
    ///
    /// In `verify_effect_replay_chain` the projection passed in has not yet
    /// had `event` applied; implementations should presumably treat it as the
    /// state *before* `event` — confirm for other callers.
    fn step(&mut self, projection: &TurnKernelProjection, event: KernelEvent) -> StepOutput;
}
461
462/// Pass-through replay machine: mirrors observed events into effects/halt.
463///
464/// Used in shadow mode to validate that the event log is sufficient to drive
465/// a pure state machine without consulting live host state.
466#[derive(Debug, Default)]
467pub struct ReplayTurnMachine;
468
469impl TurnMachine for ReplayTurnMachine {
470    fn step(&mut self, projection: &TurnKernelProjection, event: KernelEvent) -> StepOutput {
471        let mut out = StepOutput {
472            emitted_events: vec![event.clone()],
473            ..Default::default()
474        };
475        match &event {
476            KernelEvent::TurnEnded { outcome, .. } => {
477                out.halt = Some(outcome.clone());
478            }
479            KernelEvent::ModelRequestIssued { token_budget, .. } => {
480                for effect in
481                    crate::engine::turn_loop::memory_plane_query_policy::query_memory_effects_before_model_call(
482                        projection,
483                        None,
484                    )
485                {
486                    if let Effect::QueryMemory { query_key, .. } = &effect
487                        && projection
488                            .memory_plane_queried_keys_this_step
489                            .contains(query_key)
490                        {
491                            continue;
492                        }
493                    out.effects.push(effect);
494                }
495                out.effects.push(Effect::CallModel {
496                    token_budget: *token_budget,
497                });
498            }
499            KernelEvent::SteerInjected { text, .. } => {
500                out.effects.push(Effect::InjectSteer {
501                    text: text.to_string(),
502                });
503            }
504            KernelEvent::ScratchpadReminderInjected { .. }
505            | KernelEvent::ScratchpadSummaryInjected { .. } => {
506                out.effects.extend(
507                    crate::engine::turn_loop::memory_artifact_policy::memory_plane_emit_artifact_effects_from_events(std::slice::from_ref(&event)),
508                );
509            }
510            KernelEvent::CycleBriefingInjected { .. } => {
511                out.effects.push(Effect::InjectSteer {
512                    text: String::new(),
513                });
514            }
515            KernelEvent::ToolCallPlanned {
516                call_id,
517                tool_name,
518                decision,
519                ..
520            } => {
521                if decision.approval_required {
522                    out.effects.push(Effect::RequestApproval {
523                        call_id: call_id.clone(),
524                        description: tool_name.clone(),
525                    });
526                }
527                out.effects.push(Effect::ExecuteBatch {
528                    call_ids: vec![call_id.clone()],
529                });
530            }
531            KernelEvent::ToolCallFinished {
532                tool_name,
533                wrote_state,
534                ..
535            } => {
536                if *wrote_state && is_lsp_notify_tool(tool_name) {
537                    out.effects.push(Effect::NotifyLsp {
538                        tool_name: tool_name.clone(),
539                    });
540                }
541            }
542            KernelEvent::CapacityCheckpoint {
543                action: CapacityAction::Continue,
544                cooldown_blocked: true,
545                ..
546            } => {
547                out.effects.push(Effect::Sleep {
548                    millis: capacity_cooldown_backoff_millis(),
549                });
550            }
551            KernelEvent::CapacityCheckpoint { action, .. } => {
552                if matches!(action, CapacityAction::Trim | CapacityAction::Handoff) {
553                    out.effects.push(Effect::RunCompaction);
554                }
555            }
556            KernelEvent::CompactionArtifactCreated { .. } => {
557                out.effects.push(Effect::RunCompaction);
558            }
559            KernelEvent::StepLimitContinuation { .. }
560            | KernelEvent::LoopGuardContinuation { .. } => {
561                out.effects.push(Effect::InjectSteer {
562                    text: String::new(),
563                });
564            }
565            KernelEvent::LayeredContextSeamInjected { .. } => {
566                out.effects.push(Effect::RunLayeredContextCheckpoint);
567            }
568            KernelEvent::MemoryPlaneQueried {
569                layer,
570                query_key,
571                step_idx,
572                ..
573            } if *step_idx > projection.step_idx || !projection.model_request_seen_this_step => {
574                let layer =
575                    crate::engine::turn_loop::memory_plane_projection_policy::MemoryPlaneLayer::from_log_layer(
576                        layer,
577                    );
578                out.effects.push(Effect::QueryMemory {
579                    layer,
580                    query_key: query_key.clone(),
581                });
582                if query_key
583                    == crate::engine::turn_loop::memory_plane_episodic_policy::QUERY_TOPIC_EPISODIC
584                    && *step_idx == projection.refresh_user_memory_query_step
585                    && projection.refresh_user_memory_query_step > 0
586                {
587                    out.effects.push(Effect::RefreshSystemPrompt);
588                }
589            }
590            _ => {
591                let _ = projection;
592            }
593        }
594        out
595    }
596}
597
/// Report whether a successful state write by tool `name` should trigger an
/// LSP diagnostics flush.
///
/// True for any name starting with `edit_`, and for the exact names listed in
/// `EXACT_NAMES`.
#[must_use]
pub fn is_lsp_notify_tool(name: &str) -> bool {
    const EXACT_NAMES: [&str; 4] = [
        "edit_file",
        "write_file",
        "apply_patch",
        "multi_tool_use.parallel",
    ];
    name.starts_with("edit_") || EXACT_NAMES.contains(&name)
}
606
607/// Verify that [`ReplayTurnMachine`] effect counts match observed event counts.
608///
609/// Returns `None` when the replay chain is internally consistent; used by CI golden replay.
610#[must_use]
611pub fn verify_effect_replay_chain(events: &[KernelEvent]) -> Option<String> {
612    let mut machine = ReplayTurnMachine;
613    let mut projection = TurnKernelProjection::default();
614    let mut call_model_effects = 0u32;
615    let mut execute_batch_effects = 0u32;
616    let mut model_requests = 0u32;
617    let mut tool_planned = 0u32;
618    let mut halt: Option<TurnOutcome> = None;
619
620    for event in events {
621        match &event {
622            KernelEvent::ModelRequestIssued { .. } => model_requests += 1,
623            KernelEvent::ToolCallPlanned { .. } => tool_planned += 1,
624            _ => {}
625        }
626        let out = machine.step(&projection, event.clone());
627        projection.apply(event);
628        for effect in &out.effects {
629            match effect {
630                Effect::CallModel { .. } => call_model_effects += 1,
631                Effect::ExecuteBatch { .. } => execute_batch_effects += 1,
632                _ => {}
633            }
634        }
635        if let Some(h) = out.halt {
636            halt = Some(h);
637        }
638    }
639
640    let mut diffs = Vec::new();
641    if call_model_effects != model_requests {
642        diffs.push(format!(
643            "CallModel effects ({call_model_effects}) != ModelRequestIssued events ({model_requests})"
644        ));
645    }
646    if execute_batch_effects != tool_planned {
647        diffs.push(format!(
648            "ExecuteBatch effects ({execute_batch_effects}) != ToolCallPlanned events ({tool_planned})"
649        ));
650    }
651    if !events
652        .iter()
653        .any(|e| matches!(e, KernelEvent::TurnEnded { .. }))
654    {
655        diffs.push("missing TurnEnded event".into());
656    } else if halt.is_none() {
657        diffs.push("ReplayTurnMachine did not halt on TurnEnded".into());
658    }
659    if diffs.is_empty() {
660        None
661    } else {
662        Some(diffs.join("; "))
663    }
664}
665
666/// Verify LHT / guard continuation counters match the event log projection.
667#[must_use]
668pub fn verify_guard_projection_chain(events: &[KernelEvent]) -> Option<String> {
669    use crate::engine::turn_loop::guard_projection_policy::{
670        count_capacity_checkpoints, count_loop_guard_triggered, last_capacity_checkpoint_action,
671    };
672
673    let projection = TurnKernelProjection::from_events(events);
674    let step_limit = events
675        .iter()
676        .filter(|event| matches!(event, KernelEvent::StepLimitContinuation { .. }))
677        .count() as u32;
678    let loop_guard = events
679        .iter()
680        .filter(|event| matches!(event, KernelEvent::LoopGuardContinuation { .. }))
681        .count() as u32;
682    let loop_guard_triggered = count_loop_guard_triggered(events);
683    let cycle_handoffs = events
684        .iter()
685        .filter(|event| {
686            matches!(
687                event,
688                KernelEvent::ContextOverflowRecovered {
689                    strategy: crate::engine::kernel_event::OverflowStrategy::CycleHandoff,
690                    ..
691                }
692            )
693        })
694        .count() as u32;
695    let in_turn_cycle_advances = events
696        .iter()
697        .filter(|event| matches!(event, KernelEvent::CycleAdvanced { .. }))
698        .count() as u32;
699    let capacity_checkpoints = count_capacity_checkpoints(events);
700    let last_capacity = last_capacity_checkpoint_action(events);
701
702    let mut diffs = Vec::new();
703    if projection.step_limit_continuations != step_limit {
704        diffs.push(format!(
705            "step_limit_continuations proj={} events={step_limit}",
706            projection.step_limit_continuations
707        ));
708    }
709    if projection.loop_guard_continuations != loop_guard {
710        diffs.push(format!(
711            "loop_guard_continuations proj={} events={loop_guard}",
712            projection.loop_guard_continuations
713        ));
714    }
715    if projection.loop_guard_triggered_count != loop_guard_triggered {
716        diffs.push(format!(
717            "loop_guard_triggered_count proj={} events={loop_guard_triggered}",
718            projection.loop_guard_triggered_count
719        ));
720    }
721    if projection.cycle_handoff_attempts != cycle_handoffs {
722        diffs.push(format!(
723            "cycle_handoff_attempts proj={} events={cycle_handoffs}",
724            projection.cycle_handoff_attempts
725        ));
726    }
727    if projection.in_turn_cycle_advances != in_turn_cycle_advances {
728        diffs.push(format!(
729            "in_turn_cycle_advances proj={} events={in_turn_cycle_advances}",
730            projection.in_turn_cycle_advances
731        ));
732    }
733    if projection.capacity_checkpoint_count != capacity_checkpoints {
734        diffs.push(format!(
735            "capacity_checkpoint_count proj={} events={capacity_checkpoints}",
736            projection.capacity_checkpoint_count
737        ));
738    }
739    if capacity_checkpoints > 0 && projection.last_capacity_action.is_none() {
740        diffs.push(
741            "capacity checkpoints present but projection last_capacity_action is None".into(),
742        );
743    }
744    if let (Some(proj_last), Some(log_last)) = (
745        projection.last_capacity_action.as_ref(),
746        last_capacity.as_ref(),
747    ) && proj_last != log_last
748    {
749        diffs.push(format!(
750            "last_capacity_action proj={proj_last:?} log={log_last:?}"
751        ));
752    }
753    if diffs.is_empty() {
754        None
755    } else {
756        Some(diffs.join("; "))
757    }
758}
759
760/// Verify memory-plane event counters match the event log projection.
761#[must_use]
762pub fn verify_memory_projection_chain(events: &[KernelEvent]) -> Option<String> {
763    let projection = TurnKernelProjection::from_events(events);
764    let mut summary_injected = 0u32;
765    let mut reminder_injected = 0u32;
766    let mut compaction_artifacts = 0u32;
767    let mut cycle_briefings = 0u32;
768    let mut topic_memory_injections = 0u32;
769
770    for event in events {
771        match event {
772            KernelEvent::ScratchpadSummaryInjected { .. } => summary_injected += 1,
773            KernelEvent::ScratchpadReminderInjected { .. } => reminder_injected += 1,
774            KernelEvent::CompactionArtifactCreated { .. } => compaction_artifacts += 1,
775            KernelEvent::CycleBriefingInjected { .. } => cycle_briefings += 1,
776            KernelEvent::TopicMemoryInjected { .. } => topic_memory_injections += 1,
777            _ => {}
778        }
779    }
780
781    let mut diffs = Vec::new();
782    if projection.scratchpad_summary_injected != (summary_injected > 0) {
783        diffs.push(format!(
784            "scratchpad_summary_injected proj={} events={summary_injected}",
785            projection.scratchpad_summary_injected
786        ));
787    }
788    if projection.scratchpad_reminder_count != reminder_injected {
789        diffs.push(format!(
790            "scratchpad_reminder_count proj={} events={reminder_injected}",
791            projection.scratchpad_reminder_count
792        ));
793    }
794    if projection.compaction_artifact_count != compaction_artifacts {
795        diffs.push(format!(
796            "compaction_artifact_count proj={} events={compaction_artifacts}",
797            projection.compaction_artifact_count
798        ));
799    }
800    if projection.cycle_briefing_count != cycle_briefings {
801        diffs.push(format!(
802            "cycle_briefing_count proj={} events={cycle_briefings}",
803            projection.cycle_briefing_count
804        ));
805    }
806    if projection.topic_memory_injection_count != topic_memory_injections {
807        diffs.push(format!(
808            "topic_memory_injection_count proj={} events={topic_memory_injections}",
809            projection.topic_memory_injection_count
810        ));
811    }
812    if let Some(summary) =
813        crate::engine::turn_loop::memory_plane_projection_policy::verify_memory_plane_layer_coherence(
814            events,
815        )
816    {
817        diffs.push(format!("memory_plane_layers: {summary}"));
818    }
819    if let Some(summary) =
820        crate::engine::turn_loop::memory_plane_archival_policy::verify_archival_artifact_field_coherence(
821            events,
822        )
823    {
824        diffs.push(format!("archival_fields: {summary}"));
825    }
826    if let Some(summary) =
827        crate::engine::turn_loop::memory_plane_working_policy::verify_working_layer_tool_coherence(
828            events,
829        )
830    {
831        diffs.push(format!("working_layer_tools: {summary}"));
832    }
833    if let Some(summary) =
834        crate::engine::turn_loop::memory_plane_query_replay_policy::verify_memory_plane_query_projection_coherence(
835            events,
836        )
837    {
838        diffs.push(format!("memory_plane_queries: {summary}"));
839    }
840    if diffs.is_empty() {
841        None
842    } else {
843        Some(diffs.join("; "))
844    }
845}
846
847// ── Turn replay (Phase 3b batch 6 — resume foundation) ───────────────────────
848
/// Projection rebuilt from a turn's event log — the Phase 3b resume substrate.
///
/// Produced by [`replay_turn_projection`].
#[derive(Debug, Clone)]
pub struct TurnReplayReport {
    /// Number of events in the replayed slice.
    pub event_count: usize,
    /// Projection obtained by folding the events with `TurnKernelProjection::from_events`.
    pub projection: TurnKernelProjection,
    /// Copy of `projection.outcome`, lifted out for convenient access.
    pub outcome: Option<TurnOutcome>,
}
856
857/// Rebuild [`TurnKernelProjection`] and outcome purely from an event sequence.
858#[must_use]
859pub fn replay_turn_projection(events: &[KernelEvent]) -> TurnReplayReport {
860    let projection = TurnKernelProjection::from_events(events);
861    TurnReplayReport {
862        event_count: events.len(),
863        outcome: projection.outcome.clone(),
864        projection,
865    }
866}
867
868/// Unified replay gate: projection/live parity + effect/guard/memory chains.
869///
870/// Returns `None` when the log is sufficient to drive resume/replay; used by
871/// Phase 3b replay shadow bake and future session resume.
872#[must_use]
873pub fn verify_turn_replay_coherence(
874    events: &[KernelEvent],
875    live: Option<&LiveTurnSnapshot>,
876) -> Option<String> {
877    let mut diffs = Vec::new();
878
879    if let Some(live) = live {
880        let projection = TurnKernelProjection::from_events(events);
881        if let Some(summary) = compare_projection_to_live(live, &projection) {
882            diffs.push(format!("live_projection: {summary}"));
883        }
884    }
885    if let Some(summary) = verify_effect_replay_chain(events) {
886        diffs.push(format!("effect: {summary}"));
887    }
888    if let Some(summary) = verify_guard_projection_chain(events) {
889        diffs.push(format!("guard: {summary}"));
890    }
891    let projection = TurnKernelProjection::from_events(events);
892    if let Some(mode) = projection.mode
893        && let Some(summary) =
894            crate::engine::turn_loop::outer_boundary_replay_policy::verify_outer_boundary_event_caps(
895                events, mode,
896            )
897    {
898        diffs.push(format!("outer_boundary_caps: {summary}"));
899    }
900    if let Some(summary) =
901        crate::engine::turn_loop::loop_guard_replay_policy::verify_loop_guard_replay_coherence(
902            events,
903        )
904    {
905        diffs.push(format!("loop_guard_replay: {summary}"));
906    }
907    if let Some(summary) =
908        crate::engine::turn_loop::capacity_replay_policy::verify_capacity_checkpoint_field_coherence(
909            events,
910        )
911    {
912        diffs.push(format!("capacity_fields: {summary}"));
913    }
914    if let Some(summary) = verify_capacity_effect_replay_coherence(events) {
915        diffs.push(format!("capacity_replay: {summary}"));
916    }
917    if let Some(summary) = verify_memory_projection_chain(events) {
918        diffs.push(format!("memory: {summary}"));
919    }
920    if let Some(summary) =
921        crate::engine::turn_loop::memory_plane_query_replay_policy::verify_memory_plane_query_replay_coherence(
922            events,
923        )
924    {
925        diffs.push(format!("memory_plane_query_replay: {summary}"));
926    }
927    if let Some(summary) =
928        crate::engine::turn_loop::layered_context_replay_policy::verify_layered_context_seam_replay_coherence(
929            events,
930        )
931    {
932        diffs.push(format!("layered_context_seam_replay: {summary}"));
933    }
934    if let Some(summary) =
935        crate::engine::turn_loop::system_prompt_refresh_replay_policy::verify_system_prompt_refresh_replay_coherence(
936            events,
937        )
938    {
939        diffs.push(format!("system_prompt_refresh_replay: {summary}"));
940    }
941    if !events
942        .iter()
943        .any(|e| matches!(e, KernelEvent::TurnEnded { .. }))
944    {
945        diffs.push("missing TurnEnded event".into());
946    }
947
948    if diffs.is_empty() {
949        None
950    } else {
951        Some(diffs.join("; "))
952    }
953}
954
/// Per-turn replay summary for thread-level aggregation (Phase 3b batch 6c).
///
/// One summary is produced by [`build_thread_replay_report`] for every turn whose event
/// log is non-empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadTurnReplaySummary {
    /// Identifier of the turn, copied from the `(turn_id, events)` input pair.
    pub turn_id: String,
    /// Number of events replayed for this turn.
    pub event_count: usize,
    /// `true` exactly when `coherence_error` is `None`.
    pub coherence_ok: bool,
    /// Findings of [`verify_turn_replay_coherence`] (run without a live snapshot), if any.
    pub coherence_error: Option<String>,
    /// Outcome taken from the turn's replayed projection.
    pub outcome: Option<TurnOutcome>,
}
964
/// Thread-level replay report built from persisted turn event logs.
///
/// Assembled by [`build_thread_replay_report`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadReplayReport {
    /// Identifier of the thread the report describes.
    pub thread_id: String,
    /// Number of input turns, including those with an empty event log.
    pub turn_count: usize,
    /// Number of turns that had at least one event (and therefore a summary in `turns`).
    pub turns_with_events: usize,
    /// Number of summarized turns whose coherence check reported no findings.
    pub turns_coherent: usize,
    /// `true` only when at least one turn had events and every such turn was coherent.
    pub all_coherent: bool,
    /// Summaries of the turns that had events, in input order.
    pub turns: Vec<ThreadTurnReplaySummary>,
}
975
976/// Build a thread replay report from `(turn_id, events)` pairs.
977///
978/// Turns with empty event logs are omitted; coherence is evaluated only when
979/// events are present.
980#[must_use]
981pub fn build_thread_replay_report(
982    thread_id: &str,
983    turn_events: &[(String, Vec<KernelEvent>)],
984) -> ThreadReplayReport {
985    let mut turns = Vec::new();
986    let mut turns_with_events = 0usize;
987    let mut turns_coherent = 0usize;
988
989    for (turn_id, events) in turn_events {
990        if events.is_empty() {
991            continue;
992        }
993        turns_with_events += 1;
994        let report = replay_turn_projection(events);
995        let coherence_error = verify_turn_replay_coherence(events, None);
996        let coherence_ok = coherence_error.is_none();
997        if coherence_ok {
998            turns_coherent += 1;
999        }
1000        turns.push(ThreadTurnReplaySummary {
1001            turn_id: turn_id.clone(),
1002            event_count: report.event_count,
1003            coherence_ok,
1004            coherence_error,
1005            outcome: report.outcome,
1006        });
1007    }
1008
1009    ThreadReplayReport {
1010        thread_id: thread_id.to_string(),
1011        turn_count: turn_events.len(),
1012        turns_with_events,
1013        turns_coherent,
1014        all_coherent: turns_with_events > 0 && turns_coherent == turns_with_events,
1015        turns,
1016    }
1017}
1018
/// Thread-level replay substrate: per-turn coherence report plus latest turn projection.
///
/// NOTE(review): the code that populates this struct is not part of this section of the
/// file; field notes marked "presumably" should be confirmed against the builder.
#[derive(Debug, Clone)]
pub struct ThreadReplayProjection {
    /// Per-turn coherence report for the thread.
    pub report: ThreadReplayReport,
    /// Identifier of the latest turn; presumably `None` when no turn is available.
    pub latest_turn_id: Option<String>,
    /// Projection of the latest turn.
    pub latest_projection: TurnKernelProjection,
    /// Aggregated message-plane counters for the thread.
    pub message_stats: ThreadMessageReplayStats,
    /// Per-step model message anchors from the log (block counts only; text stays in session store).
    pub message_timeline: Vec<ThreadMessageTimelineEntry>,
    /// Aggregated message-plane index (counts only; session depth estimates).
    pub message_plane_index: ThreadMessagePlaneIndex,
    /// Preview-level transcript index (5c; no full message bodies).
    pub transcript_preview_index:
        crate::engine::turn_loop::message_body_rebuild_policy::ThreadTranscriptPreviewIndex,
    /// Compaction artifact anchors from kernel logs (replaced_range metadata only).
    pub compaction_timeline: Vec<ThreadCompactionReplayEntry>,
    /// Aggregate over `compaction_timeline` (artifact count, removed rows, depth hint).
    pub compaction_index: ThreadCompactionReplayIndex,
    /// Continuation steps replay `InjectSteer` in their step effect chain.
    pub continuation_anchor_ok: bool,
    /// Findings behind `continuation_anchor_ok`; presumably `None` when the check passed.
    pub continuation_anchor_summary: Option<String>,
    /// Presumably: approval-required planned tool calls replay `RequestApproval` effects.
    pub request_approval_anchor_ok: bool,
    /// Findings behind `request_approval_anchor_ok`; presumably `None` when the check passed.
    pub request_approval_anchor_summary: Option<String>,
    /// Presumably: state-writing edit-tool finishes replay `NotifyLsp` effects.
    pub notify_lsp_anchor_ok: bool,
    /// Findings behind `notify_lsp_anchor_ok`; presumably `None` when the check passed.
    pub notify_lsp_anchor_summary: Option<String>,
    /// Scratchpad / reminder / cycle-briefing events replay `InjectSteer` in the effect chain.
    pub memory_plane_replay_anchor_ok: bool,
    /// Findings behind `memory_plane_replay_anchor_ok`; presumably `None` when the check passed.
    pub memory_plane_replay_anchor_summary: Option<String>,
    /// Compaction artifact / trim-handoff events replay `RunCompaction` in the effect chain.
    pub compaction_replay_anchor_ok: bool,
    /// Findings behind `compaction_replay_anchor_ok`; presumably `None` when the check passed.
    pub compaction_replay_anchor_summary: Option<String>,
    /// Aggregated replay-chain effect counts for the thread (v3 observability).
    pub effect_counts: ReplayEffectCounts,
}
1052
/// One compaction artifact anchor rebuildable from kernel logs.
///
/// Built by [`replay_thread_compaction_timeline`] from a
/// `KernelEvent::CompactionArtifactCreated` event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadCompactionReplayEntry {
    /// Turn identifier carried by the event.
    pub turn_id: String,
    /// Artifact identifier carried by the event.
    pub artifact_id: String,
    /// Inclusive replaced message index range (`from`..=`to`).
    pub replaced_from: u32,
    /// Upper bound of the inclusive replaced range.
    pub replaced_to: u32,
    /// Size of the inclusive range, as computed by [`compaction_messages_removed_count`].
    pub messages_removed_count: u32,
    /// Summary token count carried by the event.
    pub summary_token_count: u32,
}
1064
/// Aggregated compaction metadata rebuildable from kernel event logs.
///
/// Built by [`replay_thread_compaction_index`] from the compaction timeline.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ThreadCompactionReplayIndex {
    /// Number of entries in the compaction timeline.
    pub artifact_count: u32,
    /// Saturating sum of `messages_removed_count` over all timeline entries.
    pub messages_removed_estimate: u32,
    /// Lower-bound session depth at compaction time (`max(replaced_to) + 1`).
    pub peak_session_depth_hint: u32,
}
1073
/// Session-store compaction row index (half-open `[start, end)` ranges).
///
/// Built by [`build_session_compaction_artifact_index`] from a persisted
/// `CompactionArtifact`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionCompactionArtifactEntry {
    /// Identifier of the persisted artifact.
    pub artifact_id: String,
    /// Start of the replaced range (inclusive).
    pub replaced_start: u32,
    /// End of the replaced range (exclusive).
    pub replaced_end: u32,
    /// Value of the artifact's `replaced_count()`.
    pub messages_removed_count: u32,
    /// Value of the artifact's `summary_tokens`.
    pub summary_token_count: u32,
}
1083
1084/// Build session compaction index rows from persisted SQLite artifacts.
1085#[must_use]
1086pub fn build_session_compaction_artifact_index(
1087    artifacts: &[CompactionArtifact],
1088) -> Vec<SessionCompactionArtifactEntry> {
1089    artifacts
1090        .iter()
1091        .map(|artifact| SessionCompactionArtifactEntry {
1092            artifact_id: artifact.id.clone(),
1093            replaced_start: artifact.replaced_start as u32,
1094            replaced_end: artifact.replaced_end as u32,
1095            messages_removed_count: artifact.replaced_count() as u32,
1096            summary_token_count: artifact.summary_tokens,
1097        })
1098        .collect()
1099}
1100
1101/// Verify kernel log compaction anchors match session-store artifact metadata.
1102#[must_use]
1103pub fn verify_compaction_artifacts_vs_kernel_timeline(
1104    kernel: &[ThreadCompactionReplayEntry],
1105    session: &[SessionCompactionArtifactEntry],
1106) -> Option<String> {
1107    if kernel.is_empty() && session.is_empty() {
1108        return None;
1109    }
1110    if kernel.len() != session.len() {
1111        return Some(format!(
1112            "kernel compaction events ({}) != session artifacts ({})",
1113            kernel.len(),
1114            session.len()
1115        ));
1116    }
1117    let mut issues = Vec::new();
1118    for (k, s) in kernel.iter().zip(session.iter()) {
1119        if k.artifact_id != s.artifact_id {
1120            issues.push(format!(
1121                "artifact id mismatch kernel={} session={}",
1122                k.artifact_id, s.artifact_id
1123            ));
1124        }
1125        if k.replaced_from != s.replaced_start {
1126            issues.push(format!(
1127                "artifact {} replaced_start kernel={} session={}",
1128                k.artifact_id, k.replaced_from, s.replaced_start
1129            ));
1130        }
1131        let kernel_end_exclusive = k.replaced_to.saturating_add(1);
1132        if kernel_end_exclusive != s.replaced_end {
1133            issues.push(format!(
1134                "artifact {} replaced_end kernel={} session={}",
1135                k.artifact_id, kernel_end_exclusive, s.replaced_end
1136            ));
1137        }
1138        if k.messages_removed_count != s.messages_removed_count {
1139            issues.push(format!(
1140                "artifact {} removed_count kernel={} session={}",
1141                k.artifact_id, k.messages_removed_count, s.messages_removed_count
1142            ));
1143        }
1144        if k.summary_token_count != s.summary_token_count {
1145            issues.push(format!(
1146                "artifact {} summary_tokens kernel={} session={}",
1147                k.artifact_id, k.summary_token_count, s.summary_token_count
1148            ));
1149        }
1150    }
1151    if issues.is_empty() {
1152        None
1153    } else {
1154        Some(issues.join("; "))
1155    }
1156}
1157
/// One `ModelMessage` anchor rebuildable from kernel logs (no message body).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadMessageTimelineEntry {
    /// Turn the anchored message belongs to.
    pub turn_id: String,
    /// Step index within the turn.
    pub step_idx: u32,
    /// Number of content blocks — presumably taken from the `ModelMessage` event; confirm
    /// against the code that builds the timeline.
    pub block_count: u32,
}
1165
/// Structured session JSON vs kernel log message coverage (observability).
///
/// NOTE(review): the code that fills this struct is outside this section of the file;
/// the field notes below are inferred from the names and should be confirmed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionMessageCoverage {
    /// Presumably the number of messages in the session JSON.
    pub session_message_count: usize,
    /// Presumably the number of `ModelMessage` events in the kernel log.
    pub kernel_model_message_count: u32,
    /// Presumably `true` when the session can account for the kernel-side count.
    pub coverage_ok: bool,
    /// Presumably the human-readable finding when coverage is not OK.
    pub summary: Option<String>,
}
1174
/// Aggregated message-plane counters rebuildable from kernel event logs (not full text).
///
/// The aggregation itself is outside this section of the file. Field notes describe how
/// each counter is consumed by the estimate builders that take `&ThreadMessageReplayStats`;
/// what each counter tallies is inferred from its name (TODO confirm against the aggregator).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ThreadMessageReplayStats {
    /// Presumably the number of turns that contributed events.
    pub turns_with_events: usize,
    /// Copied verbatim into `ThreadMessagePlaneIndex::model_request_count`.
    pub model_request_count: u32,
    /// Feeds the assistant-message and session-row lower bounds.
    pub model_message_count: u32,
    /// Feeds the tool-result and session-row lower bounds.
    pub tool_call_planned_count: u32,
    /// Feeds the steer user-message lower bounds.
    pub steer_injection_count: u32,
    /// Presumably one per compaction artifact event.
    pub compaction_artifact_count: u32,
    /// Feeds `KernelMemoryPlaneUserEstimate::min_scratchpad_summary_user_messages`.
    pub scratchpad_summary_count: u32,
    /// Feeds `KernelMemoryPlaneUserEstimate::min_scratchpad_reminder_user_messages`.
    pub scratchpad_reminder_count: u32,
    /// Presumably one per cycle-briefing injection; not part of the memory-plane user
    /// lower bound computed by `replay_kernel_memory_plane_user_estimate`.
    pub cycle_briefing_count: u32,
    /// Summed with `loop_guard_continuation_count` into the continuation lower bound.
    pub step_limit_continuation_count: u32,
    /// Summed with `step_limit_continuation_count` into the continuation lower bound.
    pub loop_guard_continuation_count: u32,
    /// Added to both the assistant-message and the session-row lower bounds.
    pub layered_context_seam_count: u32,
}
1191
/// Message-plane index rebuildable from kernel logs (no message bodies).
///
/// Built by [`replay_thread_message_plane_index`] from [`ThreadMessageReplayStats`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ThreadMessagePlaneIndex {
    /// Copied from the stats' `model_request_count`.
    pub model_request_count: u32,
    /// Copied from the stats' `model_message_count`.
    pub model_message_count: u32,
    /// Copied from the stats' `tool_call_planned_count`.
    pub tool_call_planned_count: u32,
    /// Copied from the stats' `steer_injection_count`.
    pub steer_injection_count: u32,
    /// Lower-bound session rows: saturating sum of model messages, planned tool calls
    /// (one tool-result row each) and layered-context seams.
    pub estimated_min_session_messages: u32,
}
1202
1203/// Build a message-plane index from aggregated thread stats.
1204#[must_use]
1205pub fn replay_thread_message_plane_index(
1206    stats: &ThreadMessageReplayStats,
1207) -> ThreadMessagePlaneIndex {
1208    ThreadMessagePlaneIndex {
1209        model_request_count: stats.model_request_count,
1210        model_message_count: stats.model_message_count,
1211        tool_call_planned_count: stats.tool_call_planned_count,
1212        steer_injection_count: stats.steer_injection_count,
1213        estimated_min_session_messages: stats
1214            .model_message_count
1215            .saturating_add(stats.tool_call_planned_count)
1216            .saturating_add(stats.layered_context_seam_count),
1217    }
1218}
1219
/// Role-indexed session message counts (observability; no body rebuild).
///
/// Built by [`build_session_message_role_index`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SessionMessageRoleIndex {
    /// Messages whose role is `"user"` (tool-result carriers included).
    pub user_message_count: u32,
    /// Messages whose role is `"assistant"`.
    pub assistant_message_count: u32,
    /// User messages that contain at least one `tool_result` block.
    pub tool_result_message_count: u32,
    /// User rows with text blocks and no `tool_result` (steer, scratchpad injects, initial prompt).
    pub text_user_message_count: u32,
    /// Total number of messages, regardless of role.
    pub total_message_count: u32,
}
1230
/// Kernel-log lower bounds for memory-plane user injections (steer / scratchpad).
///
/// Built by [`replay_kernel_memory_plane_user_estimate`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct KernelMemoryPlaneUserEstimate {
    /// Steer injections counted in the kernel log.
    pub min_steer_user_messages: u32,
    /// Scratchpad summary injections counted in the kernel log.
    pub min_scratchpad_summary_user_messages: u32,
    /// Scratchpad reminder injections counted in the kernel log.
    pub min_scratchpad_reminder_user_messages: u32,
    /// Step-limit plus loop-guard continuations (saturating sum).
    pub min_continuation_user_messages: u32,
    /// Saturating sum of the four bounds above.
    pub min_memory_injected_user_messages: u32,
}
1240
/// Kernel-log lower bounds for role-indexed session rows (assistant + tool results).
///
/// Built by [`replay_kernel_message_role_estimate`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct KernelMessageRoleEstimate {
    /// Model messages plus layered-context seams (saturating sum).
    pub min_assistant_messages: u32,
    /// Planned tool calls; each is expected to yield a tool-result row.
    pub min_tool_result_messages: u32,
    /// Steer injections counted in the kernel log.
    pub min_steer_user_messages: u32,
}
1248
1249/// Build role-indexed counts from session JSON messages.
1250#[must_use]
1251pub fn build_session_message_role_index(messages: &[Message]) -> SessionMessageRoleIndex {
1252    let mut user_message_count = 0u32;
1253    let mut assistant_message_count = 0u32;
1254    let mut tool_result_message_count = 0u32;
1255    let mut text_user_message_count = 0u32;
1256    for msg in messages {
1257        match msg.role.as_str() {
1258            "user" => {
1259                user_message_count += 1;
1260                if message_has_tool_result(msg) {
1261                    tool_result_message_count += 1;
1262                } else if message_has_text_block(msg) {
1263                    text_user_message_count += 1;
1264                }
1265            }
1266            "assistant" => assistant_message_count += 1,
1267            _ => {}
1268        }
1269    }
1270    SessionMessageRoleIndex {
1271        user_message_count,
1272        assistant_message_count,
1273        tool_result_message_count,
1274        text_user_message_count,
1275        total_message_count: messages.len() as u32,
1276    }
1277}
1278
1279fn message_has_text_block(msg: &Message) -> bool {
1280    msg.content
1281        .iter()
1282        .any(|block| matches!(block, ContentBlock::Text { .. }))
1283}
1284
1285fn message_has_tool_result(msg: &Message) -> bool {
1286    msg.content
1287        .iter()
1288        .any(|block| matches!(block, ContentBlock::ToolResult { .. }))
1289}
1290
1291/// Build kernel-log role lower bounds from aggregated thread stats.
1292#[must_use]
1293pub fn replay_kernel_message_role_estimate(
1294    stats: &ThreadMessageReplayStats,
1295) -> KernelMessageRoleEstimate {
1296    KernelMessageRoleEstimate {
1297        min_assistant_messages: stats
1298            .model_message_count
1299            .saturating_add(stats.layered_context_seam_count),
1300        min_tool_result_messages: stats.tool_call_planned_count,
1301        min_steer_user_messages: stats.steer_injection_count,
1302    }
1303}
1304
1305/// Build kernel-log lower bounds for memory-plane user injections.
1306#[must_use]
1307pub fn replay_kernel_memory_plane_user_estimate(
1308    stats: &ThreadMessageReplayStats,
1309) -> KernelMemoryPlaneUserEstimate {
1310    let min_steer_user_messages = stats.steer_injection_count;
1311    let min_scratchpad_summary_user_messages = stats.scratchpad_summary_count;
1312    let min_scratchpad_reminder_user_messages = stats.scratchpad_reminder_count;
1313    let min_continuation_user_messages = stats
1314        .step_limit_continuation_count
1315        .saturating_add(stats.loop_guard_continuation_count);
1316    let min_memory_injected_user_messages = min_steer_user_messages
1317        .saturating_add(min_scratchpad_summary_user_messages)
1318        .saturating_add(min_scratchpad_reminder_user_messages)
1319        .saturating_add(min_continuation_user_messages);
1320    KernelMemoryPlaneUserEstimate {
1321        min_steer_user_messages,
1322        min_scratchpad_summary_user_messages,
1323        min_scratchpad_reminder_user_messages,
1324        min_continuation_user_messages,
1325        min_memory_injected_user_messages,
1326    }
1327}
1328
1329/// Weak check: session text-user rows can host kernel memory-plane injections.
1330#[must_use]
1331pub fn verify_session_memory_plane_user_depth(
1332    session: &SessionMessageRoleIndex,
1333    kernel: &KernelMemoryPlaneUserEstimate,
1334) -> Option<String> {
1335    if kernel.min_memory_injected_user_messages == 0 {
1336        return None;
1337    }
1338    if session.text_user_message_count >= kernel.min_memory_injected_user_messages {
1339        None
1340    } else {
1341        Some(format!(
1342            "session text user messages ({}) below kernel memory-plane injections ({})",
1343            session.text_user_message_count, kernel.min_memory_injected_user_messages
1344        ))
1345    }
1346}
1347
1348/// Verify session role counts can host kernel-log assistant + tool-result rows.
1349#[must_use]
1350pub fn verify_session_role_index(
1351    session: &SessionMessageRoleIndex,
1352    kernel: &KernelMessageRoleEstimate,
1353) -> Option<String> {
1354    let mut issues = Vec::new();
1355    if session.assistant_message_count < kernel.min_assistant_messages {
1356        issues.push(format!(
1357            "session assistant messages ({}) below kernel model_message events ({})",
1358            session.assistant_message_count, kernel.min_assistant_messages
1359        ));
1360    }
1361    if session.tool_result_message_count < kernel.min_tool_result_messages {
1362        issues.push(format!(
1363            "session tool_result messages ({}) below kernel tool_call_planned events ({})",
1364            session.tool_result_message_count, kernel.min_tool_result_messages
1365        ));
1366    }
1367    if issues.is_empty() {
1368        None
1369    } else {
1370        Some(issues.join("; "))
1371    }
1372}
1373
/// Count inclusive-range messages removed by a compaction artifact.
///
/// # Parameters
/// - `replaced_from`: first replaced message index (inclusive).
/// - `replaced_to`: last replaced message index (inclusive).
///
/// # Returns
/// `replaced_to - replaced_from + 1` for a well-formed range, `0` when the range is
/// inverted (`replaced_to < replaced_from`). The result saturates at `u32::MAX`: the full
/// range `0..=u32::MAX` holds one more element than `u32` can represent, and the
/// unchecked `+ 1` would overflow there.
#[must_use]
pub fn compaction_messages_removed_count(replaced_from: u32, replaced_to: u32) -> u32 {
    // `checked_sub` yields `None` exactly for inverted ranges.
    replaced_to
        .checked_sub(replaced_from)
        .map_or(0, |span| span.saturating_add(1))
}
1383
1384/// Rebuild compaction artifact anchors across a thread from persisted kernel events.
1385#[must_use]
1386pub fn replay_thread_compaction_timeline(
1387    turn_events: &[(String, Vec<KernelEvent>)],
1388) -> Vec<ThreadCompactionReplayEntry> {
1389    let mut timeline = Vec::new();
1390    for (_, events) in turn_events {
1391        for event in events {
1392            if let KernelEvent::CompactionArtifactCreated {
1393                turn_id,
1394                artifact_id,
1395                replaced_range,
1396                summary_token_count,
1397            } = event
1398            {
1399                let messages_removed_count =
1400                    compaction_messages_removed_count(replaced_range.from, replaced_range.to);
1401                timeline.push(ThreadCompactionReplayEntry {
1402                    turn_id: turn_id.clone(),
1403                    artifact_id: artifact_id.clone(),
1404                    replaced_from: replaced_range.from,
1405                    replaced_to: replaced_range.to,
1406                    messages_removed_count,
1407                    summary_token_count: *summary_token_count,
1408                });
1409            }
1410        }
1411    }
1412    timeline
1413}
1414
1415/// Build aggregated compaction index from compaction timeline anchors.
1416#[must_use]
1417pub fn replay_thread_compaction_index(
1418    timeline: &[ThreadCompactionReplayEntry],
1419) -> ThreadCompactionReplayIndex {
1420    let mut messages_removed_estimate = 0u32;
1421    let mut peak_session_depth_hint = 0u32;
1422    for entry in timeline {
1423        messages_removed_estimate =
1424            messages_removed_estimate.saturating_add(entry.messages_removed_count);
1425        peak_session_depth_hint = peak_session_depth_hint.max(entry.replaced_to.saturating_add(1));
1426    }
1427    ThreadCompactionReplayIndex {
1428        artifact_count: timeline.len() as u32,
1429        messages_removed_estimate,
1430        peak_session_depth_hint,
1431    }
1432}
1433
1434/// Weak check: current session + compaction-removed rows cover kernel plane estimate.
1435#[must_use]
1436pub fn verify_session_compaction_depth(
1437    session_message_count: usize,
1438    compaction: &ThreadCompactionReplayIndex,
1439    plane_index: &ThreadMessagePlaneIndex,
1440) -> Option<String> {
1441    if compaction.artifact_count == 0 {
1442        return None;
1443    }
1444    let restored = session_message_count as u32 + compaction.messages_removed_estimate;
1445    let min_needed = plane_index.estimated_min_session_messages;
1446    if restored < min_needed {
1447        Some(format!(
1448            "session ({session_message_count}) + compaction removed ({removed}) = {restored} below kernel plane estimate ({min_needed})",
1449            removed = compaction.messages_removed_estimate
1450        ))
1451    } else {
1452        None
1453    }
1454}
1455
1456/// Verify session JSON depth can host the kernel log message-plane estimate.
1457#[must_use]
1458pub fn verify_session_message_plane_depth(
1459    session_message_count: usize,
1460    index: &ThreadMessagePlaneIndex,
1461) -> Option<String> {
1462    let min = index.estimated_min_session_messages as usize;
1463    if session_message_count < min {
1464        Some(format!(
1465            "session messages ({session_message_count}) below kernel plane estimate ({min})"
1466        ))
1467    } else {
1468        None
1469    }
1470}
1471
1472/// Verify a step slice has exactly one `ModelMessage` when a model request was issued.
1473#[must_use]
1474pub fn verify_step_model_message_anchor(
1475    turn_events: &[KernelEvent],
1476    step_idx: u32,
1477) -> Option<String> {
1478    let step_events = events_for_step(turn_events, step_idx);
1479    if step_events.is_empty() {
1480        return None;
1481    }
1482    let has_request = step_events.iter().any(
1483        |e| matches!(e, KernelEvent::ModelRequestIssued { step_idx: s, .. } if *s == step_idx),
1484    );
1485    let message_count = step_events
1486        .iter()
1487        .filter(|e| matches!(e, KernelEvent::ModelMessage { step_idx: s, .. } if *s == step_idx))
1488        .count();
1489    if has_request && message_count == 0 {
1490        return Some(format!(
1491            "step {step_idx} has ModelRequestIssued but no ModelMessage"
1492        ));
1493    }
1494    if has_request && message_count > 1 {
1495        return Some(format!(
1496            "step {step_idx} has {message_count} ModelMessage events (expected 1)"
1497        ));
1498    }
1499    None
1500}
1501
1502/// Verify continuation steps replay an `InjectSteer` effect (event-driven v3 substrate).
1503#[must_use]
1504pub fn verify_step_continuation_anchor(
1505    turn_events: &[KernelEvent],
1506    step_idx: u32,
1507) -> Option<String> {
1508    let step_events = events_for_step(turn_events, step_idx);
1509    let has_continuation = step_events.iter().any(|event| {
1510        matches!(
1511            event,
1512            KernelEvent::StepLimitContinuation { .. } | KernelEvent::LoopGuardContinuation { .. }
1513        )
1514    });
1515    if !has_continuation {
1516        return None;
1517    }
1518    let effects = replay_step_effects(turn_events, step_idx);
1519    if effects
1520        .iter()
1521        .any(|effect| matches!(effect, Effect::InjectSteer { .. }))
1522    {
1523        None
1524    } else {
1525        Some(format!(
1526            "step {step_idx} has continuation events but no InjectSteer in step replay chain"
1527        ))
1528    }
1529}
1530
1531/// Verify continuation steps across all turns replay `InjectSteer` in their step effect chain.
1532#[must_use]
1533pub fn verify_thread_continuation_anchors(
1534    turn_events: &[(String, Vec<KernelEvent>)],
1535) -> Option<String> {
1536    let mut issues = Vec::new();
1537    for (_, events) in turn_events {
1538        let mut continuation_steps = std::collections::BTreeSet::new();
1539        for event in events {
1540            match event {
1541                KernelEvent::StepLimitContinuation { step_idx, .. }
1542                | KernelEvent::LoopGuardContinuation { step_idx, .. } => {
1543                    continuation_steps.insert(*step_idx);
1544                }
1545                _ => {}
1546            }
1547        }
1548        for step_idx in continuation_steps {
1549            if let Some(summary) = verify_step_continuation_anchor(events, step_idx) {
1550                issues.push(summary);
1551            }
1552        }
1553    }
1554    if issues.is_empty() {
1555        None
1556    } else {
1557        Some(issues.join("; "))
1558    }
1559}
1560
1561/// Whether a planned tool call requires user approval (from kernel log policy metadata).
1562#[must_use]
1563pub fn is_approval_required_planned_event(event: &KernelEvent) -> bool {
1564    matches!(
1565        event,
1566        KernelEvent::ToolCallPlanned {
1567            decision,
1568            ..
1569        } if decision.approval_required
1570    )
1571}
1572
1573/// Verify steps replay a `RequestApproval` effect per approval-required `ToolCallPlanned`.
1574#[must_use]
1575pub fn verify_step_request_approval_anchor(
1576    turn_events: &[KernelEvent],
1577    step_idx: u32,
1578) -> Option<String> {
1579    let step_events = events_for_step(turn_events, step_idx);
1580    let expected = step_events
1581        .iter()
1582        .filter(|event| is_approval_required_planned_event(event))
1583        .count();
1584    if expected == 0 {
1585        return None;
1586    }
1587    let approval_effects = replay_step_effects(turn_events, step_idx)
1588        .iter()
1589        .filter(|effect| matches!(effect, Effect::RequestApproval { .. }))
1590        .count();
1591    if approval_effects >= expected {
1592        None
1593    } else {
1594        Some(format!(
1595            "step {step_idx} expected {expected} RequestApproval replay effects, found {approval_effects}"
1596        ))
1597    }
1598}
1599
1600/// Verify edit-tool steps replay a `NotifyLsp` effect per state-writing tool finish.
1601#[must_use]
1602pub fn verify_step_notify_lsp_anchor(turn_events: &[KernelEvent], step_idx: u32) -> Option<String> {
1603    let step_events = events_for_step(turn_events, step_idx);
1604    let expected = step_events
1605        .iter()
1606        .filter(|event| {
1607            matches!(
1608                event,
1609                KernelEvent::ToolCallFinished {
1610                    tool_name,
1611                    wrote_state: true,
1612                    ..
1613                } if is_lsp_notify_tool(tool_name)
1614            )
1615        })
1616        .count();
1617    if expected == 0 {
1618        return None;
1619    }
1620    let notify_effects = replay_step_effects(turn_events, step_idx)
1621        .iter()
1622        .filter(|effect| matches!(effect, Effect::NotifyLsp { .. }))
1623        .count();
1624    if notify_effects >= expected {
1625        None
1626    } else {
1627        Some(format!(
1628            "step {step_idx} expected {expected} NotifyLsp replay effects, found {notify_effects}"
1629        ))
1630    }
1631}
1632
1633/// Kernel events that inject memory-plane user rows (symbol anchors; body stays in session JSON).
1634#[must_use]
1635pub fn is_memory_plane_injection_kernel_event(event: &KernelEvent) -> bool {
1636    matches!(
1637        event,
1638        KernelEvent::ScratchpadReminderInjected { .. }
1639            | KernelEvent::ScratchpadSummaryInjected { .. }
1640            | KernelEvent::CycleBriefingInjected { .. }
1641    )
1642}
1643
1644/// Memory-plane replay effects (`EmitArtifact` for scratchpad; `InjectSteer` anchor for cycle briefing).
1645#[must_use]
1646pub fn memory_plane_inject_steer_effects_from_events(events: &[KernelEvent]) -> Vec<Effect> {
1647    let mut out = crate::engine::turn_loop::memory_artifact_policy::memory_plane_emit_artifact_effects_from_events(events);
1648    out.extend(
1649        events
1650            .iter()
1651            .filter(|event| matches!(event, KernelEvent::CycleBriefingInjected { .. }))
1652            .map(|_| Effect::InjectSteer {
1653                text: String::new(),
1654            }),
1655    );
1656    out
1657}
1658
1659/// Whether an effect replays a memory-plane injection anchor.
1660#[must_use]
1661pub fn is_memory_plane_replay_effect(effect: &Effect) -> bool {
1662    matches!(
1663        effect,
1664        Effect::EmitArtifact { .. } | Effect::InjectSteer { .. }
1665    )
1666}
1667
1668/// Count memory-plane replay effects emitted by [`ReplayTurnMachine`] for observed events.
1669fn count_memory_plane_replay_inject_effects(events: &[KernelEvent]) -> usize {
1670    let mut machine = ReplayTurnMachine;
1671    let mut projection = TurnKernelProjection::default();
1672    let mut count = 0;
1673    for event in events {
1674        let is_memory_plane = is_memory_plane_injection_kernel_event(event);
1675        let out = machine.step(&projection, event.clone());
1676        projection.apply(event);
1677        if is_memory_plane {
1678            count += out
1679                .effects
1680                .iter()
1681                .filter(|effect| is_memory_plane_replay_effect(effect))
1682                .count();
1683        }
1684    }
1685    count
1686}
1687
1688/// Verify scratchpad / reminder / cycle-briefing events replay memory-plane effects.
1689#[must_use]
1690pub fn verify_thread_memory_plane_replay_anchors(
1691    turn_events: &[(String, Vec<KernelEvent>)],
1692) -> Option<String> {
1693    let mut issues = Vec::new();
1694    for (turn_id, events) in turn_events {
1695        let expected = events
1696            .iter()
1697            .filter(|event| is_memory_plane_injection_kernel_event(event))
1698            .count();
1699        if expected == 0 {
1700            continue;
1701        }
1702        let replayed = count_memory_plane_replay_inject_effects(events);
1703        if replayed < expected {
1704            issues.push(format!(
1705                "turn {turn_id} expected {expected} memory-plane replay effects, found {replayed}"
1706            ));
1707        }
1708    }
1709    if issues.is_empty() {
1710        None
1711    } else {
1712        Some(issues.join("; "))
1713    }
1714}
1715
1716/// Verify memory-plane injections in a step slice replay artifact/steer anchors.
1717#[must_use]
1718pub fn verify_step_memory_plane_replay_anchor(
1719    turn_events: &[KernelEvent],
1720    step_idx: u32,
1721) -> Option<String> {
1722    let step_events = events_for_step(turn_events, step_idx);
1723    let expected = step_events
1724        .iter()
1725        .filter(|event| is_memory_plane_injection_kernel_event(event))
1726        .count();
1727    if expected == 0 {
1728        return None;
1729    }
1730    let replay_effects = replay_step_effects(turn_events, step_idx)
1731        .iter()
1732        .filter(|effect| is_memory_plane_replay_effect(effect))
1733        .count();
1734    if replay_effects >= expected {
1735        None
1736    } else {
1737        Some(format!(
1738            "step {step_idx} expected >= {expected} memory-plane replay effects, found {replay_effects}"
1739        ))
1740    }
1741}
1742
1743/// Kernel events that should replay a `RunCompaction` effect.
1744#[must_use]
1745pub fn is_compaction_run_kernel_event(event: &KernelEvent) -> bool {
1746    match event {
1747        KernelEvent::CompactionArtifactCreated { .. } => true,
1748        KernelEvent::CapacityCheckpoint { action, .. } => {
1749            matches!(action, CapacityAction::Trim | CapacityAction::Handoff)
1750        }
1751        _ => false,
1752    }
1753}
1754
1755/// `RunCompaction` replay effects for compaction artifact / capacity-trim events.
1756#[must_use]
1757pub fn compaction_run_effects_from_events(events: &[KernelEvent]) -> Vec<Effect> {
1758    events
1759        .iter()
1760        .filter(|event| is_compaction_run_kernel_event(event))
1761        .map(|_| Effect::RunCompaction)
1762        .collect()
1763}
1764
1765/// Count `RunCompaction` effects emitted by [`ReplayTurnMachine`] for compaction events.
1766fn count_compaction_replay_run_effects(events: &[KernelEvent]) -> usize {
1767    let mut machine = ReplayTurnMachine;
1768    let mut projection = TurnKernelProjection::default();
1769    let mut count = 0;
1770    for event in events {
1771        let is_compaction = is_compaction_run_kernel_event(event);
1772        let out = machine.step(&projection, event.clone());
1773        projection.apply(event);
1774        if is_compaction {
1775            count += out
1776                .effects
1777                .iter()
1778                .filter(|effect| matches!(effect, Effect::RunCompaction))
1779                .count();
1780        }
1781    }
1782    count
1783}
1784
1785/// Verify compaction artifact / trim-handoff events replay `RunCompaction` in the effect chain.
1786#[must_use]
1787pub fn verify_thread_compaction_replay_anchors(
1788    turn_events: &[(String, Vec<KernelEvent>)],
1789) -> Option<String> {
1790    let mut issues = Vec::new();
1791    for (turn_id, events) in turn_events {
1792        let expected = events
1793            .iter()
1794            .filter(|event| is_compaction_run_kernel_event(event))
1795            .count();
1796        if expected == 0 {
1797            continue;
1798        }
1799        let replayed = count_compaction_replay_run_effects(events);
1800        if replayed < expected {
1801            issues.push(format!(
1802                "turn {turn_id} expected {expected} RunCompaction replay effects, found {replayed}"
1803            ));
1804        }
1805    }
1806    if issues.is_empty() {
1807        None
1808    } else {
1809        Some(issues.join("; "))
1810    }
1811}
1812
1813/// Verify compaction events in a step slice replay `RunCompaction` in the step effect chain.
1814#[must_use]
1815pub fn verify_step_compaction_replay_anchor(
1816    turn_events: &[KernelEvent],
1817    step_idx: u32,
1818) -> Option<String> {
1819    let step_events = events_for_step(turn_events, step_idx);
1820    let expected = step_events
1821        .iter()
1822        .filter(|event| is_compaction_run_kernel_event(event))
1823        .count();
1824    if expected == 0 {
1825        return None;
1826    }
1827    let run_effects = replay_step_effects(turn_events, step_idx)
1828        .iter()
1829        .filter(|effect| matches!(effect, Effect::RunCompaction))
1830        .count();
1831    if run_effects >= expected {
1832        None
1833    } else {
1834        Some(format!(
1835            "step {step_idx} expected >= {expected} RunCompaction replay effects, found {run_effects}"
1836        ))
1837    }
1838}
1839
1840/// Verify approval-required steps across a thread replay `RequestApproval` in their effect chain.
1841#[must_use]
1842pub fn verify_thread_request_approval_anchors(
1843    turn_events: &[(String, Vec<KernelEvent>)],
1844) -> Option<String> {
1845    let mut issues = Vec::new();
1846    for (_, events) in turn_events {
1847        let mut step_indices = std::collections::BTreeSet::new();
1848        for event in events {
1849            if let KernelEvent::ModelRequestIssued { step_idx, .. } = event {
1850                step_indices.insert(*step_idx);
1851            }
1852        }
1853        for step_idx in step_indices {
1854            if let Some(summary) = verify_step_request_approval_anchor(events, step_idx) {
1855                issues.push(summary);
1856            }
1857        }
1858    }
1859    if issues.is_empty() {
1860        None
1861    } else {
1862        Some(issues.join("; "))
1863    }
1864}
1865
1866/// Verify edit-tool steps across a thread replay `NotifyLsp` in their step effect chain.
1867#[must_use]
1868pub fn verify_thread_notify_lsp_anchors(
1869    turn_events: &[(String, Vec<KernelEvent>)],
1870) -> Option<String> {
1871    let mut issues = Vec::new();
1872    for (_, events) in turn_events {
1873        let mut step_indices = std::collections::BTreeSet::new();
1874        for event in events {
1875            if let KernelEvent::ModelRequestIssued { step_idx, .. } = event {
1876                step_indices.insert(*step_idx);
1877            }
1878        }
1879        for step_idx in step_indices {
1880            if let Some(summary) = verify_step_notify_lsp_anchor(events, step_idx) {
1881                issues.push(summary);
1882            }
1883        }
1884    }
1885    if issues.is_empty() {
1886        None
1887    } else {
1888        Some(issues.join("; "))
1889    }
1890}
1891
1892/// Count message-plane events across all turns on a thread.
1893#[must_use]
1894pub fn replay_thread_message_stats(
1895    turn_events: &[(String, Vec<KernelEvent>)],
1896) -> ThreadMessageReplayStats {
1897    let mut stats = ThreadMessageReplayStats::default();
1898    for (_, events) in turn_events {
1899        if events.is_empty() {
1900            continue;
1901        }
1902        stats.turns_with_events += 1;
1903        for event in events {
1904            match event {
1905                KernelEvent::ModelRequestIssued { .. } => stats.model_request_count += 1,
1906                KernelEvent::ModelMessage { .. } => stats.model_message_count += 1,
1907                KernelEvent::ToolCallPlanned { .. } => stats.tool_call_planned_count += 1,
1908                KernelEvent::SteerInjected { .. } => stats.steer_injection_count += 1,
1909                KernelEvent::CompactionArtifactCreated { .. } => {
1910                    stats.compaction_artifact_count += 1
1911                }
1912                KernelEvent::ScratchpadSummaryInjected { .. } => {
1913                    stats.scratchpad_summary_count += 1
1914                }
1915                KernelEvent::ScratchpadReminderInjected { .. } => {
1916                    stats.scratchpad_reminder_count += 1
1917                }
1918                KernelEvent::CycleBriefingInjected { .. } => stats.cycle_briefing_count += 1,
1919                KernelEvent::StepLimitContinuation { .. } => {
1920                    stats.step_limit_continuation_count += 1
1921                }
1922                KernelEvent::LoopGuardContinuation { .. } => {
1923                    stats.loop_guard_continuation_count += 1
1924                }
1925                KernelEvent::LayeredContextSeamInjected { .. } => {
1926                    stats.layered_context_seam_count += 1
1927                }
1928                _ => {}
1929            }
1930        }
1931    }
1932    stats
1933}
1934
1935/// Rebuild model-message anchors across a thread from persisted kernel events.
1936#[must_use]
1937pub fn replay_thread_message_timeline(
1938    turn_events: &[(String, Vec<KernelEvent>)],
1939) -> Vec<ThreadMessageTimelineEntry> {
1940    let mut timeline = Vec::new();
1941    for (_, events) in turn_events {
1942        for event in events {
1943            if let KernelEvent::ModelMessage {
1944                turn_id,
1945                step_idx,
1946                block_count,
1947                ..
1948            } = event
1949            {
1950                timeline.push(ThreadMessageTimelineEntry {
1951                    turn_id: turn_id.clone(),
1952                    step_idx: *step_idx,
1953                    block_count: *block_count,
1954                });
1955            }
1956        }
1957    }
1958    timeline
1959}
1960
1961/// Best-effort coverage check: session JSON message count vs kernel log counters (observability).
1962#[must_use]
1963pub fn build_session_message_coverage(
1964    session_message_count: usize,
1965    stats: &ThreadMessageReplayStats,
1966) -> Option<SessionMessageCoverage> {
1967    if stats.model_message_count == 0 {
1968        return None;
1969    }
1970    let kernel_model_message_count = stats.model_message_count;
1971    let expected_min = kernel_model_message_count as usize;
1972    let coverage_ok = session_message_count >= expected_min;
1973    let summary = if coverage_ok {
1974        None
1975    } else {
1976        Some(format!(
1977            "session messages ({session_message_count}) below kernel model_message events ({expected_min})"
1978        ))
1979    };
1980    Some(SessionMessageCoverage {
1981        session_message_count,
1982        kernel_model_message_count,
1983        coverage_ok,
1984        summary,
1985    })
1986}
1987
1988/// Best-effort coverage check: session JSON message count vs kernel log counters (observability).
1989#[must_use]
1990pub fn verify_session_message_coverage(
1991    session_message_count: usize,
1992    stats: &ThreadMessageReplayStats,
1993) -> Option<String> {
1994    build_session_message_coverage(session_message_count, stats).and_then(|c| c.summary)
1995}
1996
1997/// Verify internal consistency between timeline anchors and aggregated message stats.
1998#[must_use]
1999pub fn verify_message_timeline_coherence(
2000    stats: &ThreadMessageReplayStats,
2001    timeline: &[ThreadMessageTimelineEntry],
2002) -> Option<String> {
2003    let timeline_len = timeline.len();
2004    if timeline_len as u32 != stats.model_message_count {
2005        return Some(format!(
2006            "timeline entries ({timeline_len}) != model_message_count ({})",
2007            stats.model_message_count
2008        ));
2009    }
2010    for entry in timeline {
2011        if entry.block_count == 0 {
2012            return Some(format!(
2013                "turn {} step {} has block_count 0",
2014                entry.turn_id, entry.step_idx
2015            ));
2016        }
2017    }
2018    None
2019}
2020
2021/// Verify session message depth can host every timeline anchor (no body rebuild).
2022#[must_use]
2023pub fn verify_message_timeline_vs_session(
2024    session_message_count: usize,
2025    timeline: &[ThreadMessageTimelineEntry],
2026) -> Option<String> {
2027    if timeline.is_empty() {
2028        return None;
2029    }
2030    let min_messages = timeline.len();
2031    if session_message_count < min_messages {
2032        Some(format!(
2033            "session messages ({session_message_count}) below timeline anchors ({min_messages})"
2034        ))
2035    } else {
2036        None
2037    }
2038}
2039
/// Structured session + timeline + kernel log message-plane checks (observability).
///
/// Field notes below describe how `build_session_message_timeline_coverage`
/// populates this report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionMessageTimelineCoverage {
    /// Session message count supplied by the caller.
    pub session_message_count: usize,
    /// `model_message_count` from the thread's message stats.
    pub kernel_model_message_count: u32,
    /// Number of entries in the replayed message timeline.
    pub timeline_anchor_count: usize,
    /// `model_request_count` from the thread's message stats.
    pub model_request_count: u32,
    /// `estimated_min_session_messages` copied from the message-plane index.
    pub estimated_min_session_messages: u32,
    /// True when `verify_message_timeline_coherence` reports no issue.
    pub coherence_ok: bool,
    /// `coverage_ok` of the session message coverage report; true when no report is produced.
    pub coverage_ok: bool,
    /// True when `verify_message_timeline_vs_session` reports no issue.
    pub timeline_vs_session_ok: bool,
    /// True when `verify_timeline_vs_request_count` reports no issue.
    pub timeline_vs_requests_ok: bool,
    /// True when `verify_session_message_plane_depth` reports no issue.
    pub plane_depth_ok: bool,
    /// True when `verify_session_role_index` reports no issue, or no role index was supplied.
    pub role_index_ok: bool,
    /// True when `verify_session_memory_plane_user_depth` reports no issue, or no role index was supplied.
    pub memory_plane_user_ok: bool,
    /// `assistant_message_count` from the role index, when one was supplied.
    pub session_assistant_count: Option<u32>,
    /// `tool_result_message_count` from the role index, when one was supplied.
    pub session_tool_result_count: Option<u32>,
    /// `text_user_message_count` from the role index, when one was supplied.
    pub session_text_user_count: Option<u32>,
    /// `min_assistant_messages` from the kernel message role estimate.
    pub kernel_min_assistant_messages: u32,
    /// `min_tool_result_messages` from the kernel message role estimate.
    pub kernel_min_tool_result_messages: u32,
    /// `min_memory_injected_user_messages` from the kernel memory-plane user estimate.
    pub kernel_min_memory_injected_user_messages: u32,
    /// True when `verify_session_compaction_depth` reports no issue.
    pub compaction_depth_ok: bool,
    /// `messages_removed_estimate` copied from the compaction index.
    pub compaction_messages_removed_estimate: u32,
    /// Session message count plus the compaction index's `messages_removed_estimate`.
    pub compaction_restored_session_estimate: u32,
    /// `peak_session_depth_hint` copied from the compaction index.
    pub compaction_peak_session_depth_hint: u32,
    /// True when `verify_compaction_artifacts_vs_kernel_timeline` reports no issue,
    /// or no session compaction entries were supplied.
    pub compaction_artifact_ok: bool,
    /// Number of session compaction entries, when they were supplied.
    pub session_compaction_artifact_count: Option<u32>,
    /// Copied from the thread projection's `continuation_anchor_ok`.
    pub continuation_anchor_ok: bool,
    /// Copied from the thread projection's `request_approval_anchor_ok`.
    pub request_approval_anchor_ok: bool,
    /// Copied from the thread projection's `notify_lsp_anchor_ok`.
    pub notify_lsp_anchor_ok: bool,
    /// Copied from the thread projection's `memory_plane_replay_anchor_ok`.
    pub memory_plane_replay_anchor_ok: bool,
    /// Copied from the thread projection's `compaction_replay_anchor_ok`.
    pub compaction_replay_anchor_ok: bool,
    /// Conjunction of every other `*_ok` flag in this report.
    pub overall_ok: bool,
    /// `preview_row_count` from the transcript preview index.
    pub kernel_transcript_preview_row_count: u32,
    /// True when `verify_session_transcript_preview_count` reports no issue.
    pub transcript_preview_ok: bool,
    /// True when `verify_session_transcript_preview_bodies` reports no issue,
    /// or session messages / turn events were not both supplied.
    pub transcript_preview_body_ok: bool,
    /// Diagnostics from the failing checks joined with `"; "`; `None` when no check produced one.
    pub summary: Option<String>,
}
2078
2079/// Verify timeline anchors do not exceed model request count on the thread.
2080#[must_use]
2081pub fn verify_timeline_vs_request_count(
2082    stats: &ThreadMessageReplayStats,
2083    timeline: &[ThreadMessageTimelineEntry],
2084) -> Option<String> {
2085    let anchors = timeline.len() as u32;
2086    if anchors > stats.model_request_count {
2087        Some(format!(
2088            "timeline anchors ({anchors}) exceed model_request_count ({})",
2089            stats.model_request_count
2090        ))
2091    } else {
2092        None
2093    }
2094}
2095
2096/// Build unified session / timeline / kernel log message-plane coverage report.
2097#[must_use]
2098pub fn build_session_message_timeline_coverage(
2099    session_message_count: usize,
2100    projection: &ThreadReplayProjection,
2101    role_index: Option<&SessionMessageRoleIndex>,
2102    session_compaction: Option<&[SessionCompactionArtifactEntry]>,
2103    session_messages: Option<&[Message]>,
2104    turn_events: Option<&[(String, Vec<KernelEvent>)]>,
2105) -> Option<SessionMessageTimelineCoverage> {
2106    let stats = &projection.message_stats;
2107    let timeline = &projection.message_timeline;
2108    let has_message_plane = stats.model_message_count > 0 || !timeline.is_empty();
2109    let has_memory_plane = stats.steer_injection_count > 0
2110        || stats.scratchpad_summary_count > 0
2111        || stats.scratchpad_reminder_count > 0
2112        || stats.step_limit_continuation_count > 0
2113        || stats.loop_guard_continuation_count > 0;
2114    let has_compaction = projection.compaction_index.artifact_count > 0;
2115    if !has_message_plane && !has_memory_plane && !has_compaction {
2116        return None;
2117    }
2118    let plane_index = &projection.message_plane_index;
2119    let compaction_index = &projection.compaction_index;
2120    let role_estimate = replay_kernel_message_role_estimate(stats);
2121    let memory_estimate = replay_kernel_memory_plane_user_estimate(stats);
2122    let coherence_ok = verify_message_timeline_coherence(stats, timeline).is_none();
2123    let coverage = build_session_message_coverage(session_message_count, stats);
2124    let coverage_ok = coverage.as_ref().map(|c| c.coverage_ok).unwrap_or(true);
2125    let timeline_vs_session_ok =
2126        verify_message_timeline_vs_session(session_message_count, timeline).is_none();
2127    let timeline_vs_requests_ok = verify_timeline_vs_request_count(stats, timeline).is_none();
2128    let plane_depth_ok =
2129        verify_session_message_plane_depth(session_message_count, plane_index).is_none();
2130    let role_index_ok = role_index
2131        .map(|idx| verify_session_role_index(idx, &role_estimate).is_none())
2132        .unwrap_or(true);
2133    let memory_plane_user_ok = role_index
2134        .map(|idx| verify_session_memory_plane_user_depth(idx, &memory_estimate).is_none())
2135        .unwrap_or(true);
2136    let compaction_depth_ok =
2137        verify_session_compaction_depth(session_message_count, compaction_index, plane_index)
2138            .is_none();
2139    let compaction_restored_session_estimate =
2140        session_message_count as u32 + compaction_index.messages_removed_estimate;
2141    let compaction_artifact_ok = session_compaction
2142        .map(|session| {
2143            verify_compaction_artifacts_vs_kernel_timeline(&projection.compaction_timeline, session)
2144                .is_none()
2145        })
2146        .unwrap_or(true);
2147    let continuation_anchor_ok = projection.continuation_anchor_ok;
2148    let request_approval_anchor_ok = projection.request_approval_anchor_ok;
2149    let notify_lsp_anchor_ok = projection.notify_lsp_anchor_ok;
2150    let memory_plane_replay_anchor_ok = projection.memory_plane_replay_anchor_ok;
2151    let compaction_replay_anchor_ok = projection.compaction_replay_anchor_ok;
2152    let transcript_preview_index = &projection.transcript_preview_index;
2153    let transcript_preview_ok = crate::engine::turn_loop::message_body_rebuild_policy::verify_session_transcript_preview_count(
2154        session_message_count,
2155        transcript_preview_index,
2156    )
2157    .is_none();
2158    let transcript_preview_body_ok = match (session_messages, turn_events) {
2159        (Some(messages), Some(events)) => {
2160            crate::engine::turn_loop::message_body_rebuild_policy::verify_session_transcript_preview_bodies(
2161                messages, events,
2162            )
2163            .is_none()
2164        }
2165        _ => true,
2166    };
2167    let overall_ok = coherence_ok
2168        && coverage_ok
2169        && timeline_vs_session_ok
2170        && timeline_vs_requests_ok
2171        && plane_depth_ok
2172        && role_index_ok
2173        && memory_plane_user_ok
2174        && compaction_depth_ok
2175        && compaction_artifact_ok
2176        && continuation_anchor_ok
2177        && request_approval_anchor_ok
2178        && notify_lsp_anchor_ok
2179        && memory_plane_replay_anchor_ok
2180        && compaction_replay_anchor_ok
2181        && transcript_preview_ok
2182        && transcript_preview_body_ok;
2183
2184    let mut summaries = Vec::new();
2185    if let Some(s) = verify_message_timeline_coherence(stats, timeline) {
2186        summaries.push(s);
2187    }
2188    if let Some(s) = coverage.and_then(|c| c.summary) {
2189        summaries.push(s);
2190    }
2191    if let Some(s) = verify_message_timeline_vs_session(session_message_count, timeline) {
2192        summaries.push(s);
2193    }
2194    if let Some(s) = verify_timeline_vs_request_count(stats, timeline) {
2195        summaries.push(s);
2196    }
2197    if let Some(s) = verify_session_message_plane_depth(session_message_count, plane_index) {
2198        summaries.push(s);
2199    }
2200    if let Some(s) =
2201        verify_session_compaction_depth(session_message_count, compaction_index, plane_index)
2202    {
2203        summaries.push(s);
2204    }
2205    if let Some(idx) = role_index {
2206        if let Some(s) = verify_session_role_index(idx, &role_estimate) {
2207            summaries.push(s);
2208        }
2209        if let Some(s) = verify_session_memory_plane_user_depth(idx, &memory_estimate) {
2210            summaries.push(s);
2211        }
2212    }
2213
2214    if let Some(session) = session_compaction
2215        && let Some(s) =
2216            verify_compaction_artifacts_vs_kernel_timeline(&projection.compaction_timeline, session)
2217    {
2218        summaries.push(s);
2219    }
2220    if !continuation_anchor_ok && let Some(s) = projection.continuation_anchor_summary.clone() {
2221        summaries.push(s);
2222    }
2223    if !request_approval_anchor_ok
2224        && let Some(s) = projection.request_approval_anchor_summary.clone()
2225    {
2226        summaries.push(s);
2227    }
2228    if !notify_lsp_anchor_ok && let Some(s) = projection.notify_lsp_anchor_summary.clone() {
2229        summaries.push(s);
2230    }
2231    if !memory_plane_replay_anchor_ok
2232        && let Some(s) = projection.memory_plane_replay_anchor_summary.clone()
2233    {
2234        summaries.push(s);
2235    }
2236    if !compaction_replay_anchor_ok
2237        && let Some(s) = projection.compaction_replay_anchor_summary.clone()
2238    {
2239        summaries.push(s);
2240    }
2241    if let Some(s) = crate::engine::turn_loop::message_body_rebuild_policy::verify_session_transcript_preview_count(
2242        session_message_count,
2243        transcript_preview_index,
2244    ) {
2245        summaries.push(s);
2246    }
2247    if let (Some(messages), Some(events)) = (session_messages, turn_events)
2248        && let Some(s) =
2249            crate::engine::turn_loop::message_body_rebuild_policy::verify_session_transcript_preview_bodies(
2250                messages, events,
2251            )
2252        {
2253            summaries.push(s);
2254        }
2255
2256    Some(SessionMessageTimelineCoverage {
2257        session_message_count,
2258        kernel_model_message_count: stats.model_message_count,
2259        timeline_anchor_count: timeline.len(),
2260        model_request_count: stats.model_request_count,
2261        estimated_min_session_messages: plane_index.estimated_min_session_messages,
2262        coherence_ok,
2263        coverage_ok,
2264        timeline_vs_session_ok,
2265        timeline_vs_requests_ok,
2266        plane_depth_ok,
2267        role_index_ok,
2268        memory_plane_user_ok,
2269        session_assistant_count: role_index.map(|idx| idx.assistant_message_count),
2270        session_tool_result_count: role_index.map(|idx| idx.tool_result_message_count),
2271        session_text_user_count: role_index.map(|idx| idx.text_user_message_count),
2272        kernel_min_assistant_messages: role_estimate.min_assistant_messages,
2273        kernel_min_tool_result_messages: role_estimate.min_tool_result_messages,
2274        kernel_min_memory_injected_user_messages: memory_estimate.min_memory_injected_user_messages,
2275        compaction_depth_ok,
2276        compaction_messages_removed_estimate: compaction_index.messages_removed_estimate,
2277        compaction_restored_session_estimate,
2278        compaction_peak_session_depth_hint: compaction_index.peak_session_depth_hint,
2279        compaction_artifact_ok,
2280        session_compaction_artifact_count: session_compaction.map(|s| s.len() as u32),
2281        continuation_anchor_ok,
2282        request_approval_anchor_ok,
2283        notify_lsp_anchor_ok,
2284        memory_plane_replay_anchor_ok,
2285        compaction_replay_anchor_ok,
2286        overall_ok,
2287        kernel_transcript_preview_row_count: transcript_preview_index.preview_row_count,
2288        transcript_preview_ok,
2289        transcript_preview_body_ok,
2290        summary: if summaries.is_empty() {
2291            None
2292        } else {
2293            Some(summaries.join("; "))
2294        },
2295    })
2296}
2297
2298/// Build thread replay report and the latest non-empty turn projection (resume substrate).
2299#[must_use]
2300pub fn replay_thread_projection(
2301    thread_id: &str,
2302    turn_events: &[(String, Vec<KernelEvent>)],
2303) -> ThreadReplayProjection {
2304    let report = build_thread_replay_report(thread_id, turn_events);
2305    let message_stats = replay_thread_message_stats(turn_events);
2306    let message_timeline = replay_thread_message_timeline(turn_events);
2307    let message_plane_index = replay_thread_message_plane_index(&message_stats);
2308    let transcript_preview_index =
2309        crate::engine::turn_loop::message_body_rebuild_policy::replay_thread_transcript_preview_index(
2310            turn_events,
2311        );
2312    let compaction_timeline = replay_thread_compaction_timeline(turn_events);
2313    let compaction_index = replay_thread_compaction_index(&compaction_timeline);
2314    let continuation_anchor_summary = verify_thread_continuation_anchors(turn_events);
2315    let continuation_anchor_ok = continuation_anchor_summary.is_none();
2316    let request_approval_anchor_summary = verify_thread_request_approval_anchors(turn_events);
2317    let request_approval_anchor_ok = request_approval_anchor_summary.is_none();
2318    let notify_lsp_anchor_summary = verify_thread_notify_lsp_anchors(turn_events);
2319    let notify_lsp_anchor_ok = notify_lsp_anchor_summary.is_none();
2320    let memory_plane_replay_anchor_summary = verify_thread_memory_plane_replay_anchors(turn_events);
2321    let memory_plane_replay_anchor_ok = memory_plane_replay_anchor_summary.is_none();
2322    let compaction_replay_anchor_summary = verify_thread_compaction_replay_anchors(turn_events);
2323    let compaction_replay_anchor_ok = compaction_replay_anchor_summary.is_none();
2324    let effect_counts = replay_thread_effect_counts(turn_events);
2325    let (latest_turn_id, latest_projection) = turn_events
2326        .iter()
2327        .rev()
2328        .find(|(_, events)| !events.is_empty())
2329        .map(|(turn_id, events)| {
2330            (
2331                Some(turn_id.clone()),
2332                TurnKernelProjection::from_events(events),
2333            )
2334        })
2335        .unwrap_or((None, TurnKernelProjection::default()));
2336    ThreadReplayProjection {
2337        report,
2338        latest_turn_id,
2339        latest_projection,
2340        message_stats,
2341        message_timeline,
2342        message_plane_index,
2343        transcript_preview_index,
2344        compaction_timeline,
2345        compaction_index,
2346        continuation_anchor_ok,
2347        continuation_anchor_summary,
2348        request_approval_anchor_ok,
2349        request_approval_anchor_summary,
2350        notify_lsp_anchor_ok,
2351        notify_lsp_anchor_summary,
2352        memory_plane_replay_anchor_ok,
2353        memory_plane_replay_anchor_summary,
2354        compaction_replay_anchor_ok,
2355        compaction_replay_anchor_summary,
2356        effect_counts,
2357    }
2358}
2359
/// Host-visible fields restored from a thread's latest kernel projection on engine load.
///
/// `kernel_resume_hints_from_projection` fills the per-turn fields from a
/// `TurnKernelProjection` and leaves the thread-level fields at their zero /
/// empty / `None` values.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct KernelResumeHints {
    /// Turn id of the latest projection; `None` when the projection's `turn_id` is empty.
    pub latest_turn_id: Option<String>,
    /// `step_idx` copied from the latest turn projection.
    pub step_idx: u32,
    /// `max_steps` copied from the latest turn projection.
    pub max_steps: u32,
    /// `scratchpad_summary_injected` copied from the latest turn projection.
    pub scratchpad_summary_injected: bool,
    /// Number of entries in the latest projection's `active_tool_names`.
    pub active_tool_count: u32,
    /// Aggregated `ModelMessage` count on the linked thread (log substrate; text stays in session store).
    pub kernel_model_message_count: u32,
    /// Aggregated `ModelRequestIssued` count on the linked thread.
    pub kernel_model_request_count: u32,
    /// Lower-bound session rows estimated from kernel log (assistant + tool results).
    pub kernel_estimated_min_session_messages: u32,
    /// Turn ids with persisted kernel events on the linked thread (resume replay substrate).
    pub thread_turn_ids_with_events: Vec<String>,
    /// Expected anchor-class replay effect count for the linked thread (`replay_thread_effect_counts`).
    pub expected_anchor_effect_count: u32,
    /// Rebuilt preview transcript row count (5c; when preview bodies exist).
    pub kernel_transcript_preview_row_count: u32,
    /// Events carrying non-empty message-body previews on the linked thread.
    pub kernel_transcript_preview_body_count: u32,
    /// Runtime thread id for session-store lookup on repair persist (host-supplied).
    pub runtime_thread_id: Option<String>,
    /// Outer-loop continuation counters from latest turn projection (5c cont.).
    pub step_limit_continuations: u32,
    /// `loop_guard_continuations` copied from the latest turn projection.
    pub loop_guard_continuations: u32,
    /// `cycle_handoff_attempts` copied from the latest turn projection.
    pub cycle_handoff_attempts: u32,
    /// `in_turn_cycle_advances` copied from the latest turn projection.
    pub in_turn_cycle_advances: u32,
}
2390
2391/// Extract resume hints from the latest turn projection (log-driven resume substrate).
2392#[must_use]
2393pub fn kernel_resume_hints_from_projection(proj: &TurnKernelProjection) -> KernelResumeHints {
2394    KernelResumeHints {
2395        latest_turn_id: if proj.turn_id.is_empty() {
2396            None
2397        } else {
2398            Some(proj.turn_id.clone())
2399        },
2400        step_idx: proj.step_idx,
2401        max_steps: proj.max_steps,
2402        scratchpad_summary_injected: proj.scratchpad_summary_injected,
2403        active_tool_count: proj.active_tool_names.len() as u32,
2404        kernel_model_message_count: 0,
2405        kernel_model_request_count: 0,
2406        kernel_estimated_min_session_messages: 0,
2407        thread_turn_ids_with_events: Vec::new(),
2408        expected_anchor_effect_count: 0,
2409        kernel_transcript_preview_row_count: 0,
2410        kernel_transcript_preview_body_count: 0,
2411        runtime_thread_id: None,
2412        step_limit_continuations: proj.step_limit_continuations,
2413        loop_guard_continuations: proj.loop_guard_continuations,
2414        cycle_handoff_attempts: proj.cycle_handoff_attempts,
2415        in_turn_cycle_advances: proj.in_turn_cycle_advances,
2416    }
2417}
2418
2419/// Resume hints from thread projection plus aggregated message counters.
2420#[must_use]
2421pub fn kernel_resume_hints_from_thread_projection(
2422    projection: &ThreadReplayProjection,
2423) -> KernelResumeHints {
2424    let mut hints = kernel_resume_hints_from_projection(&projection.latest_projection);
2425    hints.kernel_model_message_count = projection.message_stats.model_message_count;
2426    hints.kernel_model_request_count = projection.message_stats.model_request_count;
2427    hints.kernel_estimated_min_session_messages = projection
2428        .message_plane_index
2429        .estimated_min_session_messages;
2430    hints.thread_turn_ids_with_events = projection
2431        .report
2432        .turns
2433        .iter()
2434        .filter(|turn| turn.event_count > 0)
2435        .map(|turn| turn.turn_id.clone())
2436        .collect();
2437    hints.expected_anchor_effect_count = projection.effect_counts.anchor_effect_total();
2438    hints.kernel_transcript_preview_row_count =
2439        projection.transcript_preview_index.preview_row_count;
2440    hints.kernel_transcript_preview_body_count =
2441        projection.transcript_preview_index.preview_body_event_count;
2442    hints
2443}
2444
2445/// Replay IO effects for a slice with an initial projection (turn-log prefix aware).
2446#[must_use]
2447pub fn replay_events_with_projection(
2448    mut projection: TurnKernelProjection,
2449    events: &[KernelEvent],
2450) -> Vec<Effect> {
2451    let mut machine = ReplayTurnMachine;
2452    let mut planned: std::collections::HashMap<String, (String, String)> =
2453        std::collections::HashMap::new();
2454    let mut effects = Vec::new();
2455    for event in events {
2456        if let KernelEvent::ToolCallPlanned {
2457            call_id,
2458            tool_name,
2459            input_json,
2460            ..
2461        } = event
2462        {
2463            planned.insert(call_id.clone(), (tool_name.clone(), input_json.clone()));
2464        }
2465        if let KernelEvent::ToolCallFinished {
2466            call_id, outcome, ..
2467        } = event
2468        {
2469            crate::engine::turn_loop::memory_plane_working_policy::record_working_set_path_touch(
2470                &mut projection,
2471                &planned,
2472                call_id,
2473                outcome,
2474            );
2475            planned.remove(call_id);
2476        }
2477        let out = machine.step(&projection, event.clone());
2478        projection.apply(event);
2479        effects.extend(out.effects);
2480    }
2481    effects
2482}
2483
2484/// Replay all IO effects implied by a turn's event log via [`ReplayTurnMachine`].
2485#[must_use]
2486pub fn replay_turn_effects(events: &[KernelEvent]) -> Vec<Effect> {
2487    replay_events_with_projection(TurnKernelProjection::default(), events)
2488}
2489
2490/// Aggregated replay effect counts for v3 observability at turn end.
2491#[derive(
2492    Debug,
2493    Clone,
2494    Copy,
2495    Default,
2496    PartialEq,
2497    Eq,
2498    serde::Serialize,
2499    serde::Deserialize,
2500    schemars::JsonSchema,
2501)]
2502pub struct ReplayEffectCounts {
2503    pub call_model: u32,
2504    pub execute_batch: u32,
2505    pub request_approval: u32,
2506    pub inject_steer: u32,
2507    pub run_compaction: u32,
2508    pub notify_lsp: u32,
2509    pub sleep: u32,
2510    pub query_memory: u32,
2511    pub run_layered_context_checkpoint: u32,
2512    pub refresh_system_prompt: u32,
2513    pub emit_artifact: u32,
2514}
2515
2516impl ReplayEffectCounts {
2517    /// Anchor-class effects replayed on resume (`InjectSteer` + `RunCompaction` + `NotifyLsp`).
2518    #[must_use]
2519    pub fn anchor_effect_total(self) -> u32 {
2520        self.inject_steer + self.run_compaction + self.notify_lsp
2521    }
2522}
2523
/// Check that the number of anchor effects interpreted on resume equals the count expected
/// from thread replay.
///
/// Returns `None` when `interpreted` equals `expected`, otherwise a mismatch description
/// containing both numbers.
#[must_use]
pub fn verify_resume_anchor_effect_alignment(expected: u32, interpreted: u64) -> Option<String> {
    let mismatched = interpreted != u64::from(expected);
    mismatched.then(|| {
        format!("resume anchor effect mismatch: interpreted={interpreted} expected={expected}")
    })
}
2534
2535/// Count replay-chain effects by kind (shadow / v3 turn-end logging).
2536#[must_use]
2537pub fn replay_effect_counts(events: &[KernelEvent]) -> ReplayEffectCounts {
2538    let effects = replay_turn_effects(events);
2539    let mut counts = ReplayEffectCounts::default();
2540    for effect in effects {
2541        match effect {
2542            Effect::CallModel { .. } => counts.call_model += 1,
2543            Effect::ExecuteBatch { .. } => counts.execute_batch += 1,
2544            Effect::RequestApproval { .. } => counts.request_approval += 1,
2545            Effect::InjectSteer { .. } => counts.inject_steer += 1,
2546            Effect::RunCompaction => counts.run_compaction += 1,
2547            Effect::NotifyLsp { .. } => counts.notify_lsp += 1,
2548            Effect::Sleep { .. } => counts.sleep += 1,
2549            Effect::QueryMemory { .. } => counts.query_memory += 1,
2550            Effect::RunLayeredContextCheckpoint => counts.run_layered_context_checkpoint += 1,
2551            Effect::RefreshSystemPrompt => counts.refresh_system_prompt += 1,
2552            Effect::EmitArtifact { .. } => counts.emit_artifact += 1,
2553        }
2554    }
2555    counts
2556}
2557
2558/// Sum replay effect counts across all turns on a thread.
2559#[must_use]
2560pub fn replay_thread_effect_counts(
2561    turn_events: &[(String, Vec<KernelEvent>)],
2562) -> ReplayEffectCounts {
2563    let mut total = ReplayEffectCounts::default();
2564    for (_, events) in turn_events {
2565        let counts = replay_effect_counts(events);
2566        total.call_model += counts.call_model;
2567        total.execute_batch += counts.execute_batch;
2568        total.request_approval += counts.request_approval;
2569        total.inject_steer += counts.inject_steer;
2570        total.run_compaction += counts.run_compaction;
2571        total.notify_lsp += counts.notify_lsp;
2572        total.sleep += counts.sleep;
2573        total.query_memory += counts.query_memory;
2574        total.run_layered_context_checkpoint += counts.run_layered_context_checkpoint;
2575        total.refresh_system_prompt += counts.refresh_system_prompt;
2576        total.emit_artifact += counts.emit_artifact;
2577    }
2578    total
2579}
2580
2581/// Slice a turn log down to one step's events (from `ModelRequestIssued` through tool work).
2582#[must_use]
2583pub fn events_for_step(events: &[KernelEvent], step_idx: u32) -> Vec<KernelEvent> {
2584    let mut in_step = false;
2585    let mut out = Vec::new();
2586    for event in events {
2587        match event {
2588            KernelEvent::ModelRequestIssued { step_idx: s, .. } if *s == step_idx => {
2589                in_step = true;
2590                out.push(event.clone());
2591            }
2592            KernelEvent::ModelRequestIssued { .. } if in_step => break,
2593            KernelEvent::TurnEnded { .. } if in_step => break,
2594            _ if in_step => out.push(event.clone()),
2595            _ => {}
2596        }
2597    }
2598    out
2599}
2600
2601/// Projection rebuilt from all turn events strictly before this step's `ModelRequestIssued`.
2602#[must_use]
2603pub fn projection_before_step_model_request(
2604    turn_events: &[KernelEvent],
2605    step_idx: u32,
2606) -> TurnKernelProjection {
2607    let mut prior = Vec::new();
2608    for event in turn_events {
2609        if matches!(
2610            event,
2611            KernelEvent::ModelRequestIssued { step_idx: s, .. } if *s == step_idx
2612        ) {
2613            break;
2614        }
2615        prior.push(event.clone());
2616    }
2617    TurnKernelProjection::from_events(&prior)
2618}
2619
2620/// Replay step effects with turn-log prefix projection (multi-step safe).
2621#[must_use]
2622pub fn replay_step_effects_from_turn_log(
2623    turn_events: &[KernelEvent],
2624    step_idx: u32,
2625) -> Vec<Effect> {
2626    let step_events = events_for_step(turn_events, step_idx);
2627    if step_events.is_empty() {
2628        return Vec::new();
2629    }
2630    let projection = projection_before_step_model_request(turn_events, step_idx);
2631    replay_events_with_projection(projection, &step_events)
2632}
2633
2634/// Replay IO effects for a single step slice.
2635#[must_use]
2636pub fn replay_step_effects(events: &[KernelEvent], step_idx: u32) -> Vec<Effect> {
2637    replay_step_effects_from_turn_log(events, step_idx)
2638}
2639
2640/// Planned v3 step effects before tool outcomes are known (`ExecuteBatch` when tools planned).
2641///
2642/// After `ExecuteBatch` completes, append [`notify_lsp_effects_from_step_events`] so the
2643/// live v3 chain matches replay (`ToolCallFinished` → `NotifyLsp`).
2644#[must_use]
2645pub fn plan_v3_step_effects(token_budget: u32, tool_call_ids: &[String]) -> Vec<Effect> {
2646    let mut effects = vec![Effect::CallModel { token_budget }];
2647    for call_id in tool_call_ids {
2648        effects.push(Effect::ExecuteBatch {
2649            call_ids: vec![call_id.clone()],
2650        });
2651    }
2652    effects
2653}
2654
2655/// Pre-`CallModel` memory queries for the v3 live path (mirrors `ReplayTurnMachine` on `ModelRequestIssued`).
2656#[must_use]
2657pub fn plan_v3_pre_call_model_effects(
2658    projection: &TurnKernelProjection,
2659    episodic_hints: Option<
2660        crate::engine::turn_loop::memory_plane_episodic_policy::MemoryPlaneEpisodicHints,
2661    >,
2662) -> Vec<Effect> {
2663    crate::engine::turn_loop::memory_plane_query_policy::query_memory_effects_before_model_call(
2664        projection,
2665        episodic_hints,
2666    )
2667}
2668
2669/// Pre-`ExecuteBatch` approval effects derived from `ToolCallPlanned` policy metadata.
2670#[must_use]
2671pub fn request_approval_effects_from_step_events(step_events: &[KernelEvent]) -> Vec<Effect> {
2672    step_events
2673        .iter()
2674        .filter_map(|event| {
2675            let KernelEvent::ToolCallPlanned {
2676                call_id,
2677                tool_name,
2678                decision,
2679                ..
2680            } = event
2681            else {
2682                return None;
2683            };
2684            if decision.approval_required {
2685                Some(Effect::RequestApproval {
2686                    call_id: call_id.clone(),
2687                    description: tool_name.clone(),
2688                })
2689            } else {
2690                None
2691            }
2692        })
2693        .collect()
2694}
2695
2696/// Verify capacity checkpoint replay effects (`Sleep` / `RunCompaction`) per step.
2697#[must_use]
2698pub fn verify_capacity_effect_replay_coherence(turn_events: &[KernelEvent]) -> Option<String> {
2699    let mut step_indices = std::collections::BTreeSet::new();
2700    for event in turn_events {
2701        match event {
2702            KernelEvent::ModelRequestIssued { step_idx, .. }
2703            | KernelEvent::CapacityCheckpoint { step_idx, .. } => {
2704                step_indices.insert(*step_idx);
2705            }
2706            _ => {}
2707        }
2708    }
2709    let mut issues = Vec::new();
2710    for step_idx in step_indices {
2711        if let Some(summary) = verify_step_capacity_sleep_anchor(turn_events, step_idx) {
2712            issues.push(summary);
2713        }
2714        if let Some(summary) = verify_step_compaction_replay_anchor(turn_events, step_idx) {
2715            issues.push(summary);
2716        }
2717    }
2718    if issues.is_empty() {
2719        None
2720    } else {
2721        Some(issues.join("; "))
2722    }
2723}
2724
/// Fixed, deterministic back-off (in milliseconds) used as the symbolic duration for
/// capacity-cooldown replay anchors.
#[must_use]
pub const fn capacity_cooldown_backoff_millis() -> u64 {
    const BACKOFF_MILLIS: u64 = 100;
    BACKOFF_MILLIS
}
2730
2731/// Whether a checkpoint records a cooldown-suppressed guardrail (`Continue` + blocked).
2732#[must_use]
2733pub fn is_capacity_cooldown_blocked_checkpoint(event: &KernelEvent) -> bool {
2734    matches!(
2735        event,
2736        KernelEvent::CapacityCheckpoint {
2737            action: CapacityAction::Continue,
2738            cooldown_blocked: true,
2739            ..
2740        }
2741    )
2742}
2743
2744/// Verify steps replay a `Sleep` effect per cooldown-blocked capacity checkpoint.
2745#[must_use]
2746pub fn verify_step_capacity_sleep_anchor(
2747    turn_events: &[KernelEvent],
2748    step_idx: u32,
2749) -> Option<String> {
2750    let step_events = events_for_step(turn_events, step_idx);
2751    let expected = step_events
2752        .iter()
2753        .filter(|event| is_capacity_cooldown_blocked_checkpoint(event))
2754        .count();
2755    if expected == 0 {
2756        return None;
2757    }
2758    let sleep_effects = replay_step_effects(turn_events, step_idx)
2759        .iter()
2760        .filter(|effect| matches!(effect, Effect::Sleep { .. }))
2761        .count();
2762    if sleep_effects >= expected {
2763        None
2764    } else {
2765        Some(format!(
2766            "step {step_idx} expected {expected} Sleep replay effects, found {sleep_effects}"
2767        ))
2768    }
2769}
2770
2771/// Post-`ExecuteBatch` notify-LSP tail derived from observed `ToolCallFinished` events.
2772#[must_use]
2773pub fn notify_lsp_effects_from_step_events(step_events: &[KernelEvent]) -> Vec<Effect> {
2774    step_events
2775        .iter()
2776        .filter_map(|event| {
2777            let KernelEvent::ToolCallFinished {
2778                tool_name,
2779                wrote_state: true,
2780                ..
2781            } = event
2782            else {
2783                return None;
2784            };
2785            if is_lsp_notify_tool(tool_name) {
2786                Some(Effect::NotifyLsp {
2787                    tool_name: tool_name.clone(),
2788                })
2789            } else {
2790                None
2791            }
2792        })
2793        .collect()
2794}
2795
2796/// Continuation `InjectSteer` effects at a step boundary (matches `ReplayTurnMachine`; empty text).
2797#[must_use]
2798pub fn continuation_inject_steer_effects_for_step(
2799    turn_events: &[KernelEvent],
2800    step_idx: u32,
2801) -> Vec<Effect> {
2802    turn_events
2803        .iter()
2804        .filter_map(|event| match event {
2805            KernelEvent::StepLimitContinuation { step_idx: s, .. }
2806            | KernelEvent::LoopGuardContinuation { step_idx: s, .. }
2807                if *s == step_idx =>
2808            {
2809                Some(Effect::InjectSteer {
2810                    text: String::new(),
2811                })
2812            }
2813            _ => None,
2814        })
2815        .collect()
2816}
2817
2818/// Verify replay effect counts for one step match the executed tool batch size.
2819#[must_use]
2820pub fn verify_step_effect_parity(
2821    turn_events: &[KernelEvent],
2822    step_idx: u32,
2823    executed_tool_count: u32,
2824) -> Option<String> {
2825    let step_events = events_for_step(turn_events, step_idx);
2826    if step_events.is_empty() {
2827        if executed_tool_count > 0 {
2828            return Some(format!(
2829                "step {step_idx} has no ModelRequestIssued anchor but {executed_tool_count} ToolCallPlanned event(s)"
2830            ));
2831        }
2832        return None;
2833    }
2834    let counts = replay_effect_counts(&step_events);
2835    let mut diffs = Vec::new();
2836    if counts.call_model != 1 {
2837        diffs.push(format!(
2838            "step {step_idx} CallModel replay count {} != 1",
2839            counts.call_model
2840        ));
2841    }
2842    if counts.execute_batch != executed_tool_count {
2843        diffs.push(format!(
2844            "step {step_idx} ExecuteBatch replay count {} != executed {executed_tool_count}",
2845            counts.execute_batch
2846        ));
2847    }
2848    if diffs.is_empty() {
2849        None
2850    } else {
2851        Some(diffs.join("; "))
2852    }
2853}
2854
2855impl Effect {
2856    #[must_use]
2857    pub fn kind_str(&self) -> &'static str {
2858        match self {
2859            Effect::CallModel { .. } => "call_model",
2860            Effect::ExecuteBatch { .. } => "execute_batch",
2861            Effect::RequestApproval { .. } => "request_approval",
2862            Effect::InjectSteer { .. } => "inject_steer",
2863            Effect::RunCompaction => "run_compaction",
2864            Effect::NotifyLsp { .. } => "notify_lsp",
2865            Effect::Sleep { .. } => "sleep",
2866            Effect::QueryMemory { .. } => "query_memory",
2867            Effect::RunLayeredContextCheckpoint => "run_layered_context_checkpoint",
2868            Effect::RefreshSystemPrompt => "refresh_system_prompt",
2869            Effect::EmitArtifact { .. } => "emit_artifact",
2870        }
2871    }
2872}
2873
2874// ── Helpers ───────────────────────────────────────────────────────────────────
2875
2876/// Map `TurnOutcomeStatus` (legacy) to the richer `TurnOutcome` (v3 schema).
2877#[must_use]
2878pub fn outcome_from_status(status: TurnOutcomeStatus, error: Option<String>) -> TurnOutcome {
2879    match status {
2880        TurnOutcomeStatus::Completed => TurnOutcome::Completed,
2881        TurnOutcomeStatus::Interrupted => TurnOutcome::Interrupted,
2882        TurnOutcomeStatus::Failed => TurnOutcome::Failed {
2883            message: error.unwrap_or_default(),
2884        },
2885    }
2886}
2887
2888// ── Tests ─────────────────────────────────────────────────────────────────────
2889
2890#[cfg(test)]
2891mod tests {
2892    use std::path::PathBuf;
2893
2894    use super::*;
2895    use crate::engine::kernel_event::{KernelEvent, ToolOutcome, TurnOutcome};
2896    use crate::engine::request_fingerprint::RequestFingerprint;
2897    use crate::turn::TurnLoopMode;
2898
2899    fn make_fp() -> RequestFingerprint {
2900        RequestFingerprint {
2901            static_prefix_sha256: "aaa".into(),
2902            full_prefix_sha256: "bbb".into(),
2903        }
2904    }
2905
2906    #[test]
2907    fn projection_rebuilds_active_tool_names() {
2908        let events = vec![
2909            KernelEvent::TurnStarted {
2910                turn_id: "t1".into(),
2911                mode: TurnLoopMode::Agent,
2912                input_text: "test".into(),
2913                max_steps: 10,
2914            },
2915            KernelEvent::DeferredToolActivated {
2916                turn_id: "t1".into(),
2917                step_idx: 1,
2918                tool_name: "tool_search_bm25".into(),
2919            },
2920        ];
2921        let p = TurnKernelProjection::from_events(&events);
2922        assert!(p.active_tool_names.contains("tool_search_bm25"));
2923    }
2924
2925    #[test]
2926    fn projection_resets_step_counters_on_model_request() {
2927        let events = vec![
2928            KernelEvent::ModelRequestIssued {
2929                turn_id: "t1".into(),
2930                step_idx: 1,
2931                request_fp: make_fp(),
2932                token_budget: 32000,
2933            },
2934            KernelEvent::ToolCallFinished {
2935                turn_id: "t1".into(),
2936                call_id: "c1".into(),
2937                tool_name: "read_file".into(),
2938                outcome: ToolOutcome::Success,
2939                duration_ms: 10,
2940                wrote_state: false,
2941                result_preview: String::new(),
2942                session_content: String::new(),
2943            },
2944            // Second request resets counters.
2945            KernelEvent::ModelRequestIssued {
2946                turn_id: "t1".into(),
2947                step_idx: 2,
2948                request_fp: make_fp(),
2949                token_budget: 32000,
2950            },
2951        ];
2952        let p = TurnKernelProjection::from_events(&events);
2953        assert_eq!(p.readonly_tool_successes, 0);
2954        assert_eq!(p.step_idx, 2);
2955    }
2956
2957    #[test]
2958    fn projection_tracks_scratchpad_summary_injected() {
2959        let events = vec![KernelEvent::ScratchpadSummaryInjected {
2960            turn_id: "t1".into(),
2961            at_step: 3,
2962        }];
2963        let p = TurnKernelProjection::from_events(&events);
2964        assert!(p.scratchpad_summary_injected);
2965    }
2966
2967    #[test]
2968    fn projection_tracks_continuations() {
2969        let events = vec![
2970            KernelEvent::StepLimitContinuation {
2971                turn_id: "t1".into(),
2972                step_idx: 20,
2973                lht_objective_injected: true,
2974            },
2975            KernelEvent::LoopGuardContinuation {
2976                turn_id: "t1".into(),
2977                step_idx: 21,
2978            },
2979        ];
2980        let p = TurnKernelProjection::from_events(&events);
2981        assert_eq!(p.step_limit_continuations, 1);
2982        assert_eq!(p.loop_guard_continuations, 1);
2983    }
2984
2985    #[test]
2986    fn projection_tracks_memory_plane_events() {
2987        let events = vec![
2988            KernelEvent::ScratchpadReminderInjected {
2989                turn_id: "t1".into(),
2990                step_idx: 2,
2991                area_path: "src/main.rs".into(),
2992            },
2993            KernelEvent::CompactionArtifactCreated {
2994                turn_id: "t1".into(),
2995                artifact_id: "art-1".into(),
2996                replaced_range: crate::engine::kernel_event::MessageRange { from: 1, to: 5 },
2997                summary_token_count: 120,
2998            },
2999            KernelEvent::CycleBriefingInjected {
3000                turn_id: "t1".into(),
3001                cycle: 2,
3002                step_idx: 3,
3003            },
3004        ];
3005        let p = TurnKernelProjection::from_events(&events);
3006        assert_eq!(p.scratchpad_reminder_count, 1);
3007        assert_eq!(p.compaction_artifact_count, 1);
3008        assert_eq!(p.cycle_briefing_count, 1);
3009        assert!(verify_memory_projection_chain(&events).is_none());
3010    }
3011
3012    #[test]
3013    fn replay_coherence_passes_golden_fixtures() {
3014        let events = vec![
3015            KernelEvent::TurnStarted {
3016                turn_id: "t1".into(),
3017                mode: TurnLoopMode::Agent,
3018                input_text: "hi".into(),
3019                max_steps: 5,
3020            },
3021            KernelEvent::TurnEnded {
3022                turn_id: "t1".into(),
3023                outcome: TurnOutcome::Completed,
3024                total_steps: 1,
3025            },
3026        ];
3027        let report = replay_turn_projection(&events);
3028        assert_eq!(report.event_count, 2);
3029        assert_eq!(report.outcome, Some(TurnOutcome::Completed));
3030        assert!(verify_turn_replay_coherence(&events, None).is_none());
3031    }
3032
3033    #[test]
3034    fn outcome_mapping_covers_all_statuses() {
3035        assert_eq!(
3036            outcome_from_status(TurnOutcomeStatus::Completed, None),
3037            TurnOutcome::Completed
3038        );
3039        assert_eq!(
3040            outcome_from_status(TurnOutcomeStatus::Interrupted, None),
3041            TurnOutcome::Interrupted
3042        );
3043        assert!(matches!(
3044            outcome_from_status(TurnOutcomeStatus::Failed, Some("boom".into())),
3045            TurnOutcome::Failed { message } if message == "boom"
3046        ));
3047    }
3048
3049    #[test]
3050    fn emit_kernel_no_op_on_none() {
3051        // Should not panic.
3052        emit_kernel(None, KernelEvent::SchemaVersion { version: 1 });
3053    }
3054
3055    #[test]
3056    fn emit_kernel_sends_when_some() {
3057        let (tx, mut rx) = mpsc::unbounded_channel::<KernelEvent>();
3058        emit_kernel(Some(&tx), KernelEvent::SchemaVersion { version: 1 });
3059        let ev = rx.try_recv().expect("event received");
3060        assert_eq!(ev.kind_str(), "schema_version");
3061    }
3062
3063    #[test]
3064    fn compare_projection_detects_in_turn_cycle_mismatch() {
3065        let live = LiveTurnSnapshot {
3066            turn_id: "t1".into(),
3067            in_turn_cycle_advances: 1,
3068            ..Default::default()
3069        };
3070        let proj = TurnKernelProjection {
3071            turn_id: "t1".into(),
3072            in_turn_cycle_advances: 0,
3073            ..Default::default()
3074        };
3075        let diff = compare_projection_to_live(&live, &proj);
3076        assert!(diff.is_some());
3077        assert!(diff.unwrap().contains("in_turn_cycle_advances"));
3078    }
3079
3080    #[test]
3081    fn compare_projection_detects_step_mismatch() {
3082        let live = LiveTurnSnapshot {
3083            turn_id: "t1".into(),
3084            step_idx: 5,
3085            max_steps: 20,
3086            ..Default::default()
3087        };
3088        let proj = TurnKernelProjection {
3089            turn_id: "t1".into(),
3090            step_idx: 3,
3091            max_steps: 20,
3092            ..Default::default()
3093        };
3094        let diff = compare_projection_to_live(&live, &proj);
3095        assert!(diff.is_some());
3096        assert!(diff.unwrap().contains("step_idx"));
3097    }
3098
3099    #[test]
3100    fn replay_turn_machine_emits_call_model_effect() {
3101        let mut machine = ReplayTurnMachine;
3102        let proj = TurnKernelProjection::default();
3103        let out = machine.step(
3104            &proj,
3105            KernelEvent::ModelRequestIssued {
3106                turn_id: "t1".into(),
3107                step_idx: 1,
3108                request_fp: make_fp(),
3109                token_budget: 4096,
3110            },
3111        );
3112        assert_eq!(out.effects.len(), 1);
3113        assert!(matches!(
3114            out.effects[0],
3115            Effect::CallModel { token_budget: 4096 }
3116        ));
3117    }
3118
3119    #[test]
3120    fn verify_effect_replay_chain_passes_minimal_turn() {
3121        let events = vec![
3122            KernelEvent::TurnStarted {
3123                turn_id: "t1".into(),
3124                mode: TurnLoopMode::Agent,
3125                input_text: "x".into(),
3126                max_steps: 5,
3127            },
3128            KernelEvent::ModelRequestIssued {
3129                turn_id: "t1".into(),
3130                step_idx: 1,
3131                request_fp: make_fp(),
3132                token_budget: 1024,
3133            },
3134            KernelEvent::TurnEnded {
3135                turn_id: "t1".into(),
3136                outcome: TurnOutcome::Completed,
3137                total_steps: 1,
3138            },
3139        ];
3140        assert!(verify_effect_replay_chain(&events).is_none());
3141    }
3142
3143    #[test]
3144    fn verify_effect_replay_chain_detects_missing_turn_ended() {
3145        let events = vec![KernelEvent::TurnStarted {
3146            turn_id: "t1".into(),
3147            mode: TurnLoopMode::Agent,
3148            input_text: "x".into(),
3149            max_steps: 5,
3150        }];
3151        let msg = verify_effect_replay_chain(&events).expect("diff");
3152        assert!(msg.contains("TurnEnded"));
3153    }
3154
3155    #[test]
3156    fn replay_turn_effects_matches_pure_read_fixture() {
3157        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3158            .join("../../fixtures/harness/kernel-v3-replay/pure_read.json");
3159        let raw = std::fs::read_to_string(&path).expect("read fixture");
3160        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3161        let counts = replay_effect_counts(&events);
3162        assert_eq!(counts.call_model, 1);
3163        assert_eq!(counts.execute_batch, 1);
3164        assert!(verify_effect_replay_chain(&events).is_none());
3165    }
3166
3167    #[test]
3168    fn replay_effect_counts_on_scratchpad_compaction_fixture() {
3169        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3170            .join("../../fixtures/harness/kernel-v3-replay/scratchpad_compaction.json");
3171        let raw = std::fs::read_to_string(&path).expect("read fixture");
3172        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3173        let counts = replay_effect_counts(&events);
3174        assert_eq!(counts.inject_steer, 1);
3175        assert_eq!(counts.emit_artifact, 2);
3176        assert_eq!(counts.run_compaction, 1);
3177    }
3178
3179    #[test]
3180    fn replay_thread_effect_counts_on_scratchpad_compaction_fixture() {
3181        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3182            .join("../../fixtures/harness/kernel-v3-replay/scratchpad_compaction.json");
3183        let raw = std::fs::read_to_string(&path).expect("read fixture");
3184        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3185        let turn_events = [("t1".into(), events)];
3186        let counts = replay_thread_effect_counts(&turn_events);
3187        assert_eq!(counts.inject_steer, 1);
3188        assert_eq!(counts.emit_artifact, 2);
3189        assert_eq!(counts.run_compaction, 1);
3190        let projection = replay_thread_projection("t1", &turn_events);
3191        assert_eq!(projection.effect_counts, counts);
3192    }
3193
3194    #[test]
3195    fn replay_effect_counts_on_write_batch_fixture() {
3196        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3197            .join("../../fixtures/harness/kernel-v3-replay/write_batch.json");
3198        let raw = std::fs::read_to_string(&path).expect("read fixture");
3199        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3200        let counts = replay_effect_counts(&events);
3201        assert_eq!(counts.call_model, 1);
3202        assert_eq!(counts.execute_batch, 2);
3203        assert_eq!(counts.notify_lsp, 1);
3204    }
3205
3206    #[test]
3207    fn verify_step_effect_parity_fails_when_tools_without_step_anchor() {
3208        use crate::engine::kernel_event::PolicyDecision;
3209
3210        let events = vec![
3211            KernelEvent::TurnStarted {
3212                turn_id: "t".into(),
3213                mode: TurnLoopMode::Agent,
3214                input_text: String::new(),
3215                max_steps: 8,
3216            },
3217            KernelEvent::ToolCallPlanned {
3218                turn_id: "t".into(),
3219                step_idx: 1,
3220                call_id: "c1".into(),
3221                tool_name: "read_file".into(),
3222                input_json: "{}".into(),
3223                decision: PolicyDecision::default(),
3224            },
3225        ];
3226        let err = verify_step_effect_parity(&events, 1, 1).expect("parity error");
3227        assert!(err.contains("no ModelRequestIssued anchor"));
3228        assert!(verify_step_effect_parity(&events, 99, 0).is_none());
3229    }
3230
3231    #[test]
3232    fn events_for_step_and_parity_on_pure_read_fixture() {
3233        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3234            .join("../../fixtures/harness/kernel-v3-replay/pure_read.json");
3235        let raw = std::fs::read_to_string(&path).expect("read fixture");
3236        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3237        let step_events = events_for_step(&events, 1);
3238        assert_eq!(step_events.len(), 5);
3239        assert!(verify_step_effect_parity(&events, 1, 1).is_none());
3240        let planned = plan_v3_step_effects(8192, &["call-read-1".into()]);
3241        assert_eq!(planned.len(), 2);
3242    }
3243
3244    #[test]
3245    fn kernel_resume_hints_from_latest_projection() {
3246        let events = vec![
3247            KernelEvent::TurnStarted {
3248                turn_id: "t-resume".into(),
3249                mode: TurnLoopMode::Agent,
3250                input_text: "hi".into(),
3251                max_steps: 12,
3252            },
3253            KernelEvent::TurnEnded {
3254                turn_id: "t-resume".into(),
3255                outcome: TurnOutcome::Completed,
3256                total_steps: 3,
3257            },
3258        ];
3259        let proj = TurnKernelProjection::from_events(&events);
3260        let hints = kernel_resume_hints_from_projection(&proj);
3261        assert_eq!(hints.latest_turn_id.as_deref(), Some("t-resume"));
3262        assert_eq!(hints.max_steps, 12);
3263    }
3264
3265    #[test]
3266    fn replay_thread_message_stats_on_pure_read_fixture() {
3267        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3268            .join("../../fixtures/harness/kernel-v3-replay/pure_read.json");
3269        let raw = std::fs::read_to_string(&path).expect("read fixture");
3270        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3271        let stats = replay_thread_message_stats(&[("t1".into(), events)]);
3272        assert_eq!(stats.model_message_count, 1);
3273        assert_eq!(stats.tool_call_planned_count, 1);
3274        assert_eq!(stats.model_request_count, 1);
3275    }
3276
3277    #[test]
3278    fn verify_session_message_coverage_allows_equal_or_greater() {
3279        let stats = ThreadMessageReplayStats {
3280            turns_with_events: 1,
3281            model_message_count: 2,
3282            ..Default::default()
3283        };
3284        assert!(verify_session_message_coverage(3, &stats).is_none());
3285        assert!(verify_session_message_coverage(1, &stats).is_some());
3286        let cov = build_session_message_coverage(1, &stats).expect("coverage");
3287        assert!(!cov.coverage_ok);
3288        assert!(cov.summary.is_some());
3289        let ok = build_session_message_coverage(3, &stats).expect("coverage");
3290        assert!(ok.coverage_ok);
3291        assert!(ok.summary.is_none());
3292    }
3293
3294    #[test]
3295    fn replay_thread_message_timeline_on_pure_read_fixture() {
3296        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3297            .join("../../fixtures/harness/kernel-v3-replay/pure_read.json");
3298        let raw = std::fs::read_to_string(&path).expect("read fixture");
3299        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3300        let timeline = replay_thread_message_timeline(&[("t1".into(), events)]);
3301        assert_eq!(timeline.len(), 1);
3302        assert_eq!(timeline[0].step_idx, 1);
3303        assert_eq!(timeline[0].block_count, 2);
3304    }
3305
3306    #[test]
3307    fn verify_message_timeline_coherence_on_pure_read_fixture() {
3308        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3309            .join("../../fixtures/harness/kernel-v3-replay/pure_read.json");
3310        let raw = std::fs::read_to_string(&path).expect("read fixture");
3311        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3312        let turn_events = [("t1".into(), events)];
3313        let stats = replay_thread_message_stats(&turn_events);
3314        let timeline = replay_thread_message_timeline(&turn_events);
3315        assert!(verify_message_timeline_coherence(&stats, &timeline).is_none());
3316        assert!(verify_message_timeline_vs_session(2, &timeline).is_none());
3317        assert!(verify_message_timeline_vs_session(0, &timeline).is_some());
3318        assert!(verify_timeline_vs_request_count(&stats, &timeline).is_none());
3319    }
3320
3321    #[test]
3322    fn build_session_message_timeline_coverage_on_pure_read_fixture() {
3323        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3324            .join("../../fixtures/harness/kernel-v3-replay/pure_read.json");
3325        let raw = std::fs::read_to_string(&path).expect("read fixture");
3326        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3327        let projection = replay_thread_projection("t1", &[("t1".into(), events.clone())]);
3328        let cov = build_session_message_timeline_coverage(3, &projection, None, None, None, None)
3329            .expect("coverage");
3330        assert!(cov.overall_ok);
3331        assert!(cov.plane_depth_ok);
3332        assert_eq!(cov.estimated_min_session_messages, 2);
3333        assert_eq!(
3334            projection
3335                .message_plane_index
3336                .estimated_min_session_messages,
3337            2
3338        );
3339        assert!(verify_step_model_message_anchor(&events, 1).is_none());
3340    }
3341
3342    #[test]
3343    fn build_session_message_timeline_coverage_transcript_preview_on_resume_fixture() {
3344        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3345            .join("../../fixtures/harness/kernel-v3-replay/resume_thread_parity.json");
3346        let raw = std::fs::read_to_string(&path).expect("read fixture");
3347        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3348        let projection = replay_thread_projection(
3349            "thread-resume",
3350            &[("golden-resume-parity-001".into(), events.clone())],
3351        );
3352        assert_eq!(projection.transcript_preview_index.preview_row_count, 4);
3353        assert_eq!(
3354            projection.transcript_preview_index.preview_body_event_count,
3355            2
3356        );
3357        let role_index = SessionMessageRoleIndex {
3358            user_message_count: 2,
3359            assistant_message_count: 1,
3360            tool_result_message_count: 1,
3361            text_user_message_count: 2,
3362            total_message_count: 4,
3363        };
3364        let cov = build_session_message_timeline_coverage(
3365            4,
3366            &projection,
3367            Some(&role_index),
3368            None,
3369            None,
3370            None,
3371        )
3372        .expect("coverage");
3373        assert!(cov.transcript_preview_ok);
3374        assert!(cov.transcript_preview_body_ok);
3375        assert_eq!(cov.kernel_transcript_preview_row_count, 4);
3376        assert!(cov.overall_ok);
3377
3378        let preview_messages =
3379            crate::engine::turn_loop::message_body_rebuild_policy::rebuild_preview_messages_from_thread_events(
3380                &[("golden-resume-parity-001".into(), events.clone())],
3381            );
3382        let cov_with_bodies = build_session_message_timeline_coverage(
3383            4,
3384            &projection,
3385            Some(&role_index),
3386            None,
3387            Some(&preview_messages),
3388            Some(&[("golden-resume-parity-001".into(), events)]),
3389        )
3390        .expect("coverage");
3391        assert!(cov_with_bodies.transcript_preview_body_ok);
3392    }
3393
3394    #[test]
3395    fn verify_session_message_plane_depth_on_pure_read_fixture() {
3396        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3397            .join("../../fixtures/harness/kernel-v3-replay/pure_read.json");
3398        let raw = std::fs::read_to_string(&path).expect("read fixture");
3399        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3400        let projection = replay_thread_projection("t1", &[("t1".into(), events)]);
3401        assert!(verify_session_message_plane_depth(2, &projection.message_plane_index).is_none());
3402        assert!(verify_session_message_plane_depth(1, &projection.message_plane_index).is_some());
3403    }
3404
3405    #[test]
3406    fn verify_session_role_index_on_pure_read_fixture() {
3407        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3408            .join("../../fixtures/harness/kernel-v3-replay/pure_read.json");
3409        let raw = std::fs::read_to_string(&path).expect("read fixture");
3410        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3411        let projection = replay_thread_projection("t1", &[("t1".into(), events)]);
3412        let kernel = replay_kernel_message_role_estimate(&projection.message_stats);
3413        let role_index = SessionMessageRoleIndex {
3414            user_message_count: 2,
3415            assistant_message_count: 1,
3416            tool_result_message_count: 1,
3417            text_user_message_count: 1,
3418            total_message_count: 3,
3419        };
3420        assert!(verify_session_role_index(&role_index, &kernel).is_none());
3421        let thin = SessionMessageRoleIndex {
3422            assistant_message_count: 0,
3423            tool_result_message_count: 0,
3424            text_user_message_count: 0,
3425            ..role_index
3426        };
3427        assert!(verify_session_role_index(&thin, &kernel).is_some());
3428        let cov = build_session_message_timeline_coverage(
3429            3,
3430            &projection,
3431            Some(&role_index),
3432            None,
3433            None,
3434            None,
3435        )
3436        .expect("coverage");
3437        assert!(cov.role_index_ok);
3438        assert!(cov.overall_ok);
3439    }
3440
3441    #[test]
3442    fn build_session_message_role_index_counts_tool_results() {
3443        use crate::chat::{ContentBlock, Message};
3444        let messages = [
3445            Message {
3446                role: "user".to_string(),
3447                content: vec![ContentBlock::Text {
3448                    text: "hi".into(),
3449                    cache_control: None,
3450                }],
3451            },
3452            Message {
3453                role: "assistant".to_string(),
3454                content: vec![ContentBlock::Text {
3455                    text: "ok".into(),
3456                    cache_control: None,
3457                }],
3458            },
3459            Message {
3460                role: "user".to_string(),
3461                content: vec![ContentBlock::ToolResult {
3462                    tool_use_id: "t1".into(),
3463                    content: "done".into(),
3464                    is_error: None,
3465                    content_blocks: None,
3466                }],
3467            },
3468        ];
3469        let idx = build_session_message_role_index(&messages);
3470        assert_eq!(idx.user_message_count, 2);
3471        assert_eq!(idx.assistant_message_count, 1);
3472        assert_eq!(idx.tool_result_message_count, 1);
3473        assert_eq!(idx.text_user_message_count, 1);
3474        assert_eq!(idx.total_message_count, 3);
3475    }
3476
3477    #[test]
3478    fn verify_thread_compaction_replay_anchors_on_manual_compaction_fixture() {
3479        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3480            .join("../../fixtures/harness/kernel-v3-replay/manual_compaction.json");
3481        let raw = std::fs::read_to_string(&path).expect("read fixture");
3482        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3483        let turn_events = [("t1".into(), events.clone())];
3484        assert!(verify_thread_compaction_replay_anchors(&turn_events).is_none());
3485        let projection = replay_thread_projection("t1", &turn_events);
3486        assert!(projection.compaction_replay_anchor_ok);
3487        assert_eq!(compaction_run_effects_from_events(&events).len(), 1);
3488    }
3489
3490    #[test]
3491    fn verify_step_compaction_replay_anchor_on_capacity_checkpoint_fixture() {
3492        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3493            .join("../../fixtures/harness/kernel-v3-replay/capacity_checkpoint.json");
3494        let raw = std::fs::read_to_string(&path).expect("read fixture");
3495        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3496        assert!(verify_step_compaction_replay_anchor(&events, 1).is_none());
3497    }
3498
3499    #[test]
3500    fn verify_thread_compaction_replay_anchors_on_capacity_checkpoint_fixture() {
3501        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3502            .join("../../fixtures/harness/kernel-v3-replay/capacity_checkpoint.json");
3503        let raw = std::fs::read_to_string(&path).expect("read fixture");
3504        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3505        let turn_events = [("t1".into(), events.clone())];
3506        assert!(verify_thread_compaction_replay_anchors(&turn_events).is_none());
3507        assert_eq!(compaction_run_effects_from_events(&events).len(), 1);
3508    }
3509
3510    #[test]
3511    fn verify_step_memory_plane_and_compaction_replay_anchors_on_scratchpad_fixture() {
3512        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3513            .join("../../fixtures/harness/kernel-v3-replay/scratchpad_compaction.json");
3514        let raw = std::fs::read_to_string(&path).expect("read fixture");
3515        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3516        assert!(verify_step_memory_plane_replay_anchor(&events, 1).is_none());
3517        assert!(verify_step_compaction_replay_anchor(&events, 1).is_none());
3518    }
3519
3520    #[test]
3521    fn verify_thread_memory_plane_replay_anchors_on_scratchpad_fixture() {
3522        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3523            .join("../../fixtures/harness/kernel-v3-replay/scratchpad_compaction.json");
3524        let raw = std::fs::read_to_string(&path).expect("read fixture");
3525        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3526        let turn_events = [("t1".into(), events.clone())];
3527        assert!(verify_thread_memory_plane_replay_anchors(&turn_events).is_none());
3528        let projection = replay_thread_projection("t1", &turn_events);
3529        assert!(projection.memory_plane_replay_anchor_ok);
3530        let replay_effects = memory_plane_inject_steer_effects_from_events(&events);
3531        assert_eq!(replay_effects.len(), 3);
3532    }
3533
3534    #[test]
3535    fn verify_session_memory_plane_user_depth_on_scratchpad_fixture() {
3536        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3537            .join("../../fixtures/harness/kernel-v3-replay/scratchpad_compaction.json");
3538        let raw = std::fs::read_to_string(&path).expect("read fixture");
3539        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3540        let projection = replay_thread_projection("t1", &[("t1".into(), events)]);
3541        let memory = replay_kernel_memory_plane_user_estimate(&projection.message_stats);
3542        assert_eq!(memory.min_scratchpad_reminder_user_messages, 1);
3543        assert_eq!(memory.min_scratchpad_summary_user_messages, 1);
3544        assert_eq!(memory.min_memory_injected_user_messages, 2);
3545        let role_index = SessionMessageRoleIndex {
3546            user_message_count: 3,
3547            assistant_message_count: 0,
3548            tool_result_message_count: 0,
3549            text_user_message_count: 3,
3550            total_message_count: 3,
3551        };
3552        assert!(verify_session_memory_plane_user_depth(&role_index, &memory).is_none());
3553        let thin = SessionMessageRoleIndex {
3554            text_user_message_count: 1,
3555            ..role_index
3556        };
3557        assert!(verify_session_memory_plane_user_depth(&thin, &memory).is_some());
3558        let cov = build_session_message_timeline_coverage(
3559            3,
3560            &projection,
3561            Some(&role_index),
3562            None,
3563            None,
3564            None,
3565        )
3566        .expect("coverage");
3567        assert!(cov.memory_plane_user_ok);
3568    }
3569
3570    #[test]
3571    fn replay_thread_compaction_timeline_on_manual_compaction_fixture() {
3572        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3573            .join("../../fixtures/harness/kernel-v3-replay/manual_compaction.json");
3574        let raw = std::fs::read_to_string(&path).expect("read fixture");
3575        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3576        let timeline = replay_thread_compaction_timeline(&[("t1".into(), events)]);
3577        assert_eq!(timeline.len(), 1);
3578        assert_eq!(timeline[0].replaced_from, 4);
3579        assert_eq!(timeline[0].replaced_to, 19);
3580        assert_eq!(timeline[0].messages_removed_count, 16);
3581        let index = replay_thread_compaction_index(&timeline);
3582        assert_eq!(index.messages_removed_estimate, 16);
3583        assert_eq!(index.peak_session_depth_hint, 20);
3584    }
3585
3586    #[test]
3587    fn verify_session_compaction_depth_on_manual_compaction_fixture() {
3588        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3589            .join("../../fixtures/harness/kernel-v3-replay/manual_compaction.json");
3590        let raw = std::fs::read_to_string(&path).expect("read fixture");
3591        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3592        let projection = replay_thread_projection("t1", &[("t1".into(), events)]);
3593        assert!(
3594            verify_session_compaction_depth(
3595                5,
3596                &projection.compaction_index,
3597                &projection.message_plane_index
3598            )
3599            .is_none()
3600        );
3601        let stats = ThreadMessageReplayStats {
3602            model_message_count: 10,
3603            tool_call_planned_count: 10,
3604            compaction_artifact_count: 1,
3605            ..Default::default()
3606        };
3607        let plane = replay_thread_message_plane_index(&stats);
3608        let compaction = ThreadCompactionReplayIndex {
3609            artifact_count: 1,
3610            messages_removed_estimate: 10,
3611            peak_session_depth_hint: 20,
3612        };
3613        assert!(verify_session_compaction_depth(0, &compaction, &plane).is_some());
3614        assert!(verify_session_compaction_depth(15, &compaction, &plane).is_none());
3615    }
3616
3617    #[test]
3618    fn replay_thread_message_stats_counts_continuations_on_lht_fixture() {
3619        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3620            .join("../../fixtures/harness/kernel-v3-replay/lht_continue.json");
3621        let raw = std::fs::read_to_string(&path).expect("read fixture");
3622        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3623        let stats = replay_thread_message_stats(&[("t1".into(), events)]);
3624        assert_eq!(stats.step_limit_continuation_count, 1);
3625        assert_eq!(stats.loop_guard_continuation_count, 1);
3626        assert_eq!(stats.steer_injection_count, 1);
3627        let memory = replay_kernel_memory_plane_user_estimate(&stats);
3628        assert_eq!(memory.min_continuation_user_messages, 2);
3629        assert_eq!(memory.min_memory_injected_user_messages, 3);
3630    }
3631
3632    #[test]
3633    fn verify_step_continuation_anchor_on_lht_fixture() {
3634        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3635            .join("../../fixtures/harness/kernel-v3-replay/lht_continue.json");
3636        let raw = std::fs::read_to_string(&path).expect("read fixture");
3637        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3638        assert!(verify_step_continuation_anchor(&events, 20).is_none());
3639        assert!(verify_step_continuation_anchor(&events, 22).is_none());
3640        let step_limit = continuation_inject_steer_effects_for_step(&events, 20);
3641        assert_eq!(step_limit.len(), 1);
3642        assert!(matches!(step_limit[0], Effect::InjectSteer { ref text } if text.is_empty()));
3643        let loop_guard = continuation_inject_steer_effects_for_step(&events, 22);
3644        assert_eq!(loop_guard.len(), 1);
3645    }
3646
3647    #[test]
3648    fn verify_step_capacity_sleep_anchor_on_cooldown_checkpoint() {
3649        use crate::engine::kernel_event::{
3650            CapacityAction, CapacityCheckpointKind, KernelEvent, TurnOutcome,
3651        };
3652        use crate::turn::TurnLoopMode;
3653
3654        let events = vec![
3655            KernelEvent::TurnStarted {
3656                turn_id: "t-cap".into(),
3657                mode: TurnLoopMode::Agent,
3658                input_text: "x".into(),
3659                max_steps: 5,
3660            },
3661            KernelEvent::ModelRequestIssued {
3662                turn_id: "t-cap".into(),
3663                step_idx: 1,
3664                request_fp: crate::engine::request_fingerprint::RequestFingerprint {
3665                    static_prefix_sha256: "a".into(),
3666                    full_prefix_sha256: "b".into(),
3667                },
3668                token_budget: 8192,
3669            },
3670            KernelEvent::CapacityCheckpoint {
3671                turn_id: "t-cap".into(),
3672                step_idx: 1,
3673                kind: CapacityCheckpointKind::PreRequest,
3674                tokens_used: 9000,
3675                token_budget: 12000,
3676                action: CapacityAction::Continue,
3677                cooldown_blocked: true,
3678            },
3679            KernelEvent::TurnEnded {
3680                turn_id: "t-cap".into(),
3681                outcome: TurnOutcome::Completed,
3682                total_steps: 1,
3683            },
3684        ];
3685        assert!(verify_step_capacity_sleep_anchor(&events, 1).is_none());
3686        let sleep_effects: Vec<_> = replay_step_effects(&events, 1)
3687            .into_iter()
3688            .filter(|e| matches!(e, Effect::Sleep { .. }))
3689            .collect();
3690        assert_eq!(sleep_effects.len(), 1);
3691        assert!(matches!(
3692            sleep_effects[0],
3693            Effect::Sleep { millis } if millis == capacity_cooldown_backoff_millis()
3694        ));
3695    }
3696
3697    #[test]
3698    fn verify_step_request_approval_anchor_on_write_batch_fixture() {
3699        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3700            .join("../../fixtures/harness/kernel-v3-replay/write_batch.json");
3701        let raw = std::fs::read_to_string(&path).expect("read fixture");
3702        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3703        assert!(verify_step_request_approval_anchor(&events, 1).is_none());
3704        let projection = replay_thread_projection("t1", &[("t1".into(), events.clone())]);
3705        assert!(projection.request_approval_anchor_ok);
3706        let step_events = events_for_step(&events, 1);
3707        let approval_tail = request_approval_effects_from_step_events(&step_events);
3708        assert_eq!(approval_tail.len(), 1);
3709        assert!(matches!(
3710            approval_tail[0],
3711            Effect::RequestApproval {
3712                ref call_id,
3713                ..
3714            } if call_id == "call-write-1"
3715        ));
3716        let replay_approval: Vec<_> = replay_step_effects(&events, 1)
3717            .into_iter()
3718            .filter(|e| matches!(e, Effect::RequestApproval { .. }))
3719            .collect();
3720        assert_eq!(approval_tail.len(), replay_approval.len());
3721    }
3722
3723    #[test]
3724    fn verify_step_notify_lsp_anchor_on_write_batch_fixture() {
3725        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3726            .join("../../fixtures/harness/kernel-v3-replay/write_batch.json");
3727        let raw = std::fs::read_to_string(&path).expect("read fixture");
3728        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3729        assert!(verify_step_notify_lsp_anchor(&events, 1).is_none());
3730        let projection = replay_thread_projection("t1", &[("t1".into(), events.clone())]);
3731        assert!(projection.notify_lsp_anchor_ok);
3732        let step_events = events_for_step(&events, 1);
3733        let notify_tail = notify_lsp_effects_from_step_events(&step_events);
3734        assert_eq!(notify_tail.len(), 1);
3735        assert!(matches!(
3736            notify_tail[0],
3737            Effect::NotifyLsp { ref tool_name } if tool_name == "edit_file"
3738        ));
3739        let replay_notify: Vec<_> = replay_step_effects(&events, 1)
3740            .into_iter()
3741            .filter(|e| matches!(e, Effect::NotifyLsp { .. }))
3742            .collect();
3743        assert_eq!(notify_tail.len(), replay_notify.len());
3744        for (planned, replayed) in notify_tail.iter().zip(replay_notify.iter()) {
3745            match (planned, replayed) {
3746                (Effect::NotifyLsp { tool_name: a }, Effect::NotifyLsp { tool_name: b }) => {
3747                    assert_eq!(a, b)
3748                }
3749                _ => panic!("notify tail mismatch: {planned:?} vs {replayed:?}"),
3750            }
3751        }
3752    }
3753
3754    #[test]
3755    fn verify_thread_continuation_anchors_on_lht_fixture() {
3756        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3757            .join("../../fixtures/harness/kernel-v3-replay/lht_continue.json");
3758        let raw = std::fs::read_to_string(&path).expect("read fixture");
3759        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3760        let turn_events = [("t1".into(), events)];
3761        assert!(verify_thread_continuation_anchors(&turn_events).is_none());
3762        let projection = replay_thread_projection("t1", &turn_events);
3763        assert!(projection.continuation_anchor_ok);
3764        assert!(projection.continuation_anchor_summary.is_none());
3765    }
3766
3767    #[test]
3768    fn verify_compaction_artifacts_vs_kernel_timeline_on_manual_fixture() {
3769        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3770            .join("../../fixtures/harness/kernel-v3-replay/manual_compaction.json");
3771        let raw = std::fs::read_to_string(&path).expect("read fixture");
3772        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3773        let projection = replay_thread_projection("t1", &[("t1".into(), events)]);
3774        let session = vec![SessionCompactionArtifactEntry {
3775            artifact_id: "art-manual-001".into(),
3776            replaced_start: 4,
3777            replaced_end: 20,
3778            messages_removed_count: 16,
3779            summary_token_count: 512,
3780        }];
3781        assert!(
3782            verify_compaction_artifacts_vs_kernel_timeline(
3783                &projection.compaction_timeline,
3784                &session
3785            )
3786            .is_none()
3787        );
3788        let bad = SessionCompactionArtifactEntry {
3789            artifact_id: "other".into(),
3790            ..session[0].clone()
3791        };
3792        assert!(
3793            verify_compaction_artifacts_vs_kernel_timeline(&projection.compaction_timeline, &[bad])
3794                .is_some()
3795        );
3796    }
3797
3798    #[test]
3799    fn replay_thread_projection_picks_latest_non_empty_turn() {
3800        let good = vec![
3801            KernelEvent::TurnStarted {
3802                turn_id: "t1".into(),
3803                mode: TurnLoopMode::Agent,
3804                input_text: "a".into(),
3805                max_steps: 5,
3806            },
3807            KernelEvent::TurnEnded {
3808                turn_id: "t1".into(),
3809                outcome: TurnOutcome::Completed,
3810                total_steps: 1,
3811            },
3812        ];
3813        let later = vec![
3814            KernelEvent::TurnStarted {
3815                turn_id: "t2".into(),
3816                mode: TurnLoopMode::Agent,
3817                input_text: "b".into(),
3818                max_steps: 8,
3819            },
3820            KernelEvent::StepLimitContinuation {
3821                turn_id: "t2".into(),
3822                step_idx: 8,
3823                lht_objective_injected: true,
3824            },
3825            KernelEvent::TurnEnded {
3826                turn_id: "t2".into(),
3827                outcome: TurnOutcome::Completed,
3828                total_steps: 9,
3829            },
3830        ];
3831        let projection =
3832            replay_thread_projection("thread-x", &[("t1".into(), good), ("t2".into(), later)]);
3833        assert_eq!(projection.latest_turn_id.as_deref(), Some("t2"));
3834        assert_eq!(projection.latest_projection.turn_id, "t2");
3835        assert_eq!(projection.latest_projection.step_limit_continuations, 1);
3836        assert!(projection.report.all_coherent);
3837    }
3838
3839    #[test]
3840    fn kernel_resume_hints_thread_turn_ids_from_projection() {
3841        let good = vec![
3842            KernelEvent::TurnStarted {
3843                turn_id: "t1".into(),
3844                mode: TurnLoopMode::Agent,
3845                input_text: "a".into(),
3846                max_steps: 5,
3847            },
3848            KernelEvent::TurnEnded {
3849                turn_id: "t1".into(),
3850                outcome: TurnOutcome::Completed,
3851                total_steps: 1,
3852            },
3853        ];
3854        let later = vec![
3855            KernelEvent::TurnStarted {
3856                turn_id: "t2".into(),
3857                mode: TurnLoopMode::Agent,
3858                input_text: "b".into(),
3859                max_steps: 8,
3860            },
3861            KernelEvent::TurnEnded {
3862                turn_id: "t2".into(),
3863                outcome: TurnOutcome::Completed,
3864                total_steps: 1,
3865            },
3866        ];
3867        let projection =
3868            replay_thread_projection("thread-x", &[("t1".into(), good), ("t2".into(), later)]);
3869        let hints = kernel_resume_hints_from_thread_projection(&projection);
3870        assert_eq!(hints.latest_turn_id.as_deref(), Some("t2"));
3871        assert_eq!(
3872            hints.thread_turn_ids_with_events,
3873            vec!["t1".to_string(), "t2".to_string()]
3874        );
3875        assert_eq!(hints.expected_anchor_effect_count, 0);
3876    }
3877
3878    #[test]
3879    fn kernel_resume_hints_expected_anchor_count_from_projection() {
3880        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3881            .join("../../fixtures/harness/kernel-v3-replay/scratchpad_compaction.json");
3882        let raw = std::fs::read_to_string(&path).expect("read fixture");
3883        let events: Vec<KernelEvent> = serde_json::from_str(&raw).expect("parse");
3884        let turn_events = [("t1".into(), events)];
3885        let projection = replay_thread_projection("thread-x", &turn_events);
3886        let hints = kernel_resume_hints_from_thread_projection(&projection);
3887        assert_eq!(hints.expected_anchor_effect_count, 4);
3888        assert!(verify_resume_anchor_effect_alignment(4, 4).is_none());
3889        assert!(verify_resume_anchor_effect_alignment(4, 3).is_some());
3890    }
3891
3892    #[test]
3893    fn build_thread_replay_report_skips_empty_and_aggregates_coherence() {
3894        let good = vec![
3895            KernelEvent::TurnStarted {
3896                turn_id: "t-good".into(),
3897                mode: TurnLoopMode::Agent,
3898                input_text: "hi".into(),
3899                max_steps: 5,
3900            },
3901            KernelEvent::TurnEnded {
3902                turn_id: "t-good".into(),
3903                outcome: TurnOutcome::Completed,
3904                total_steps: 1,
3905            },
3906        ];
3907        let bad = vec![KernelEvent::TurnStarted {
3908            turn_id: "t-bad".into(),
3909            mode: TurnLoopMode::Agent,
3910            input_text: "x".into(),
3911            max_steps: 5,
3912        }];
3913        let report = build_thread_replay_report(
3914            "thread-1",
3915            &[
3916                ("t-empty".into(), vec![]),
3917                ("t-good".into(), good),
3918                ("t-bad".into(), bad),
3919            ],
3920        );
3921        assert_eq!(report.turn_count, 3);
3922        assert_eq!(report.turns_with_events, 2);
3923        assert_eq!(report.turns_coherent, 1);
3924        assert!(!report.all_coherent);
3925        assert_eq!(report.turns.len(), 2);
3926    }
3927}