1use std::sync::Arc;
2use std::time::Instant;
3
4use futures::StreamExt;
5use imp_llm::{
6 AssistantMessage, ContentBlock, Context, Message, RequestOptions, StopReason, StreamEvent,
7 Usage,
8};
9
10use crate::agent::loop_state::enforce_verification_closeout;
11use crate::agent::{
12 Agent, AgentCommand, AgentEvent, LoopDecision, RecoveryCheckpointKind, RunFinalStatus,
13 StopReason as AgentStopReason, TimingEvent, TimingStage, TurnPhase, TurnState,
14};
15use crate::error::Result;
16use crate::evidence::{
17 EvidenceActions, EvidenceArtifact, EvidencePacket, EvidencePolicy, EvidenceTrustSummary,
18 EvidenceVerificationGate,
19};
20use crate::hooks::HookEvent;
21use crate::ui::NotifyLevel;
22use crate::workflow::{AutonomyMode, VerificationGateRunner};
23use crate::{
24 storage,
25 trace::TraceWriter,
26 trust::{Provenance, RiskLabel, TrustLabel},
27};
28
29use super::{
30 build_assistant_message, clone_model, mana_skill_follow_up_hint, push_stream_text_block,
31 push_stream_thinking_block, record_mana_mutation_results,
32};
33
34impl Agent {
35 pub(super) async fn reconcile_recovery_before_turn(
36 &self,
37 turn: u32,
38 ) -> Option<super::RecoveryReconciliation> {
39 let reconciliation = self
40 .recovery_ledger
41 .lock()
42 .ok()
43 .and_then(|ledger| ledger.reconcile_latest_finished_turn())?;
44
45 if reconciliation.turn >= turn {
49 return None;
50 }
51
52 if !reconciliation.is_safe_to_continue() {
53 self.emit(AgentEvent::Error {
54 error: format!(
55 "Recovery blocked before turn {turn}: {} incomplete non-retryable tool side effect(s)",
56 reconciliation.unsafe_incomplete_tools.len()
57 ),
58 })
59 .await;
60 }
61
62 Some(reconciliation)
63 }
64
65 async fn run_verification_gates(&mut self, artifacts: &storage::RunArtifacts) {
66 let runner = VerificationGateRunner::new(&self.cwd, artifacts.root().join("verification"));
67 let mut completed = Vec::new();
68 for index in 0..self.verification_gates.len() {
69 if matches!(
70 self.verification_gates[index].status,
71 crate::workflow::VerificationGateStatus::Passed
72 | crate::workflow::VerificationGateStatus::Failed
73 | crate::workflow::VerificationGateStatus::Blocked
74 | crate::workflow::VerificationGateStatus::Skipped
75 ) {
76 continue;
77 }
78 self.emit(AgentEvent::VerificationStarted {
79 gate: self.verification_gates[index].clone(),
80 })
81 .await;
82 let _ = runner.run(&mut self.verification_gates[index]).await;
83 completed.push(self.verification_gates[index].clone());
84 }
85 for gate in completed {
86 self.emit(AgentEvent::VerificationCompleted {
87 closeout_effect: gate.closeout_effect(),
88 gate,
89 })
90 .await;
91 }
92 }
93
94 async fn write_run_evidence(
95 &self,
96 run_id: &str,
97 artifacts: &storage::RunArtifacts,
98 prompt: &str,
99 status: &RunFinalStatus,
100 ) {
101 let mut packet = EvidencePacket::new(run_id, prompt);
102 packet.workflow_id = self
103 .workflow_contract
104 .id
105 .clone()
106 .or_else(|| self.workflow_contract.mana_unit_ref.clone());
107 packet.workflow_type = Some(format!("{:?}", self.workflow_contract.workflow_type));
108 packet.risk_level = Some(format!("{:?}", self.workflow_contract.risk_level));
109 packet.autonomy_mode = Some(self.workflow_contract.autonomy_mode.to_string());
110 packet.final_status = Some(format!("{:?}", status));
111 packet.policy = evidence_policy_for_autonomy(self.workflow_contract.autonomy_mode);
112 packet.trust = evidence_trust_summary_from_messages(&self.messages);
113 packet
114 .summary
115 .push("Agent run completed; inspect trace.jsonl for structured event details.".into());
116 packet.actions = evidence_actions_from_messages(&self.messages);
117 packet.verification = self
118 .verification_gates
119 .iter()
120 .map(evidence_verification_gate)
121 .collect();
122 packet.artifacts = vec![
123 EvidenceArtifact {
124 kind: "trace".into(),
125 path: artifacts.trace_path(),
126 summary: Some("Structured runtime event trace".into()),
127 },
128 EvidenceArtifact {
129 kind: "workflow-contract".into(),
130 path: artifacts.workflow_contract_path(),
131 summary: Some("Workflow contract snapshot".into()),
132 },
133 ];
134 let evidence_path = artifacts.evidence_path();
135 if packet.write_markdown(&evidence_path).is_ok() {
136 self.write_trace_event(&AgentEvent::EvidenceWritten {
137 path: evidence_path.clone(),
138 });
139 let _ = self
140 .event_tx
141 .send(AgentEvent::EvidenceWritten {
142 path: evidence_path,
143 })
144 .await;
145 }
146 }
147
148 pub async fn run(&mut self, prompt: String) -> Result<()> {
150 let trace_path = std::env::var_os("IMP_TUI_TRACE").map(std::path::PathBuf::from);
151 let trace_run = |phase: &str, started: std::time::Instant| {
152 if let Some(path) = trace_path.as_ref() {
153 if let Ok(mut file) = std::fs::OpenOptions::new()
154 .create(true)
155 .append(true)
156 .open(path)
157 {
158 use std::io::Write as _;
159 let _ = writeln!(
160 file,
161 "{} agent_run_phase phase={} duration_ms={}",
162 imp_llm::now(),
163 phase,
164 started.elapsed().as_millis()
165 );
166 }
167 }
168 };
169 let phase_started = std::time::Instant::now();
170 let run_id = format!("run_{}", uuid::Uuid::new_v4().simple());
171 let run_artifacts = storage::project_run_artifacts(&self.cwd, &run_id).ok();
172 if let Some(artifacts) = &run_artifacts {
173 if let Ok(writer) = TraceWriter::create(artifacts.trace_path()) {
174 if let Ok(mut active_trace_writer) = self.trace_writer.lock() {
175 *active_trace_writer = Some(writer);
176 }
177 }
178 let _ = std::fs::write(
179 artifacts.workflow_contract_path(),
180 serde_json::to_string_pretty(&self.workflow_contract).unwrap_or_default(),
181 );
182 }
183 trace_run("artifacts", phase_started);
184 let phase_started = std::time::Instant::now();
185 if let Ok(mut active_run_id) = self.run_id.lock() {
186 *active_run_id = Some(run_id.clone());
187 }
188 trace_run("set_run_id", phase_started);
189 let phase_started = std::time::Instant::now();
190
191 self.emit(AgentEvent::AgentStart {
192 model: self.model.meta.id.clone(),
193 timestamp: imp_llm::now(),
194 })
195 .await;
196 trace_run("emit_agent_start", phase_started);
197 let phase_started = std::time::Instant::now();
198 self.hooks
199 .fire(&HookEvent::OnAgentStart { prompt: &prompt })
200 .await;
201 trace_run("hook_agent_start", phase_started);
202 let phase_started = std::time::Instant::now();
203
204 self.messages.push(Message::user(&prompt));
205
206 self.cancel_token
207 .store(false, std::sync::atomic::Ordering::Relaxed);
208 let mut turn: u32 = 0;
209 let mut total_usage = Usage::default();
210 let mut cancelled = false;
211 let mut final_status: Option<RunFinalStatus> = None;
212 let mut queued_follow_ups: std::collections::VecDeque<String> =
213 std::collections::VecDeque::new();
214 let mut queued_pre_turn_follow_ups: std::collections::VecDeque<String> =
215 std::collections::VecDeque::new();
216 trace_run("init_loop_state", phase_started);
217
218 if let Some(nudge) = mana_skill_follow_up_hint(
219 &prompt,
220 self.mode,
221 !self.tools.is_empty(),
222 self.has_mana_skill,
223 self.has_mana_basics_skill,
224 self.has_mana_delegation_skill,
225 ) {
226 queued_pre_turn_follow_ups.push_back(nudge.to_string());
227 }
228
229 loop {
230 let mut turn_state = TurnState::new(turn);
231 turn_state.enter(TurnPhase::ReceiveCommands);
232
233 if let Some(reconciliation) = self.reconcile_recovery_before_turn(turn).await {
234 if !reconciliation.is_safe_to_continue() {
235 let unsafe_count = reconciliation.unsafe_incomplete_tools.len();
236 final_status = Some(RunFinalStatus::Blocked {
237 reason: AgentStopReason::ExecutionBlocked,
238 message: format!(
239 "recovery requires user review: {unsafe_count} incomplete non-retryable tool side effect(s)"
240 ),
241 });
242 break;
243 }
244 }
245
246 if turn > 0 {
247 if let Some(follow_up) = queued_pre_turn_follow_ups.pop_front() {
248 turn_state.record_continue(super::ContinueReason::QueuedUserFollowUp);
249 self.messages.push(Message::user(&follow_up));
250 }
251 }
252
253 while let Ok(cmd) = self.command_rx.try_recv() {
255 match cmd {
256 AgentCommand::Cancel => {
257 self.cancel_token
258 .store(true, std::sync::atomic::Ordering::Relaxed);
259 cancelled = true;
260 break;
261 }
262 AgentCommand::Steer(msg) => {
263 self.messages.push(Message::user(&msg));
264 }
265 AgentCommand::FollowUp(msg) => queued_follow_ups.push_back(msg),
266 }
267 }
268
269 if cancelled {
270 break;
271 }
272
273 turn_state.enter(TurnPhase::PreTurn);
274 self.emit(AgentEvent::TurnStart { index: turn }).await;
275 if let Ok(mut review) = self.turn_mana_review.lock() {
276 review.begin_turn(turn);
277 }
278 let turn_started_at = Instant::now();
279 turn_state.enter(TurnPhase::BuildContext);
280 self.emit_timing(
281 turn,
282 TimingStage::ContextAssemblyStart,
283 turn_started_at,
284 None,
285 )
286 .await;
287 let context_assembly_started_at = Instant::now();
288
289 let mut usage = crate::context::context_usage(&self.messages, &self.model);
290 if usage.ratio >= self.context_config.observation_mask_threshold {
291 crate::context::mask_observations(
292 &mut self.messages,
293 self.context_config.mask_window,
294 );
295 self.hooks
296 .fire(&HookEvent::OnContextThreshold { ratio: usage.ratio })
297 .await;
298 usage = crate::context::context_usage(&self.messages, &self.model);
301 }
302
303 let context = Context {
309 messages: self.messages.clone(),
310 };
311
312 let options = RequestOptions {
313 thinking_level: self.thinking_level,
314 max_tokens: self.max_tokens,
317 temperature: None,
318 system_prompt: self.system_prompt.clone(),
319 tools: self.tools.definitions(),
320 cache_options: self.cache_options.clone(),
321 effort: None,
322 };
323 self.emit_timing_with_details(
324 TimingEvent::new(turn, TimingStage::ContextAssemblyEnd, turn_started_at, None)
325 .with_duration_ms(context_assembly_started_at.elapsed().as_millis() as u64)
326 .with_success(true),
327 )
328 .await;
329
330 self.hooks.fire(&HookEvent::BeforeLlmCall).await;
331
332 if let Some(ref auth_store) = self.auth_store {
336 let mut store = auth_store.lock().await;
337 if store.is_oauth_expired("anthropic") {
338 match store.resolve_with_refresh("anthropic").await {
339 Ok(new_key) => {
340 self.api_key = new_key;
341 }
342 Err(e) => {
343 let message = format!(
344 "OAuth token refresh failed before request: {e}. Continuing with existing credentials."
345 );
346 let _ = self.ui.notify(&message, NotifyLevel::Warning).await;
347 }
348 }
349 }
350 }
351
352 turn_state.enter(TurnPhase::SampleModel);
354 let llm_request_started_at = Instant::now();
355 self.emit_recovery_checkpoint(Self::recovery_checkpoint(
356 turn,
357 RecoveryCheckpointKind::ProviderRequestStart,
358 None,
359 None,
360 None,
361 None,
362 None,
363 ))
364 .await;
365 self.emit_timing(
366 turn,
367 TimingStage::LlmRequestStart,
368 turn_started_at,
369 Some(llm_request_started_at),
370 )
371 .await;
372 let model = clone_model(&self.model);
373 let context = context.clone();
374 let options = options.clone();
375 let api_key = self.api_key.clone();
376 let mut stream = crate::retry::stream_with_retry(
377 move || {
378 model
379 .provider
380 .stream(&model, context.clone(), options.clone(), &api_key)
381 },
382 self.retry_policy.clone(),
383 );
384
385 let mut ordered_content: Vec<ContentBlock> = Vec::new();
386 let mut tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new();
387 let mut assistant_msg: Option<AssistantMessage> = None;
388 let mut saw_first_stream_event = false;
389 let mut saw_first_text_delta = false;
390 let mut saw_first_tool_call = false;
391 let mut saw_provider_message_end = false;
392 let cancel_token = Arc::clone(&self.cancel_token);
393 cancel_token.store(false, std::sync::atomic::Ordering::Relaxed);
394
395 while let Some(event_result) = stream.next().await {
396 while let Ok(cmd) = self.command_rx.try_recv() {
398 match cmd {
399 AgentCommand::Cancel => {
400 cancel_token.store(true, std::sync::atomic::Ordering::Relaxed);
401 cancelled = true;
402 break;
403 }
404 AgentCommand::Steer(msg) => {
405 self.messages.push(Message::user(&msg));
406 }
407 AgentCommand::FollowUp(msg) => queued_follow_ups.push_back(msg),
408 }
409 }
410
411 if cancelled {
412 break;
413 }
414
415 match event_result {
416 Ok(event) => {
417 if !saw_first_stream_event {
418 saw_first_stream_event = true;
419 self.emit_timing(
420 turn,
421 TimingStage::FirstStreamEvent,
422 turn_started_at,
423 Some(llm_request_started_at),
424 )
425 .await;
426 }
427 self.emit(AgentEvent::MessageDelta {
429 delta: event.clone(),
430 })
431 .await;
432
433 match event {
434 StreamEvent::TextDelta { text } => {
435 if !saw_first_text_delta {
436 saw_first_text_delta = true;
437 self.emit_timing(
438 turn,
439 TimingStage::FirstTextDelta,
440 turn_started_at,
441 Some(llm_request_started_at),
442 )
443 .await;
444 }
445 push_stream_text_block(&mut ordered_content, text);
446 }
447 StreamEvent::ThinkingDelta { text } => {
448 push_stream_thinking_block(&mut ordered_content, text);
449 }
450 StreamEvent::ToolCall {
451 id,
452 name,
453 arguments,
454 } => {
455 if !saw_first_tool_call {
456 saw_first_tool_call = true;
457 self.emit_timing(
458 turn,
459 TimingStage::FirstToolCall,
460 turn_started_at,
461 Some(llm_request_started_at),
462 )
463 .await;
464 }
465 let args_hash = Self::tool_args_hash(&arguments);
466 self.emit_recovery_checkpoint(Self::recovery_checkpoint(
467 turn,
468 RecoveryCheckpointKind::AssistantToolCallObserved,
469 Some(id.clone()),
470 Some(name.clone()),
471 Some(args_hash),
472 None,
473 None,
474 ))
475 .await;
476 ordered_content.push(ContentBlock::ToolCall {
477 id: id.clone(),
478 name: name.clone(),
479 arguments: arguments.clone(),
480 });
481 tool_calls.push((id, name, arguments));
482 }
483 StreamEvent::MessageEnd { message } => {
484 saw_provider_message_end = true;
485 self.emit_timing(
486 turn,
487 TimingStage::MessageEnd,
488 turn_started_at,
489 Some(llm_request_started_at),
490 )
491 .await;
492 self.emit_recovery_checkpoint(Self::recovery_checkpoint(
493 turn,
494 RecoveryCheckpointKind::ProviderRequestCompleted,
495 None,
496 None,
497 None,
498 Some(true),
499 None,
500 ))
501 .await;
502 if let Some(ref usage) = message.usage {
503 total_usage.add(usage);
504 }
505 assistant_msg = Some(message);
506 }
507 StreamEvent::MessageStart { .. } => {}
508 StreamEvent::Error { error } => {
509 self.emit(AgentEvent::Error {
510 error: format!(
511 "Provider stream failed after partial output: {error}"
512 ),
513 })
514 .await;
515 let err_msg = AssistantMessage {
517 content: vec![ContentBlock::Text { text: error }],
518 usage: None,
519 stop_reason: StopReason::Error("Stream error".to_string()),
520 timestamp: imp_llm::now(),
521 };
522 self.messages.push(Message::Assistant(err_msg.clone()));
523 turn_state.enter(TurnPhase::RecordObservations);
524 let mana_review = self.finish_turn_mana_review(turn);
525 self.emit(AgentEvent::TurnEnd {
526 index: turn,
527 message: err_msg,
528 mana_review,
529 })
530 .await;
531 let cost = total_usage.cost(&self.model.meta.pricing);
532 self.emit(AgentEvent::AgentEnd {
533 usage: total_usage,
534 cost,
535 status: RunFinalStatus::Failed {
536 message: "stream error".to_string(),
537 },
538 })
539 .await;
540 return Err(crate::error::Error::Llm(imp_llm::Error::Provider(
541 "Stream error".to_string(),
542 )));
543 }
544 }
545 }
546 Err(e) => {
547 let had_partial_output =
548 !ordered_content.is_empty() || !tool_calls.is_empty();
549 let error = match &e {
550 imp_llm::Error::Stream(message) => {
551 if had_partial_output {
552 format!(
553 "Provider stream failed after partial output: {message}"
554 )
555 } else {
556 format!("Provider stream failed before output: {message}")
557 }
558 }
559 imp_llm::Error::Http(http_error) if http_error.is_decode() => {
560 if had_partial_output {
561 format!(
562 "Provider response body decode failed after partial output; not retrying to avoid duplicated tool output: {http_error}"
563 )
564 } else {
565 format!(
566 "Provider response body decode failed before output after retry attempts were exhausted: {http_error}"
567 )
568 }
569 }
570 _ => {
571 if had_partial_output {
572 format!("Provider stream failed after partial output: {e}")
573 } else {
574 e.to_string()
575 }
576 }
577 };
578 self.emit(AgentEvent::Error {
579 error: error.clone(),
580 })
581 .await;
582 let cost = total_usage.cost(&self.model.meta.pricing);
583 self.emit(AgentEvent::AgentEnd {
584 usage: total_usage,
585 cost,
586 status: RunFinalStatus::Failed {
587 message: error.clone(),
588 },
589 })
590 .await;
591 return Err(e.into());
592 }
593 }
594 }
595
596 if cancelled {
597 let partial = assistant_msg.unwrap_or_else(|| {
599 build_assistant_message(&ordered_content, &tool_calls, None)
600 });
601 self.messages.push(Message::Assistant(partial.clone()));
602 let mana_review = self.finish_turn_mana_review(turn);
603 self.emit(AgentEvent::TurnEnd {
604 index: turn,
605 message: partial,
606 mana_review,
607 })
608 .await;
609 break;
610 }
611
612 let msg = match assistant_msg {
616 Some(message) => message,
617 None if !saw_provider_message_end => {
618 let error = format!(
619 "Provider stream ended unexpectedly before completing the message (missing terminal completion event after {} content block(s) and {} tool call(s))",
620 ordered_content.len(),
621 tool_calls.len()
622 );
623 self.emit(AgentEvent::Error {
624 error: error.clone(),
625 })
626 .await;
627 let cost = total_usage.cost(&self.model.meta.pricing);
628 self.emit(AgentEvent::AgentEnd {
629 usage: total_usage,
630 cost,
631 status: RunFinalStatus::Failed {
632 message: error.clone(),
633 },
634 })
635 .await;
636 return Err(crate::error::Error::Llm(imp_llm::Error::Stream(error)));
637 }
638 None => build_assistant_message(&ordered_content, &tool_calls, None),
639 };
640
641 turn_state.enter(TurnPhase::FinalizeAssistantMessage);
642 self.emit_recovery_checkpoint(Self::recovery_checkpoint(
643 turn,
644 RecoveryCheckpointKind::AssistantMessageFinalized,
645 None,
646 None,
647 None,
648 Some(true),
649 None,
650 ))
651 .await;
652 self.messages.push(Message::Assistant(msg.clone()));
653
654 if tool_calls.is_empty() {
655 let mana_review = self.finish_turn_mana_review(turn);
657 self.emit(AgentEvent::TurnEnd {
658 index: turn,
659 message: msg.clone(),
660 mana_review: mana_review.clone(),
661 })
662 .await;
663
664 self.emit_timing(
665 turn,
666 TimingStage::PostTurnAssessmentStart,
667 turn_started_at,
668 None,
669 )
670 .await;
671 turn_state.enter(TurnPhase::AssessTurn);
672 let assessment_started_at = Instant::now();
673 let assessment = self.assess_post_turn(&msg, &[], false, &mana_review);
674 self.emit_timing_with_details(
675 TimingEvent::new(
676 turn,
677 TimingStage::PostTurnAssessmentEnd,
678 turn_started_at,
679 None,
680 )
681 .with_duration_ms(assessment_started_at.elapsed().as_millis() as u64)
682 .with_success(true),
683 )
684 .await;
685 self.emit(AgentEvent::TurnAssessment {
686 index: turn,
687 assessment: assessment.debug_view(),
688 })
689 .await;
690 turn_state.enter(TurnPhase::DecideNext);
691 let decision = self.loop_decision_after_turn(&assessment);
692 match decision {
693 LoopDecision::Continue { prompt, reason } => {
694 self.mark_continue_reason(reason);
695 turn_state.record_continue(reason);
696 queued_follow_ups.push_back(prompt);
697 }
698 LoopDecision::Finish { status } => {
699 final_status = Some(status);
700 }
701 }
702
703 if let Some(follow_up) = queued_follow_ups.pop_front() {
704 turn_state.record_continue(super::ContinueReason::QueuedUserFollowUp);
705 self.messages.push(Message::user(&follow_up));
706 turn += 1;
707 continue;
708 }
709 break;
710 }
711
712 turn_state.enter(TurnPhase::PlanTools);
714 let tool_plan = self.plan_tools(tool_calls);
715 turn_state.record_tool_plan(tool_plan.len());
716 for call in &tool_plan.calls {
717 self.emit_recovery_checkpoint(Self::recovery_checkpoint(
718 turn,
719 RecoveryCheckpointKind::ToolPlanCreated,
720 Some(call.id.clone()),
721 Some(call.name.clone()),
722 Some(Self::tool_args_hash(&call.args)),
723 Some(call.retry_safe),
724 None,
725 ))
726 .await;
727 }
728 turn_state.enter(TurnPhase::ExecuteTools);
729 let results = self
730 .execute_tool_plan(turn, turn_started_at, tool_plan, cancel_token)
731 .await;
732 turn_state.record_tool_results(results.len());
733 turn_state.enter(TurnPhase::RecordObservations);
734
735 for result in &results {
736 self.emit_recovery_checkpoint(Self::recovery_checkpoint(
737 turn,
738 RecoveryCheckpointKind::ToolResultAddedToContext,
739 Some(result.tool_call_id.clone()),
740 Some(result.tool_name.clone()),
741 None,
742 Some(!result.is_error),
743 None,
744 ))
745 .await;
746 self.messages.push(Message::ToolResult(result.clone()));
747 }
748
749 record_mana_mutation_results(&self.turn_mana_review, &results);
750 let mana_review = self.finish_turn_mana_review(turn);
751 self.emit(AgentEvent::TurnEnd {
752 index: turn,
753 message: msg.clone(),
754 mana_review: mana_review.clone(),
755 })
756 .await;
757
758 self.emit_timing(
759 turn,
760 TimingStage::PostTurnAssessmentStart,
761 turn_started_at,
762 None,
763 )
764 .await;
765 turn_state.enter(TurnPhase::AssessTurn);
766 let assessment_started_at = Instant::now();
767 let assessment = self.assess_post_turn(&msg, &results, true, &mana_review);
768 self.emit_timing_with_details(
769 TimingEvent::new(
770 turn,
771 TimingStage::PostTurnAssessmentEnd,
772 turn_started_at,
773 None,
774 )
775 .with_duration_ms(assessment_started_at.elapsed().as_millis() as u64)
776 .with_success(true),
777 )
778 .await;
779 self.emit(AgentEvent::TurnAssessment {
780 index: turn,
781 assessment: assessment.debug_view(),
782 })
783 .await;
784 turn_state.enter(TurnPhase::DecideNext);
785 let decision = self.loop_decision_after_turn(&assessment);
786 let should_stop_after_tool_turn = matches!(
787 decision,
788 LoopDecision::Finish {
789 status: RunFinalStatus::Blocked {
790 reason: AgentStopReason::RepeatedAction,
791 ..
792 }
793 }
794 );
795 match decision {
796 LoopDecision::Continue { prompt, reason } => {
797 self.mark_continue_reason(reason);
798 turn_state.record_continue(reason);
799 queued_follow_ups.push_back(prompt);
800 }
801 LoopDecision::Finish { status } => {
802 final_status = Some(status);
803 }
804 }
805
806 if let Some(follow_up) = queued_follow_ups.pop_front() {
807 self.messages.push(Message::user(&follow_up));
808 }
809
810 if should_stop_after_tool_turn {
811 break;
812 }
813
814 turn_state.record_continue(super::ContinueReason::ToolResultsNeedInterpretation);
815 turn += 1;
816 }
817
818 let mut status = if cancelled {
819 RunFinalStatus::Cancelled
820 } else {
821 final_status.unwrap_or_else(|| {
822 RunFinalStatus::from_stop_reason(AgentStopReason::NoAutomaticFollowUp)
823 })
824 };
825 if !cancelled && !self.verification_gates.is_empty() {
826 if let Some(artifacts) = &run_artifacts {
827 self.run_verification_gates(artifacts).await;
828 }
829 status = enforce_verification_closeout(status, &self.verification_gates);
830 }
831 if let Some(artifacts) = &run_artifacts {
832 self.write_run_evidence(&run_id, artifacts, &prompt, &status)
833 .await;
834 }
835 let cost = total_usage.cost(&self.model.meta.pricing);
836 self.emit(AgentEvent::AgentEnd {
837 usage: total_usage,
838 cost,
839 status: status.clone(),
840 })
841 .await;
842
843 if let Ok(mut active_trace_writer) = self.trace_writer.lock() {
844 *active_trace_writer = None;
845 }
846 if let Ok(mut active_run_id) = self.run_id.lock() {
847 *active_run_id = None;
848 }
849
850 if cancelled {
851 return Err(crate::error::Error::Cancelled);
852 }
853
854 Ok(())
855 }
856}
857
858fn evidence_trust_summary_from_messages(messages: &[Message]) -> EvidenceTrustSummary {
859 let mut summary = EvidenceTrustSummary::default();
860 for message in messages {
861 let Message::ToolResult(result) = message else {
862 continue;
863 };
864 let Some(provenance) = result
865 .details
866 .get("provenance")
867 .and_then(|value| serde_json::from_value::<Provenance>(value.clone()).ok())
868 else {
869 continue;
870 };
871 record_evidence_provenance(&mut summary, &provenance);
872 }
873 summary.sources.sort();
874 summary.sources.dedup();
875 summary.low_trust_influences.sort();
876 summary.low_trust_influences.dedup();
877 summary.warnings.sort();
878 summary.warnings.dedup();
879 summary
880}
881
882fn record_evidence_provenance(summary: &mut EvidenceTrustSummary, provenance: &Provenance) {
883 summary.sources.push(format!(
884 "source={:?}; trust={:?}; origin={}",
885 provenance.source,
886 provenance.trust,
887 provenance.origin.as_deref().unwrap_or("unknown")
888 ));
889 if provenance.is_low_trust() {
890 summary.low_trust_influences.push(format!(
891 "low-trust source observed: {}",
892 provenance.origin.as_deref().unwrap_or("unknown")
893 ));
894 }
895 if provenance.risk.iter().any(|risk| {
896 matches!(
897 risk,
898 RiskLabel::PossiblePromptInjection | RiskLabel::ContainsInstructions
899 )
900 }) {
901 summary.warnings.push(format!(
902 "instruction-like low-trust content observed from {}",
903 provenance.origin.as_deref().unwrap_or("unknown")
904 ));
905 }
906 if provenance.trust == TrustLabel::ExternalUntrusted {
907 summary
908 .warnings
909 .push("external/untrusted content cannot authorize policy or tool escalation".into());
910 }
911}
912
913fn evidence_verification_gate(
914 gate: &crate::workflow::VerificationGate,
915) -> EvidenceVerificationGate {
916 EvidenceVerificationGate {
917 name: if gate.name.is_empty() {
918 gate.id.clone()
919 } else {
920 gate.name.clone()
921 },
922 required: gate.is_required(),
923 status: format!("{:?}", gate.status).to_lowercase(),
924 command: gate.command.as_ref().map(|command| command.command.clone()),
925 exit_code: gate.result.as_ref().and_then(|result| result.exit_code),
926 artifact_path: gate
927 .artifacts
928 .iter()
929 .find(|artifact| artifact.kind == "status")
930 .or_else(|| gate.artifacts.first())
931 .map(|artifact| artifact.path.clone()),
932 }
933}
934
935fn evidence_policy_for_autonomy(mode: AutonomyMode) -> EvidencePolicy {
936 let mut policy = EvidencePolicy::default();
937 policy.decisions.push(format!("autonomy mode: {mode}"));
938 policy
939 .decisions
940 .push("policy.checked trace events record mode, scope, and decision context when policy checks run".into());
941 policy
942 .denials
943 .push("hard-rail bypass: none recorded; dangerous grants are not implemented".into());
944 match mode {
945 AutonomyMode::LocalAuto | AutonomyMode::WorktreeAuto => {
946 policy.decisions.push(
947 "autonomous local actions remain subject to workspace, network, secret, and hard-rail policy".into(),
948 );
949 policy.approvals.push(
950 "network, outside-workspace, destructive, and secret-sensitive actions require approval or are denied".into(),
951 );
952 }
953 AutonomyMode::AllowAllLocal => {
954 policy
955 .decisions
956 .push("allow-all-local remained scoped to local workspace/worktree actions".into());
957 policy.decisions.push(
958 "notable high-risk actions should be inspected in policy.checked trace events"
959 .into(),
960 );
961 policy.approvals.push(
962 "network, outside-workspace, production, secret, and dangerous-grant actions were not silently allowed".into(),
963 );
964 }
965 AutonomyMode::AllowAll => {
966 policy.decisions.push(
967 "allow-all mode was active; audit evidence and policy.checked trace events remain enabled".into(),
968 );
969 policy.decisions.push(
970 "notable high-risk actions should be inspected in policy.checked trace events"
971 .into(),
972 );
973 policy.approvals.push(
974 "secret exfiltration, dangerous grants, and unsupported outside-scope mutations were not silently allowed".into(),
975 );
976 }
977 AutonomyMode::Ci => {
978 policy.decisions.push(
979 "ci mode fails closed for prompts/approvals not declared ahead of time".into(),
980 );
981 }
982 AutonomyMode::Suggest | AutonomyMode::Safe => {}
983 }
984 policy
985}
986
987fn evidence_actions_from_messages(messages: &[Message]) -> EvidenceActions {
988 let mut actions = EvidenceActions::default();
989 for message in messages {
990 let Message::Assistant(assistant) = message else {
991 continue;
992 };
993 for block in &assistant.content {
994 let ContentBlock::ToolCall {
995 name, arguments, ..
996 } = block
997 else {
998 continue;
999 };
1000 actions.tools.push(name.clone());
1001 match name.as_str() {
1002 "read" => {
1003 if let Some(path) = arguments.get("path").and_then(|value| value.as_str()) {
1004 actions.files_inspected.push(path.to_string());
1005 }
1006 }
1007 "write" | "edit" => {
1008 if let Some(path) = arguments.get("path").and_then(|value| value.as_str()) {
1009 actions.files_changed.push(path.to_string());
1010 }
1011 }
1012 "bash" => {
1013 if let Some(command) = arguments.get("command").and_then(|value| value.as_str())
1014 {
1015 actions.commands_run.push(command.to_string());
1016 }
1017 }
1018 "scan" => actions.searches.push(format!("scan {arguments}")),
1019 _ => {}
1020 }
1021 }
1022 }
1023 actions.files_inspected.sort();
1024 actions.files_inspected.dedup();
1025 actions.files_changed.sort();
1026 actions.files_changed.dedup();
1027 actions.commands_run.sort();
1028 actions.commands_run.dedup();
1029 actions.searches.sort();
1030 actions.searches.dedup();
1031 actions.tools.sort();
1032 actions.tools.dedup();
1033 actions
1034}