Skip to main content

imp_core/agent/
run_loop.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use futures::StreamExt;
5use imp_llm::{
6    AssistantMessage, ContentBlock, Context, Message, RequestOptions, StopReason, StreamEvent,
7    Usage,
8};
9
10use crate::agent::loop_state::enforce_verification_closeout;
11use crate::agent::{
12    Agent, AgentCommand, AgentEvent, LoopDecision, RecoveryCheckpointKind, RunFinalStatus,
13    StopReason as AgentStopReason, TimingEvent, TimingStage, TurnPhase, TurnState,
14};
15use crate::error::Result;
16use crate::evidence::{
17    EvidenceActions, EvidenceArtifact, EvidencePacket, EvidencePolicy, EvidenceTrustSummary,
18    EvidenceVerificationGate,
19};
20use crate::hooks::HookEvent;
21use crate::ui::NotifyLevel;
22use crate::workflow::{AutonomyMode, VerificationGateRunner};
23use crate::{
24    storage,
25    trace::TraceWriter,
26    trust::{Provenance, RiskLabel, TrustLabel},
27};
28
29use super::{
30    build_assistant_message, clone_model, mana_skill_follow_up_hint, push_stream_text_block,
31    push_stream_thinking_block, record_mana_mutation_results,
32};
33
34impl Agent {
35    pub(super) async fn reconcile_recovery_before_turn(
36        &self,
37        turn: u32,
38    ) -> Option<super::RecoveryReconciliation> {
39        let reconciliation = self
40            .recovery_ledger
41            .lock()
42            .ok()
43            .and_then(|ledger| ledger.reconcile_latest_finished_turn())?;
44
45        // Only a previous turn can block the next turn. The current turn has no
46        // side effects yet, and same-turn reconciliation happens after tool
47        // execution checkpoints are recorded.
48        if reconciliation.turn >= turn {
49            return None;
50        }
51
52        if !reconciliation.is_safe_to_continue() {
53            self.emit(AgentEvent::Error {
54                error: format!(
55                    "Recovery blocked before turn {turn}: {} incomplete non-retryable tool side effect(s)",
56                    reconciliation.unsafe_incomplete_tools.len()
57                ),
58            })
59            .await;
60        }
61
62        Some(reconciliation)
63    }
64
65    async fn run_verification_gates(&mut self, artifacts: &storage::RunArtifacts) {
66        let runner = VerificationGateRunner::new(&self.cwd, artifacts.root().join("verification"));
67        let mut completed = Vec::new();
68        for index in 0..self.verification_gates.len() {
69            if matches!(
70                self.verification_gates[index].status,
71                crate::workflow::VerificationGateStatus::Passed
72                    | crate::workflow::VerificationGateStatus::Failed
73                    | crate::workflow::VerificationGateStatus::Blocked
74                    | crate::workflow::VerificationGateStatus::Skipped
75            ) {
76                continue;
77            }
78            self.emit(AgentEvent::VerificationStarted {
79                gate: self.verification_gates[index].clone(),
80            })
81            .await;
82            let _ = runner.run(&mut self.verification_gates[index]).await;
83            completed.push(self.verification_gates[index].clone());
84        }
85        for gate in completed {
86            self.emit(AgentEvent::VerificationCompleted {
87                closeout_effect: gate.closeout_effect(),
88                gate,
89            })
90            .await;
91        }
92    }
93
94    async fn write_run_evidence(
95        &self,
96        run_id: &str,
97        artifacts: &storage::RunArtifacts,
98        prompt: &str,
99        status: &RunFinalStatus,
100    ) {
101        let mut packet = EvidencePacket::new(run_id, prompt);
102        packet.workflow_id = self
103            .workflow_contract
104            .id
105            .clone()
106            .or_else(|| self.workflow_contract.mana_unit_ref.clone());
107        packet.workflow_type = Some(format!("{:?}", self.workflow_contract.workflow_type));
108        packet.risk_level = Some(format!("{:?}", self.workflow_contract.risk_level));
109        packet.autonomy_mode = Some(self.workflow_contract.autonomy_mode.to_string());
110        packet.final_status = Some(format!("{:?}", status));
111        packet.policy = evidence_policy_for_autonomy(self.workflow_contract.autonomy_mode);
112        packet.trust = evidence_trust_summary_from_messages(&self.messages);
113        packet
114            .summary
115            .push("Agent run completed; inspect trace.jsonl for structured event details.".into());
116        packet.actions = evidence_actions_from_messages(&self.messages);
117        packet.verification = self
118            .verification_gates
119            .iter()
120            .map(evidence_verification_gate)
121            .collect();
122        packet.artifacts = vec![
123            EvidenceArtifact {
124                kind: "trace".into(),
125                path: artifacts.trace_path(),
126                summary: Some("Structured runtime event trace".into()),
127            },
128            EvidenceArtifact {
129                kind: "workflow-contract".into(),
130                path: artifacts.workflow_contract_path(),
131                summary: Some("Workflow contract snapshot".into()),
132            },
133        ];
134        let evidence_path = artifacts.evidence_path();
135        if packet.write_markdown(&evidence_path).is_ok() {
136            self.write_trace_event(&AgentEvent::EvidenceWritten {
137                path: evidence_path.clone(),
138            });
139            let _ = self
140                .event_tx
141                .send(AgentEvent::EvidenceWritten {
142                    path: evidence_path,
143                })
144                .await;
145        }
146    }
147
148    /// Run the agent loop with an initial prompt.
149    pub async fn run(&mut self, prompt: String) -> Result<()> {
150        let trace_path = std::env::var_os("IMP_TUI_TRACE").map(std::path::PathBuf::from);
151        let trace_run = |phase: &str, started: std::time::Instant| {
152            if let Some(path) = trace_path.as_ref() {
153                if let Ok(mut file) = std::fs::OpenOptions::new()
154                    .create(true)
155                    .append(true)
156                    .open(path)
157                {
158                    use std::io::Write as _;
159                    let _ = writeln!(
160                        file,
161                        "{} agent_run_phase phase={} duration_ms={}",
162                        imp_llm::now(),
163                        phase,
164                        started.elapsed().as_millis()
165                    );
166                }
167            }
168        };
169        let phase_started = std::time::Instant::now();
170        let run_id = format!("run_{}", uuid::Uuid::new_v4().simple());
171        let run_artifacts = storage::project_run_artifacts(&self.cwd, &run_id).ok();
172        if let Some(artifacts) = &run_artifacts {
173            if let Ok(writer) = TraceWriter::create(artifacts.trace_path()) {
174                if let Ok(mut active_trace_writer) = self.trace_writer.lock() {
175                    *active_trace_writer = Some(writer);
176                }
177            }
178            let _ = std::fs::write(
179                artifacts.workflow_contract_path(),
180                serde_json::to_string_pretty(&self.workflow_contract).unwrap_or_default(),
181            );
182        }
183        trace_run("artifacts", phase_started);
184        let phase_started = std::time::Instant::now();
185        if let Ok(mut active_run_id) = self.run_id.lock() {
186            *active_run_id = Some(run_id.clone());
187        }
188        trace_run("set_run_id", phase_started);
189        let phase_started = std::time::Instant::now();
190
191        self.emit(AgentEvent::AgentStart {
192            model: self.model.meta.id.clone(),
193            timestamp: imp_llm::now(),
194        })
195        .await;
196        trace_run("emit_agent_start", phase_started);
197        let phase_started = std::time::Instant::now();
198        self.hooks
199            .fire(&HookEvent::OnAgentStart { prompt: &prompt })
200            .await;
201        trace_run("hook_agent_start", phase_started);
202        let phase_started = std::time::Instant::now();
203
204        self.messages.push(Message::user(&prompt));
205
206        self.cancel_token
207            .store(false, std::sync::atomic::Ordering::Relaxed);
208        let mut turn: u32 = 0;
209        let mut total_usage = Usage::default();
210        let mut cancelled = false;
211        let mut final_status: Option<RunFinalStatus> = None;
212        let mut queued_follow_ups: std::collections::VecDeque<String> =
213            std::collections::VecDeque::new();
214        let mut queued_pre_turn_follow_ups: std::collections::VecDeque<String> =
215            std::collections::VecDeque::new();
216        trace_run("init_loop_state", phase_started);
217
218        if let Some(nudge) = mana_skill_follow_up_hint(
219            &prompt,
220            self.mode,
221            !self.tools.is_empty(),
222            self.has_mana_skill,
223            self.has_mana_basics_skill,
224            self.has_mana_delegation_skill,
225        ) {
226            queued_pre_turn_follow_ups.push_back(nudge.to_string());
227        }
228
229        loop {
230            let mut turn_state = TurnState::new(turn);
231            turn_state.enter(TurnPhase::ReceiveCommands);
232
233            if let Some(reconciliation) = self.reconcile_recovery_before_turn(turn).await {
234                if !reconciliation.is_safe_to_continue() {
235                    let unsafe_count = reconciliation.unsafe_incomplete_tools.len();
236                    final_status = Some(RunFinalStatus::Blocked {
237                        reason: AgentStopReason::ExecutionBlocked,
238                        message: format!(
239                            "recovery requires user review: {unsafe_count} incomplete non-retryable tool side effect(s)"
240                        ),
241                    });
242                    break;
243                }
244            }
245
246            if turn > 0 {
247                if let Some(follow_up) = queued_pre_turn_follow_ups.pop_front() {
248                    turn_state.record_continue(super::ContinueReason::QueuedUserFollowUp);
249                    self.messages.push(Message::user(&follow_up));
250                }
251            }
252
253            // Check for commands between turns (non-blocking)
254            while let Ok(cmd) = self.command_rx.try_recv() {
255                match cmd {
256                    AgentCommand::Cancel => {
257                        self.cancel_token
258                            .store(true, std::sync::atomic::Ordering::Relaxed);
259                        cancelled = true;
260                        break;
261                    }
262                    AgentCommand::Steer(msg) => {
263                        self.messages.push(Message::user(&msg));
264                    }
265                    AgentCommand::FollowUp(msg) => queued_follow_ups.push_back(msg),
266                }
267            }
268
269            if cancelled {
270                break;
271            }
272
273            turn_state.enter(TurnPhase::PreTurn);
274            self.emit(AgentEvent::TurnStart { index: turn }).await;
275            if let Ok(mut review) = self.turn_mana_review.lock() {
276                review.begin_turn(turn);
277            }
278            let turn_started_at = Instant::now();
279            turn_state.enter(TurnPhase::BuildContext);
280            self.emit_timing(
281                turn,
282                TimingStage::ContextAssemblyStart,
283                turn_started_at,
284                None,
285            )
286            .await;
287            let context_assembly_started_at = Instant::now();
288
289            let mut usage = crate::context::context_usage(&self.messages, &self.model);
290            if usage.ratio >= self.context_config.observation_mask_threshold {
291                crate::context::mask_observations(
292                    &mut self.messages,
293                    self.context_config.mask_window,
294                );
295                self.hooks
296                    .fire(&HookEvent::OnContextThreshold { ratio: usage.ratio })
297                    .await;
298                // Masking can materially reduce context size, so any subsequent
299                // logic must use fresh usage rather than the pre-masking snapshot.
300                usage = crate::context::context_usage(&self.messages, &self.model);
301            }
302
303            // Context management is observation-mask only. Full conversation
304            // compaction has been removed because the rewrite-based behavior
305            // was too error-prone to keep in the runtime.
306
307            // Build context and options for the LLM
308            let context = Context {
309                messages: self.messages.clone(),
310            };
311
312            let options = RequestOptions {
313                thinking_level: self.thinking_level,
314                // Use configured output cap when present; otherwise let providers
315                // choose their own sensible default output budget.
316                max_tokens: self.max_tokens,
317                temperature: None,
318                system_prompt: self.system_prompt.clone(),
319                tools: self.tools.definitions(),
320                cache_options: self.cache_options.clone(),
321                effort: None,
322            };
323            self.emit_timing_with_details(
324                TimingEvent::new(turn, TimingStage::ContextAssemblyEnd, turn_started_at, None)
325                    .with_duration_ms(context_assembly_started_at.elapsed().as_millis() as u64)
326                    .with_success(true),
327            )
328            .await;
329
330            self.hooks.fire(&HookEvent::BeforeLlmCall).await;
331
332            // Pre-flight OAuth token refresh: if we have an auth store and the
333            // token is expired, refresh it before making the API call. This
334            // avoids wasting a round-trip on a guaranteed 401.
335            if let Some(ref auth_store) = self.auth_store {
336                let mut store = auth_store.lock().await;
337                if store.is_oauth_expired("anthropic") {
338                    match store.resolve_with_refresh("anthropic").await {
339                        Ok(new_key) => {
340                            self.api_key = new_key;
341                        }
342                        Err(e) => {
343                            let message = format!(
344                                "OAuth token refresh failed before request: {e}. Continuing with existing credentials."
345                            );
346                            let _ = self.ui.notify(&message, NotifyLevel::Warning).await;
347                        }
348                    }
349                }
350            }
351
352            // Stream the LLM response with retry on transient startup failures.
353            turn_state.enter(TurnPhase::SampleModel);
354            let llm_request_started_at = Instant::now();
355            self.emit_recovery_checkpoint(Self::recovery_checkpoint(
356                turn,
357                RecoveryCheckpointKind::ProviderRequestStart,
358                None,
359                None,
360                None,
361                None,
362                None,
363            ))
364            .await;
365            self.emit_timing(
366                turn,
367                TimingStage::LlmRequestStart,
368                turn_started_at,
369                Some(llm_request_started_at),
370            )
371            .await;
372            let model = clone_model(&self.model);
373            let context = context.clone();
374            let options = options.clone();
375            let api_key = self.api_key.clone();
376            let mut stream = crate::retry::stream_with_retry(
377                move || {
378                    model
379                        .provider
380                        .stream(&model, context.clone(), options.clone(), &api_key)
381                },
382                self.retry_policy.clone(),
383            );
384
385            let mut ordered_content: Vec<ContentBlock> = Vec::new();
386            let mut tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new();
387            let mut assistant_msg: Option<AssistantMessage> = None;
388            let mut saw_first_stream_event = false;
389            let mut saw_first_text_delta = false;
390            let mut saw_first_tool_call = false;
391            let mut saw_provider_message_end = false;
392            let cancel_token = Arc::clone(&self.cancel_token);
393            cancel_token.store(false, std::sync::atomic::Ordering::Relaxed);
394
395            while let Some(event_result) = stream.next().await {
396                // Check for cancel during event processing
397                while let Ok(cmd) = self.command_rx.try_recv() {
398                    match cmd {
399                        AgentCommand::Cancel => {
400                            cancel_token.store(true, std::sync::atomic::Ordering::Relaxed);
401                            cancelled = true;
402                            break;
403                        }
404                        AgentCommand::Steer(msg) => {
405                            self.messages.push(Message::user(&msg));
406                        }
407                        AgentCommand::FollowUp(msg) => queued_follow_ups.push_back(msg),
408                    }
409                }
410
411                if cancelled {
412                    break;
413                }
414
415                match event_result {
416                    Ok(event) => {
417                        if !saw_first_stream_event {
418                            saw_first_stream_event = true;
419                            self.emit_timing(
420                                turn,
421                                TimingStage::FirstStreamEvent,
422                                turn_started_at,
423                                Some(llm_request_started_at),
424                            )
425                            .await;
426                        }
427                        // Forward as delta
428                        self.emit(AgentEvent::MessageDelta {
429                            delta: event.clone(),
430                        })
431                        .await;
432
433                        match event {
434                            StreamEvent::TextDelta { text } => {
435                                if !saw_first_text_delta {
436                                    saw_first_text_delta = true;
437                                    self.emit_timing(
438                                        turn,
439                                        TimingStage::FirstTextDelta,
440                                        turn_started_at,
441                                        Some(llm_request_started_at),
442                                    )
443                                    .await;
444                                }
445                                push_stream_text_block(&mut ordered_content, text);
446                            }
447                            StreamEvent::ThinkingDelta { text } => {
448                                push_stream_thinking_block(&mut ordered_content, text);
449                            }
450                            StreamEvent::ToolCall {
451                                id,
452                                name,
453                                arguments,
454                            } => {
455                                if !saw_first_tool_call {
456                                    saw_first_tool_call = true;
457                                    self.emit_timing(
458                                        turn,
459                                        TimingStage::FirstToolCall,
460                                        turn_started_at,
461                                        Some(llm_request_started_at),
462                                    )
463                                    .await;
464                                }
465                                let args_hash = Self::tool_args_hash(&arguments);
466                                self.emit_recovery_checkpoint(Self::recovery_checkpoint(
467                                    turn,
468                                    RecoveryCheckpointKind::AssistantToolCallObserved,
469                                    Some(id.clone()),
470                                    Some(name.clone()),
471                                    Some(args_hash),
472                                    None,
473                                    None,
474                                ))
475                                .await;
476                                ordered_content.push(ContentBlock::ToolCall {
477                                    id: id.clone(),
478                                    name: name.clone(),
479                                    arguments: arguments.clone(),
480                                });
481                                tool_calls.push((id, name, arguments));
482                            }
483                            StreamEvent::MessageEnd { message } => {
484                                saw_provider_message_end = true;
485                                self.emit_timing(
486                                    turn,
487                                    TimingStage::MessageEnd,
488                                    turn_started_at,
489                                    Some(llm_request_started_at),
490                                )
491                                .await;
492                                self.emit_recovery_checkpoint(Self::recovery_checkpoint(
493                                    turn,
494                                    RecoveryCheckpointKind::ProviderRequestCompleted,
495                                    None,
496                                    None,
497                                    None,
498                                    Some(true),
499                                    None,
500                                ))
501                                .await;
502                                if let Some(ref usage) = message.usage {
503                                    total_usage.add(usage);
504                                }
505                                assistant_msg = Some(message);
506                            }
507                            StreamEvent::MessageStart { .. } => {}
508                            StreamEvent::Error { error } => {
509                                self.emit(AgentEvent::Error {
510                                    error: format!(
511                                        "Provider stream failed after partial output: {error}"
512                                    ),
513                                })
514                                .await;
515                                // Build a minimal error message to push
516                                let err_msg = AssistantMessage {
517                                    content: vec![ContentBlock::Text { text: error }],
518                                    usage: None,
519                                    stop_reason: StopReason::Error("Stream error".to_string()),
520                                    timestamp: imp_llm::now(),
521                                };
522                                self.messages.push(Message::Assistant(err_msg.clone()));
523                                turn_state.enter(TurnPhase::RecordObservations);
524                                let mana_review = self.finish_turn_mana_review(turn);
525                                self.emit(AgentEvent::TurnEnd {
526                                    index: turn,
527                                    message: err_msg,
528                                    mana_review,
529                                })
530                                .await;
531                                let cost = total_usage.cost(&self.model.meta.pricing);
532                                self.emit(AgentEvent::AgentEnd {
533                                    usage: total_usage,
534                                    cost,
535                                    status: RunFinalStatus::Failed {
536                                        message: "stream error".to_string(),
537                                    },
538                                })
539                                .await;
540                                return Err(crate::error::Error::Llm(imp_llm::Error::Provider(
541                                    "Stream error".to_string(),
542                                )));
543                            }
544                        }
545                    }
546                    Err(e) => {
547                        let had_partial_output =
548                            !ordered_content.is_empty() || !tool_calls.is_empty();
549                        let error = match &e {
550                            imp_llm::Error::Stream(message) => {
551                                if had_partial_output {
552                                    format!(
553                                        "Provider stream failed after partial output: {message}"
554                                    )
555                                } else {
556                                    format!("Provider stream failed before output: {message}")
557                                }
558                            }
559                            imp_llm::Error::Http(http_error) if http_error.is_decode() => {
560                                if had_partial_output {
561                                    format!(
562                                        "Provider response body decode failed after partial output; not retrying to avoid duplicated tool output: {http_error}"
563                                    )
564                                } else {
565                                    format!(
566                                        "Provider response body decode failed before output after retry attempts were exhausted: {http_error}"
567                                    )
568                                }
569                            }
570                            _ => {
571                                if had_partial_output {
572                                    format!("Provider stream failed after partial output: {e}")
573                                } else {
574                                    e.to_string()
575                                }
576                            }
577                        };
578                        self.emit(AgentEvent::Error {
579                            error: error.clone(),
580                        })
581                        .await;
582                        let cost = total_usage.cost(&self.model.meta.pricing);
583                        self.emit(AgentEvent::AgentEnd {
584                            usage: total_usage,
585                            cost,
586                            status: RunFinalStatus::Failed {
587                                message: error.clone(),
588                            },
589                        })
590                        .await;
591                        return Err(e.into());
592                    }
593                }
594            }
595
596            if cancelled {
597                // Emit TurnEnd with whatever we have so far
598                let partial = assistant_msg.unwrap_or_else(|| {
599                    build_assistant_message(&ordered_content, &tool_calls, None)
600                });
601                self.messages.push(Message::Assistant(partial.clone()));
602                let mana_review = self.finish_turn_mana_review(turn);
603                self.emit(AgentEvent::TurnEnd {
604                    index: turn,
605                    message: partial,
606                    mana_review,
607                })
608                .await;
609                break;
610            }
611
612            // Use the MessageEnd message if provided; otherwise the provider
613            // stream ended without a terminal completion event and should be
614            // treated as an error rather than silently synthesized as success.
615            let msg = match assistant_msg {
616                Some(message) => message,
617                None if !saw_provider_message_end => {
618                    let error = format!(
619                        "Provider stream ended unexpectedly before completing the message (missing terminal completion event after {} content block(s) and {} tool call(s))",
620                        ordered_content.len(),
621                        tool_calls.len()
622                    );
623                    self.emit(AgentEvent::Error {
624                        error: error.clone(),
625                    })
626                    .await;
627                    let cost = total_usage.cost(&self.model.meta.pricing);
628                    self.emit(AgentEvent::AgentEnd {
629                        usage: total_usage,
630                        cost,
631                        status: RunFinalStatus::Failed {
632                            message: error.clone(),
633                        },
634                    })
635                    .await;
636                    return Err(crate::error::Error::Llm(imp_llm::Error::Stream(error)));
637                }
638                None => build_assistant_message(&ordered_content, &tool_calls, None),
639            };
640
641            turn_state.enter(TurnPhase::FinalizeAssistantMessage);
642            self.emit_recovery_checkpoint(Self::recovery_checkpoint(
643                turn,
644                RecoveryCheckpointKind::AssistantMessageFinalized,
645                None,
646                None,
647                None,
648                Some(true),
649                None,
650            ))
651            .await;
652            self.messages.push(Message::Assistant(msg.clone()));
653
654            if tool_calls.is_empty() {
655                // No tool calls — the model is done unless a queued follow-up exists.
656                let mana_review = self.finish_turn_mana_review(turn);
657                self.emit(AgentEvent::TurnEnd {
658                    index: turn,
659                    message: msg.clone(),
660                    mana_review: mana_review.clone(),
661                })
662                .await;
663
664                self.emit_timing(
665                    turn,
666                    TimingStage::PostTurnAssessmentStart,
667                    turn_started_at,
668                    None,
669                )
670                .await;
671                turn_state.enter(TurnPhase::AssessTurn);
672                let assessment_started_at = Instant::now();
673                let assessment = self.assess_post_turn(&msg, &[], false, &mana_review);
674                self.emit_timing_with_details(
675                    TimingEvent::new(
676                        turn,
677                        TimingStage::PostTurnAssessmentEnd,
678                        turn_started_at,
679                        None,
680                    )
681                    .with_duration_ms(assessment_started_at.elapsed().as_millis() as u64)
682                    .with_success(true),
683                )
684                .await;
685                self.emit(AgentEvent::TurnAssessment {
686                    index: turn,
687                    assessment: assessment.debug_view(),
688                })
689                .await;
690                turn_state.enter(TurnPhase::DecideNext);
691                let decision = self.loop_decision_after_turn(&assessment);
692                match decision {
693                    LoopDecision::Continue { prompt, reason } => {
694                        self.mark_continue_reason(reason);
695                        turn_state.record_continue(reason);
696                        queued_follow_ups.push_back(prompt);
697                    }
698                    LoopDecision::Finish { status } => {
699                        final_status = Some(status);
700                    }
701                }
702
703                if let Some(follow_up) = queued_follow_ups.pop_front() {
704                    turn_state.record_continue(super::ContinueReason::QueuedUserFollowUp);
705                    self.messages.push(Message::user(&follow_up));
706                    turn += 1;
707                    continue;
708                }
709                break;
710            }
711
712            // Execute tool calls
713            turn_state.enter(TurnPhase::PlanTools);
714            let tool_plan = self.plan_tools(tool_calls);
715            turn_state.record_tool_plan(tool_plan.len());
716            for call in &tool_plan.calls {
717                self.emit_recovery_checkpoint(Self::recovery_checkpoint(
718                    turn,
719                    RecoveryCheckpointKind::ToolPlanCreated,
720                    Some(call.id.clone()),
721                    Some(call.name.clone()),
722                    Some(Self::tool_args_hash(&call.args)),
723                    Some(call.retry_safe),
724                    None,
725                ))
726                .await;
727            }
728            turn_state.enter(TurnPhase::ExecuteTools);
729            let results = self
730                .execute_tool_plan(turn, turn_started_at, tool_plan, cancel_token)
731                .await;
732            turn_state.record_tool_results(results.len());
733            turn_state.enter(TurnPhase::RecordObservations);
734
735            for result in &results {
736                self.emit_recovery_checkpoint(Self::recovery_checkpoint(
737                    turn,
738                    RecoveryCheckpointKind::ToolResultAddedToContext,
739                    Some(result.tool_call_id.clone()),
740                    Some(result.tool_name.clone()),
741                    None,
742                    Some(!result.is_error),
743                    None,
744                ))
745                .await;
746                self.messages.push(Message::ToolResult(result.clone()));
747            }
748
749            record_mana_mutation_results(&self.turn_mana_review, &results);
750            let mana_review = self.finish_turn_mana_review(turn);
751            self.emit(AgentEvent::TurnEnd {
752                index: turn,
753                message: msg.clone(),
754                mana_review: mana_review.clone(),
755            })
756            .await;
757
758            self.emit_timing(
759                turn,
760                TimingStage::PostTurnAssessmentStart,
761                turn_started_at,
762                None,
763            )
764            .await;
765            turn_state.enter(TurnPhase::AssessTurn);
766            let assessment_started_at = Instant::now();
767            let assessment = self.assess_post_turn(&msg, &results, true, &mana_review);
768            self.emit_timing_with_details(
769                TimingEvent::new(
770                    turn,
771                    TimingStage::PostTurnAssessmentEnd,
772                    turn_started_at,
773                    None,
774                )
775                .with_duration_ms(assessment_started_at.elapsed().as_millis() as u64)
776                .with_success(true),
777            )
778            .await;
779            self.emit(AgentEvent::TurnAssessment {
780                index: turn,
781                assessment: assessment.debug_view(),
782            })
783            .await;
784            turn_state.enter(TurnPhase::DecideNext);
785            let decision = self.loop_decision_after_turn(&assessment);
786            let should_stop_after_tool_turn = matches!(
787                decision,
788                LoopDecision::Finish {
789                    status: RunFinalStatus::Blocked {
790                        reason: AgentStopReason::RepeatedAction,
791                        ..
792                    }
793                }
794            );
795            match decision {
796                LoopDecision::Continue { prompt, reason } => {
797                    self.mark_continue_reason(reason);
798                    turn_state.record_continue(reason);
799                    queued_follow_ups.push_back(prompt);
800                }
801                LoopDecision::Finish { status } => {
802                    final_status = Some(status);
803                }
804            }
805
806            if let Some(follow_up) = queued_follow_ups.pop_front() {
807                self.messages.push(Message::user(&follow_up));
808            }
809
810            if should_stop_after_tool_turn {
811                break;
812            }
813
814            turn_state.record_continue(super::ContinueReason::ToolResultsNeedInterpretation);
815            turn += 1;
816        }
817
818        let mut status = if cancelled {
819            RunFinalStatus::Cancelled
820        } else {
821            final_status.unwrap_or_else(|| {
822                RunFinalStatus::from_stop_reason(AgentStopReason::NoAutomaticFollowUp)
823            })
824        };
825        if !cancelled && !self.verification_gates.is_empty() {
826            if let Some(artifacts) = &run_artifacts {
827                self.run_verification_gates(artifacts).await;
828            }
829            status = enforce_verification_closeout(status, &self.verification_gates);
830        }
831        if let Some(artifacts) = &run_artifacts {
832            self.write_run_evidence(&run_id, artifacts, &prompt, &status)
833                .await;
834        }
835        let cost = total_usage.cost(&self.model.meta.pricing);
836        self.emit(AgentEvent::AgentEnd {
837            usage: total_usage,
838            cost,
839            status: status.clone(),
840        })
841        .await;
842
843        if let Ok(mut active_trace_writer) = self.trace_writer.lock() {
844            *active_trace_writer = None;
845        }
846        if let Ok(mut active_run_id) = self.run_id.lock() {
847            *active_run_id = None;
848        }
849
850        if cancelled {
851            return Err(crate::error::Error::Cancelled);
852        }
853
854        Ok(())
855    }
856}
857
858fn evidence_trust_summary_from_messages(messages: &[Message]) -> EvidenceTrustSummary {
859    let mut summary = EvidenceTrustSummary::default();
860    for message in messages {
861        let Message::ToolResult(result) = message else {
862            continue;
863        };
864        let Some(provenance) = result
865            .details
866            .get("provenance")
867            .and_then(|value| serde_json::from_value::<Provenance>(value.clone()).ok())
868        else {
869            continue;
870        };
871        record_evidence_provenance(&mut summary, &provenance);
872    }
873    summary.sources.sort();
874    summary.sources.dedup();
875    summary.low_trust_influences.sort();
876    summary.low_trust_influences.dedup();
877    summary.warnings.sort();
878    summary.warnings.dedup();
879    summary
880}
881
882fn record_evidence_provenance(summary: &mut EvidenceTrustSummary, provenance: &Provenance) {
883    summary.sources.push(format!(
884        "source={:?}; trust={:?}; origin={}",
885        provenance.source,
886        provenance.trust,
887        provenance.origin.as_deref().unwrap_or("unknown")
888    ));
889    if provenance.is_low_trust() {
890        summary.low_trust_influences.push(format!(
891            "low-trust source observed: {}",
892            provenance.origin.as_deref().unwrap_or("unknown")
893        ));
894    }
895    if provenance.risk.iter().any(|risk| {
896        matches!(
897            risk,
898            RiskLabel::PossiblePromptInjection | RiskLabel::ContainsInstructions
899        )
900    }) {
901        summary.warnings.push(format!(
902            "instruction-like low-trust content observed from {}",
903            provenance.origin.as_deref().unwrap_or("unknown")
904        ));
905    }
906    if provenance.trust == TrustLabel::ExternalUntrusted {
907        summary
908            .warnings
909            .push("external/untrusted content cannot authorize policy or tool escalation".into());
910    }
911}
912
913fn evidence_verification_gate(
914    gate: &crate::workflow::VerificationGate,
915) -> EvidenceVerificationGate {
916    EvidenceVerificationGate {
917        name: if gate.name.is_empty() {
918            gate.id.clone()
919        } else {
920            gate.name.clone()
921        },
922        required: gate.is_required(),
923        status: format!("{:?}", gate.status).to_lowercase(),
924        command: gate.command.as_ref().map(|command| command.command.clone()),
925        exit_code: gate.result.as_ref().and_then(|result| result.exit_code),
926        artifact_path: gate
927            .artifacts
928            .iter()
929            .find(|artifact| artifact.kind == "status")
930            .or_else(|| gate.artifacts.first())
931            .map(|artifact| artifact.path.clone()),
932    }
933}
934
935fn evidence_policy_for_autonomy(mode: AutonomyMode) -> EvidencePolicy {
936    let mut policy = EvidencePolicy::default();
937    policy.decisions.push(format!("autonomy mode: {mode}"));
938    policy
939        .decisions
940        .push("policy.checked trace events record mode, scope, and decision context when policy checks run".into());
941    policy
942        .denials
943        .push("hard-rail bypass: none recorded; dangerous grants are not implemented".into());
944    match mode {
945        AutonomyMode::LocalAuto | AutonomyMode::WorktreeAuto => {
946            policy.decisions.push(
947                "autonomous local actions remain subject to workspace, network, secret, and hard-rail policy".into(),
948            );
949            policy.approvals.push(
950                "network, outside-workspace, destructive, and secret-sensitive actions require approval or are denied".into(),
951            );
952        }
953        AutonomyMode::AllowAllLocal => {
954            policy
955                .decisions
956                .push("allow-all-local remained scoped to local workspace/worktree actions".into());
957            policy.decisions.push(
958                "notable high-risk actions should be inspected in policy.checked trace events"
959                    .into(),
960            );
961            policy.approvals.push(
962                "network, outside-workspace, production, secret, and dangerous-grant actions were not silently allowed".into(),
963            );
964        }
965        AutonomyMode::AllowAll => {
966            policy.decisions.push(
967                "allow-all mode was active; audit evidence and policy.checked trace events remain enabled".into(),
968            );
969            policy.decisions.push(
970                "notable high-risk actions should be inspected in policy.checked trace events"
971                    .into(),
972            );
973            policy.approvals.push(
974                "secret exfiltration, dangerous grants, and unsupported outside-scope mutations were not silently allowed".into(),
975            );
976        }
977        AutonomyMode::Ci => {
978            policy.decisions.push(
979                "ci mode fails closed for prompts/approvals not declared ahead of time".into(),
980            );
981        }
982        AutonomyMode::Suggest | AutonomyMode::Safe => {}
983    }
984    policy
985}
986
987fn evidence_actions_from_messages(messages: &[Message]) -> EvidenceActions {
988    let mut actions = EvidenceActions::default();
989    for message in messages {
990        let Message::Assistant(assistant) = message else {
991            continue;
992        };
993        for block in &assistant.content {
994            let ContentBlock::ToolCall {
995                name, arguments, ..
996            } = block
997            else {
998                continue;
999            };
1000            actions.tools.push(name.clone());
1001            match name.as_str() {
1002                "read" => {
1003                    if let Some(path) = arguments.get("path").and_then(|value| value.as_str()) {
1004                        actions.files_inspected.push(path.to_string());
1005                    }
1006                }
1007                "write" | "edit" => {
1008                    if let Some(path) = arguments.get("path").and_then(|value| value.as_str()) {
1009                        actions.files_changed.push(path.to_string());
1010                    }
1011                }
1012                "bash" => {
1013                    if let Some(command) = arguments.get("command").and_then(|value| value.as_str())
1014                    {
1015                        actions.commands_run.push(command.to_string());
1016                    }
1017                }
1018                "scan" => actions.searches.push(format!("scan {arguments}")),
1019                _ => {}
1020            }
1021        }
1022    }
1023    actions.files_inspected.sort();
1024    actions.files_inspected.dedup();
1025    actions.files_changed.sort();
1026    actions.files_changed.dedup();
1027    actions.commands_run.sort();
1028    actions.commands_run.dedup();
1029    actions.searches.sort();
1030    actions.searches.dedup();
1031    actions.tools.sort();
1032    actions.tools.dedup();
1033    actions
1034}