Skip to main content

aios_runtime/
lib.rs

1use std::collections::{HashMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use aios_protocol::{
6    AgentStateVector, ApprovalDecision, ApprovalId, ApprovalPort, ApprovalRequest, ApprovalTicket,
7    BranchId, BranchInfo, BranchMergeResult, BudgetState, CheckpointId, CheckpointManifest,
8    EventKind, EventRecord, EventStorePort, FileProvenance, LoopPhase, ModelCompletionRequest,
9    ModelDirective, ModelProviderPort, ModelRouting, OperatingMode, PolicyGatePort, PolicySet,
10    RiskLevel, RunId, SessionId, SessionManifest, SpanStatus, ToolCall, ToolExecutionReport,
11    ToolExecutionRequest, ToolHarnessPort, ToolOutcome,
12};
13use anyhow::{Context, Result, bail};
14use async_trait::async_trait;
15use blake3::Hasher;
16use chrono::Utc;
17use parking_lot::Mutex;
18use serde::Serialize;
19use serde_json::{Map, Value};
20use sha2::{Digest, Sha256};
21use tokio::fs;
22use tokio::sync::broadcast;
23use tracing::{Instrument, debug, info, instrument, warn};
24
/// Static configuration handed to a [`KernelRuntime`] at construction time.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Root directory of the runtime (exposed through `KernelRuntime::root_path`).
    pub root: PathBuf,
    /// Checkpoint cadence, in ticks. Defaults to `1` via [`RuntimeConfig::new`].
    pub checkpoint_every_ticks: u64,
    /// Error count for the circuit breaker. Defaults to `3` via [`RuntimeConfig::new`].
    pub circuit_breaker_errors: u32,
}

impl RuntimeConfig {
    /// Creates a configuration rooted at `root` with the default tuning values
    /// (`checkpoint_every_ticks = 1`, `circuit_breaker_errors = 3`).
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root: PathBuf = root.into();
        Self {
            root,
            checkpoint_every_ticks: 1,
            circuit_breaker_errors: 3,
        }
    }
}
41
42#[derive(Debug, Clone)]
43pub struct TickInput {
44    pub objective: String,
45    pub proposed_tool: Option<ToolCall>,
46    /// Optional per-request system prompt (active skill body, liquid prompt).
47    pub system_prompt: Option<String>,
48    /// Tool whitelist from active skill. When set, only these tools are sent to the LLM.
49    pub allowed_tools: Option<Vec<String>>,
50}
51
52#[derive(Debug, Clone)]
53pub struct TickOutput {
54    pub session_id: SessionId,
55    pub mode: OperatingMode,
56    pub state: AgentStateVector,
57    pub events_emitted: u64,
58    pub last_sequence: u64,
59}
60
61#[derive(Clone)]
62pub struct TurnContext {
63    pub session_id: SessionId,
64    pub branch_id: BranchId,
65    pub manifest: SessionManifest,
66    pub input: TickInput,
67    pub state: AgentStateVector,
68    pub pending_approvals: Vec<ApprovalTicket>,
69    pub mode: OperatingMode,
70    pub tool_call_guards: Vec<Arc<dyn ToolCallGuard>>,
71}
72
/// Verdict a [`ToolCallGuard`] returns for a proposed tool call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolCallGuardDecision {
    /// The call proceeds with no intervention.
    Allow,
    /// The call still proceeds, but `message` is surfaced to the session first.
    Warn {
        /// Explanation surfaced to the session.
        message: String,
        /// Repetition count reported by the guard.
        repetitions: usize,
        /// Signature of the call shape that triggered the decision.
        signature: String,
    },
    /// The call is skipped and `message` is surfaced instead.
    Block {
        /// Explanation surfaced to the session.
        message: String,
        /// Repetition count reported by the guard.
        repetitions: usize,
        /// Signature of the call shape that triggered the decision.
        signature: String,
    },
}
87
88#[async_trait]
89pub trait ToolCallGuard: Send + Sync {
90    async fn on_tool_call(
91        &self,
92        ctx: &TurnContext,
93        call: &ToolCall,
94    ) -> Result<ToolCallGuardDecision>;
95}
96
97#[async_trait]
98pub trait TurnMiddleware: Send + Sync {
99    async fn process(&self, ctx: &mut TurnContext, next: TurnNext<'_>) -> Result<TickOutput>;
100}
101
102pub struct TurnNext<'a> {
103    runtime: &'a KernelRuntime,
104    middlewares: &'a [Arc<dyn TurnMiddleware>],
105}
106
107impl<'a> TurnNext<'a> {
108    fn new(runtime: &'a KernelRuntime, middlewares: &'a [Arc<dyn TurnMiddleware>]) -> Self {
109        Self {
110            runtime,
111            middlewares,
112        }
113    }
114
115    pub async fn run(self, ctx: &mut TurnContext) -> Result<TickOutput> {
116        match self.middlewares.split_first() {
117            Some((middleware, remaining)) => {
118                middleware
119                    .process(ctx, TurnNext::new(self.runtime, remaining))
120                    .await
121            }
122            None => self.runtime.execute_turn(ctx).await,
123        }
124    }
125}
126
127#[derive(Debug, Default)]
128pub struct PassthroughTurnMiddleware;
129
130#[async_trait]
131impl TurnMiddleware for PassthroughTurnMiddleware {
132    async fn process(&self, ctx: &mut TurnContext, next: TurnNext<'_>) -> Result<TickOutput> {
133        next.run(ctx).await
134    }
135}
136
/// Thresholds used by [`LoopDetectionMiddleware`].
///
/// Repetition counts include the current call and only span the retained window, so
/// they never exceed `window_size + 1`.
#[derive(Debug, Clone)]
pub struct LoopDetectionConfig {
    /// Consecutive identical calls at which a warning is produced.
    pub warning_threshold: usize,
    /// Consecutive identical calls at which the call is blocked.
    pub hard_stop_limit: usize,
    /// Maximum number of recent call signatures remembered per session.
    pub window_size: usize,
}

impl Default for LoopDetectionConfig {
    /// Warn at 3 repeats, block at 5, and remember the last 20 calls.
    fn default() -> Self {
        let warning_threshold = 3;
        let hard_stop_limit = 5;
        let window_size = 20;
        Self {
            warning_threshold,
            hard_stop_limit,
            window_size,
        }
    }
}
153
154#[derive(Clone)]
155pub struct LoopDetectionMiddleware {
156    config: LoopDetectionConfig,
157    history_by_session: Arc<Mutex<HashMap<String, VecDeque<String>>>>,
158}
159
160impl LoopDetectionMiddleware {
161    pub fn new(config: LoopDetectionConfig) -> Self {
162        Self {
163            config,
164            history_by_session: Arc::new(Mutex::new(HashMap::new())),
165        }
166    }
167
168    fn record_and_classify(&self, ctx: &TurnContext, call: &ToolCall) -> LoopObservation {
169        let signature = hash_tool_call_signature(call);
170        let session_key = ctx.session_id.as_str().to_owned();
171        let mut history_by_session = self.history_by_session.lock();
172        let history = history_by_session.entry(session_key).or_default();
173
174        let prior_repetitions = history
175            .iter()
176            .rev()
177            .take(self.config.window_size)
178            .take_while(|previous| *previous == &signature)
179            .count();
180        let repetitions = prior_repetitions + 1;
181
182        history.push_back(signature.clone());
183        while history.len() > self.config.window_size {
184            history.pop_front();
185        }
186
187        LoopObservation {
188            signature,
189            repetitions,
190        }
191    }
192}
193
194impl Default for LoopDetectionMiddleware {
195    fn default() -> Self {
196        Self::new(LoopDetectionConfig::default())
197    }
198}
199
200impl std::fmt::Debug for LoopDetectionMiddleware {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        f.debug_struct("LoopDetectionMiddleware")
203            .field("config", &self.config)
204            .finish()
205    }
206}
207
208#[async_trait]
209impl TurnMiddleware for LoopDetectionMiddleware {
210    async fn process(&self, ctx: &mut TurnContext, next: TurnNext<'_>) -> Result<TickOutput> {
211        ctx.tool_call_guards.push(Arc::new(self.clone()));
212        next.run(ctx).await
213    }
214}
215
216#[async_trait]
217impl ToolCallGuard for LoopDetectionMiddleware {
218    async fn on_tool_call(
219        &self,
220        ctx: &TurnContext,
221        call: &ToolCall,
222    ) -> Result<ToolCallGuardDecision> {
223        let observation = self.record_and_classify(ctx, call);
224
225        if observation.repetitions >= self.config.hard_stop_limit {
226            return Ok(ToolCallGuardDecision::Block {
227                message: format!(
228                    "Loop detection stopped repeated tool call `{}` after {} identical turns. Respond with text only and change approach.",
229                    call.tool_name, observation.repetitions
230                ),
231                repetitions: observation.repetitions,
232                signature: observation.signature,
233            });
234        }
235
236        if observation.repetitions >= self.config.warning_threshold {
237            return Ok(ToolCallGuardDecision::Warn {
238                message: format!(
239                    "Loop detection warning: tool call `{}` has repeated {} times in a row. Stop repeating it and explain the next step in text.",
240                    call.tool_name, observation.repetitions
241                ),
242                repetitions: observation.repetitions,
243                signature: observation.signature,
244            });
245        }
246
247        Ok(ToolCallGuardDecision::Allow)
248    }
249}
250
/// Result of recording one tool call in the loop-detection history.
#[derive(Debug, Clone)]
struct LoopObservation {
    /// Signature of the recorded call.
    signature: String,
    /// Length of the run of identical signatures ending with this call (at least 1).
    repetitions: usize,
}
256
257#[derive(Debug, Clone)]
258struct SessionRuntimeState {
259    manifest: SessionManifest,
260    next_sequence_by_branch: HashMap<BranchId, u64>,
261    branches: HashMap<BranchId, BranchRuntimeState>,
262    tick_count: u64,
263    mode: OperatingMode,
264    state_vector: AgentStateVector,
265}
266
267#[derive(Debug, Clone)]
268struct BranchRuntimeState {
269    parent_branch: Option<BranchId>,
270    fork_sequence: u64,
271    head_sequence: u64,
272    merged_into: Option<BranchId>,
273}
274
275#[derive(Clone)]
276pub struct KernelRuntime {
277    config: RuntimeConfig,
278    event_store: Arc<dyn EventStorePort>,
279    provider: Arc<dyn ModelProviderPort>,
280    tool_harness: Arc<dyn ToolHarnessPort>,
281    approvals: Arc<dyn ApprovalPort>,
282    policy_gate: Arc<dyn PolicyGatePort>,
283    turn_middlewares: Vec<Arc<dyn TurnMiddleware>>,
284    stream: broadcast::Sender<EventRecord>,
285    sessions: Arc<Mutex<HashMap<String, SessionRuntimeState>>>,
286}
287
288impl KernelRuntime {
289    pub fn new(
290        config: RuntimeConfig,
291        event_store: Arc<dyn EventStorePort>,
292        provider: Arc<dyn ModelProviderPort>,
293        tool_harness: Arc<dyn ToolHarnessPort>,
294        approvals: Arc<dyn ApprovalPort>,
295        policy_gate: Arc<dyn PolicyGatePort>,
296    ) -> Self {
297        Self::with_turn_middlewares(
298            config,
299            event_store,
300            provider,
301            tool_harness,
302            approvals,
303            policy_gate,
304            Vec::new(),
305        )
306    }
307
308    pub fn with_turn_middlewares(
309        config: RuntimeConfig,
310        event_store: Arc<dyn EventStorePort>,
311        provider: Arc<dyn ModelProviderPort>,
312        tool_harness: Arc<dyn ToolHarnessPort>,
313        approvals: Arc<dyn ApprovalPort>,
314        policy_gate: Arc<dyn PolicyGatePort>,
315        turn_middlewares: Vec<Arc<dyn TurnMiddleware>>,
316    ) -> Self {
317        let (stream, _) = broadcast::channel(2048);
318        Self {
319            config,
320            event_store,
321            provider,
322            tool_harness,
323            approvals,
324            policy_gate,
325            turn_middlewares,
326            stream,
327            sessions: Arc::new(Mutex::new(HashMap::new())),
328        }
329    }
330
331    #[instrument(skip(self, owner, policy, model_routing))]
332    pub async fn create_session(
333        &self,
334        owner: impl Into<String>,
335        policy: PolicySet,
336        model_routing: ModelRouting,
337    ) -> Result<SessionManifest> {
338        self.create_session_with_id(SessionId::default(), owner, policy, model_routing)
339            .await
340    }
341
342    #[instrument(skip(self, owner, policy, model_routing), fields(session_id = %session_id))]
343    pub async fn create_session_with_id(
344        &self,
345        session_id: SessionId,
346        owner: impl Into<String>,
347        policy: PolicySet,
348        model_routing: ModelRouting,
349    ) -> Result<SessionManifest> {
350        if let Some(existing) = self.sessions.lock().get(session_id.as_str()) {
351            return Ok(existing.manifest.clone());
352        }
353
354        let owner = owner.into();
355        let session_root = self.session_root(&session_id);
356        self.initialize_workspace(session_root.as_path()).await?;
357
358        let manifest = SessionManifest {
359            session_id: session_id.clone(),
360            owner,
361            created_at: Utc::now(),
362            workspace_root: session_root.to_string_lossy().into_owned(),
363            model_routing,
364            policy: serde_json::to_value(&policy).unwrap_or_default(),
365        };
366
367        self.write_pretty_json(session_root.join("manifest.json"), &manifest)
368            .await?;
369
370        let manifest_hash = sha256_json(&manifest)?;
371
372        let main_branch = BranchId::main();
373        let latest_sequence = self
374            .event_store
375            .head(session_id.clone(), main_branch.clone())
376            .await
377            .unwrap_or(0);
378        let mut next_sequence_by_branch = HashMap::new();
379        next_sequence_by_branch.insert(main_branch.clone(), latest_sequence + 1);
380        let mut branches = HashMap::new();
381        branches.insert(
382            main_branch.clone(),
383            BranchRuntimeState {
384                parent_branch: None,
385                fork_sequence: 0,
386                head_sequence: latest_sequence,
387                merged_into: None,
388            },
389        );
390        self.sessions.lock().insert(
391            session_id.as_str().to_owned(),
392            SessionRuntimeState {
393                manifest: manifest.clone(),
394                next_sequence_by_branch,
395                branches,
396                tick_count: 0,
397                mode: OperatingMode::Explore,
398                state_vector: AgentStateVector::default(),
399            },
400        );
401        self.policy_gate
402            .set_policy(session_id.clone(), policy)
403            .await
404            .map_err(|error| anyhow::anyhow!(error.to_string()))?;
405
406        if latest_sequence == 0 {
407            self.append_event(
408                &session_id,
409                &main_branch,
410                EventKind::SessionCreated {
411                    name: manifest_hash.clone(),
412                    config: serde_json::json!({ "manifest_hash": manifest_hash }),
413                },
414            )
415            .await?;
416
417            self.emit_phase(&session_id, &main_branch, LoopPhase::Sleep)
418                .await?;
419
420            info!(
421                session_id = %session_id,
422                workspace_root = %manifest.workspace_root,
423                "session created"
424            );
425        } else {
426            info!(
427                session_id = %session_id,
428                workspace_root = %manifest.workspace_root,
429                latest_sequence,
430                "session attached to existing event stream"
431            );
432        }
433
434        Ok(manifest)
435    }
436
437    pub fn session_exists(&self, session_id: &SessionId) -> bool {
438        self.sessions.lock().contains_key(session_id.as_str())
439    }
440
441    /// List all in-memory sessions with summary metadata.
442    pub fn list_sessions(&self) -> Vec<SessionManifest> {
443        let sessions = self.sessions.lock();
444        sessions
445            .values()
446            .map(|state| state.manifest.clone())
447            .collect()
448    }
449
450    pub fn root_path(&self) -> &Path {
451        &self.config.root
452    }
453
454    pub async fn tick(&self, session_id: &SessionId, input: TickInput) -> Result<TickOutput> {
455        self.tick_on_branch(session_id, &BranchId::main(), input)
456            .await
457    }
458
459    #[instrument(
460        skip(self, input),
461        fields(
462            session_id = %session_id,
463            branch = %branch_id.as_str(),
464            objective_len = input.objective.len(),
465            has_tool = input.proposed_tool.is_some()
466        )
467    )]
468    pub async fn tick_on_branch(
469        &self,
470        session_id: &SessionId,
471        branch_id: &BranchId,
472        input: TickInput,
473    ) -> Result<TickOutput> {
474        let (manifest, state) = {
475            let sessions = self.sessions.lock();
476            let session = sessions
477                .get(session_id.as_str())
478                .with_context(|| format!("session not found: {session_id}"))?;
479            (session.manifest.clone(), session.state_vector.clone())
480        };
481
482        let pending_approvals = self
483            .approvals
484            .list_pending(session_id.clone())
485            .await
486            .unwrap_or_default();
487        let mode = self.estimate_mode(&state, pending_approvals.len());
488
489        let mut ctx = TurnContext {
490            session_id: session_id.clone(),
491            branch_id: branch_id.clone(),
492            manifest,
493            input,
494            state,
495            pending_approvals,
496            mode,
497            tool_call_guards: Vec::new(),
498        };
499
500        TurnNext::new(self, &self.turn_middlewares)
501            .run(&mut ctx)
502            .await
503    }
504
505    async fn execute_turn(&self, ctx: &mut TurnContext) -> Result<TickOutput> {
506        let session_id = &ctx.session_id;
507        let branch_id = &ctx.branch_id;
508        let manifest = &ctx.manifest;
509        let input = &ctx.input;
510        let guard_context_template = TurnContext {
511            session_id: ctx.session_id.clone(),
512            branch_id: ctx.branch_id.clone(),
513            manifest: ctx.manifest.clone(),
514            input: ctx.input.clone(),
515            state: ctx.state.clone(),
516            pending_approvals: ctx.pending_approvals.clone(),
517            mode: ctx.mode,
518            tool_call_guards: ctx.tool_call_guards.clone(),
519        };
520        let state = &mut ctx.state;
521
522        let mut emitted = 0_u64;
523        let mut previous_mode = Some(ctx.mode);
524        let mut _tool_calls_this_tick = 0_u32;
525        let mut _file_mutations_this_tick = 0_u32;
526        let mut mode = ctx.mode;
527
528        emitted += self
529            .emit_phase(session_id, branch_id, LoopPhase::Perceive)
530            .await?;
531
532        // Record initial budget metrics during the Perceive phase.
533        {
534            let metrics = life_vigil::metrics::GenAiMetrics::new("arcan");
535            metrics.record_budget(
536                state.budget.tokens_remaining,
537                state.budget.cost_remaining_usd,
538            );
539        }
540
541        emitted += self
542            .emit_phase(session_id, branch_id, LoopPhase::Deliberate)
543            .await?;
544
545        self.append_event(
546            session_id,
547            branch_id,
548            EventKind::DeliberationProposed {
549                summary: input.objective.clone(),
550                proposed_tool: input.proposed_tool.as_ref().map(|c| c.tool_name.clone()),
551            },
552        )
553        .await?;
554        emitted += 1;
555
556        self.append_event(
557            session_id,
558            branch_id,
559            EventKind::StateEstimated {
560                state: (*state).clone(),
561                mode,
562            },
563        )
564        .await?;
565        emitted += 1;
566        debug!(mode = ?mode, uncertainty = state.uncertainty, "state estimated");
567
568        if matches!(mode, OperatingMode::AskHuman | OperatingMode::Sleep) {
569            emitted += self
570                .finalize_tick(session_id, branch_id, manifest, state, &mode)
571                .await?;
572            ctx.mode = mode;
573            return self
574                .current_tick_output(session_id, branch_id, mode, (*state).clone(), emitted)
575                .await;
576        }
577
578        let run_id = RunId::default();
579        self.append_event(
580            session_id,
581            branch_id,
582            EventKind::RunStarted {
583                provider: "canonical".to_owned(),
584                max_iterations: 1,
585            },
586        )
587        .await?;
588        emitted += 1;
589        self.append_event(session_id, branch_id, EventKind::StepStarted { index: 0 })
590            .await?;
591        emitted += 1;
592
593        // Build conversation history from prior events in this session.
594        // This reconstructs user objectives and assistant responses so the LLM
595        // has multi-turn context.
596        let conversation_history = self.build_conversation_history(session_id, branch_id).await;
597
598        let completion = if let Some(call) = input.proposed_tool.clone() {
599            Ok(aios_protocol::ModelCompletion {
600                provider: "inline-proposed-tool".to_owned(),
601                model: "inline".to_owned(),
602                directives: vec![ModelDirective::ToolCall { call }],
603                stop_reason: aios_protocol::ModelStopReason::ToolCall,
604                usage: None,
605                final_answer: None,
606            })
607        } else {
608            self.provider
609                .complete(ModelCompletionRequest {
610                    session_id: session_id.clone(),
611                    branch_id: branch_id.clone(),
612                    run_id: run_id.clone(),
613                    step_index: 0,
614                    objective: input.objective.clone(),
615                    proposed_tool: None,
616                    system_prompt: input.system_prompt.clone(),
617                    allowed_tools: input.allowed_tools.clone(),
618                    conversation_history,
619                })
620                .await
621                .map_err(|error| anyhow::anyhow!(error.to_string()))
622        };
623
624        match completion {
625            Ok(completion) => {
626                let mut directive_count = 0_usize;
627                for directive in completion.directives {
628                    directive_count += 1;
629                    match directive {
630                        ModelDirective::TextDelta { delta, index } => {
631                            self.append_event(
632                                session_id,
633                                branch_id,
634                                EventKind::TextDelta { delta, index },
635                            )
636                            .await?;
637                            emitted += 1;
638                        }
639                        ModelDirective::Message { role, content } => {
640                            self.append_event(
641                                session_id,
642                                branch_id,
643                                EventKind::Message {
644                                    role,
645                                    content,
646                                    model: Some(completion.model.clone()),
647                                    token_usage: completion.usage,
648                                },
649                            )
650                            .await?;
651                            emitted += 1;
652                        }
653                        ModelDirective::ToolCall { call } => {
654                            let guard_ctx = TurnContext {
655                                session_id: guard_context_template.session_id.clone(),
656                                branch_id: guard_context_template.branch_id.clone(),
657                                manifest: guard_context_template.manifest.clone(),
658                                input: guard_context_template.input.clone(),
659                                state: state.clone(),
660                                pending_approvals: guard_context_template.pending_approvals.clone(),
661                                mode,
662                                tool_call_guards: guard_context_template.tool_call_guards.clone(),
663                            };
664                            if let Some(decision) =
665                                self.evaluate_tool_call_guards(&guard_ctx, &call).await?
666                            {
667                                emitted += self
668                                    .persist_loop_guard_event(
669                                        session_id, branch_id, &call, &decision,
670                                    )
671                                    .await?;
672                                emitted += self
673                                    .emit_guard_message(
674                                        session_id,
675                                        branch_id,
676                                        &decision,
677                                        Some(completion.model.clone()),
678                                    )
679                                    .await?;
680
681                                if matches!(decision, ToolCallGuardDecision::Block { .. }) {
682                                    continue;
683                                }
684                            }
685
686                            emitted += self
687                                .emit_phase(session_id, branch_id, LoopPhase::Gate)
688                                .await?;
689                            self.append_event(
690                                session_id,
691                                branch_id,
692                                EventKind::ToolCallRequested {
693                                    call_id: call.call_id.clone(),
694                                    tool_name: call.tool_name.clone(),
695                                    arguments: call.input.clone(),
696                                    category: None,
697                                },
698                            )
699                            .await?;
700                            emitted += 1;
701
702                            let policy = self
703                                .policy_gate
704                                .evaluate(session_id.clone(), call.requested_capabilities.clone())
705                                .await
706                                .map_err(|error| anyhow::anyhow!(error.to_string()))?;
707
708                            // Track tool calls for per-tick Autonomic limits.
709                            _tool_calls_this_tick += 1;
710                            if !policy.denied.is_empty() {
711                                mode = OperatingMode::Recover;
712                                state.error_streak += 1;
713                                state.uncertainty = (state.uncertainty + 0.15).min(1.0);
714                                state.budget.error_budget_remaining =
715                                    state.budget.error_budget_remaining.saturating_sub(1);
716                                self.append_event(
717                                    session_id,
718                                    branch_id,
719                                    EventKind::ToolCallFailed {
720                                        call_id: call.call_id.clone(),
721                                        tool_name: call.tool_name.clone(),
722                                        error: format!(
723                                            "capabilities denied: {}",
724                                            policy
725                                                .denied
726                                                .iter()
727                                                .map(|capability| capability.as_str())
728                                                .collect::<Vec<_>>()
729                                                .join(",")
730                                        ),
731                                    },
732                                )
733                                .await?;
734                                emitted += 1;
735                                continue;
736                            }
737
738                            if !policy.requires_approval.is_empty() {
739                                mode = OperatingMode::AskHuman;
740                                for capability in policy.requires_approval {
741                                    let ticket = self
742                                        .approvals
743                                        .enqueue(ApprovalRequest {
744                                            session_id: session_id.clone(),
745                                            call_id: call.call_id.clone(),
746                                            tool_name: call.tool_name.clone(),
747                                            capability: capability.clone(),
748                                            reason: format!(
749                                                "approval required for tool {}",
750                                                call.tool_name
751                                            ),
752                                        })
753                                        .await
754                                        .map_err(|error| anyhow::anyhow!(error.to_string()))?;
755                                    self.append_event(
756                                        session_id,
757                                        branch_id,
758                                        EventKind::ApprovalRequested {
759                                            approval_id: ticket.approval_id,
760                                            call_id: call.call_id.clone(),
761                                            tool_name: call.tool_name.clone(),
762                                            arguments: call.input.clone(),
763                                            risk: RiskLevel::Medium,
764                                        },
765                                    )
766                                    .await?;
767                                    emitted += 1;
768                                }
769                                continue;
770                            }
771
772                            emitted += self
773                                .emit_phase(session_id, branch_id, LoopPhase::Execute)
774                                .await?;
775                            let report = self
776                                .tool_harness
777                                .execute(ToolExecutionRequest {
778                                    session_id: session_id.clone(),
779                                    workspace_root: manifest.workspace_root.clone(),
780                                    call: call.clone(),
781                                })
782                                .await
783                                .map_err(|error| anyhow::anyhow!(error.to_string()));
784                            match report {
785                                Ok(report) => {
786                                    emitted += self
787                                        .record_tool_report(
788                                            session_id,
789                                            branch_id,
790                                            manifest,
791                                            &report,
792                                            Some(call.call_id.clone()),
793                                        )
794                                        .await?;
795                                    if let ToolOutcome::Success { output } = &report.outcome
796                                        && output.get("path").is_some()
797                                    {
798                                        _file_mutations_this_tick += 1;
799                                    }
800                                    self.apply_homeostasis_controllers(state, &report);
801                                    let new_mode = self.estimate_mode(state, 0);
802                                    if let Some(prev) = previous_mode
803                                        && prev != new_mode
804                                    {
805                                        self.append_event(
806                                            session_id,
807                                            branch_id,
808                                            EventKind::ModeChanged {
809                                                from: prev,
810                                                to: new_mode,
811                                                reason: format!(
812                                                    "post-tool homeostasis: tool={} exit={}",
813                                                    report.tool_name, report.exit_status
814                                                ),
815                                            },
816                                        )
817                                        .await?;
818                                        emitted += 1;
819                                    }
820                                    mode = new_mode;
821                                    previous_mode = Some(mode);
822                                    info!(
823                                        tool_name = %report.tool_name,
824                                        tool_run_id = %report.tool_run_id,
825                                        exit_status = report.exit_status,
826                                        mode = ?mode,
827                                        tool_calls = _tool_calls_this_tick,
828                                        file_mutations = _file_mutations_this_tick,
829                                        "tool execution completed"
830                                    );
831                                }
832                                Err(error) => {
833                                    state.error_streak += 1;
834                                    state.uncertainty = (state.uncertainty + 0.15).min(1.0);
835                                    state.budget.error_budget_remaining =
836                                        state.budget.error_budget_remaining.saturating_sub(1);
837                                    let new_mode = OperatingMode::Recover;
838                                    if let Some(prev) = previous_mode
839                                        && prev != new_mode
840                                    {
841                                        self.append_event(
842                                            session_id,
843                                            branch_id,
844                                            EventKind::ModeChanged {
845                                                from: prev,
846                                                to: new_mode,
847                                                reason: format!("tool execution error: {error}"),
848                                            },
849                                        )
850                                        .await?;
851                                        emitted += 1;
852                                    }
853                                    mode = new_mode;
854                                    previous_mode = Some(mode);
855                                    warn!(
856                                        error = %error,
857                                        error_streak = state.error_streak,
858                                        "tool execution failed"
859                                    );
860                                    self.append_event(
861                                        session_id,
862                                        branch_id,
863                                        EventKind::ToolCallFailed {
864                                            call_id: call.call_id.clone(),
865                                            tool_name: call.tool_name.clone(),
866                                            error: error.to_string(),
867                                        },
868                                    )
869                                    .await?;
870                                    emitted += 1;
871                                }
872                            }
873                        }
874                    }
875                }
876
877                emitted += self
878                    .emit_phase(session_id, branch_id, LoopPhase::Commit)
879                    .await?;
880
881                self.append_event(
882                    session_id,
883                    branch_id,
884                    EventKind::StepFinished {
885                        index: 0,
886                        stop_reason: model_stop_reason_string(&completion.stop_reason),
887                        directive_count,
888                    },
889                )
890                .await?;
891                emitted += 1;
892
893                self.append_event(
894                    session_id,
895                    branch_id,
896                    EventKind::RunFinished {
897                        reason: model_stop_reason_string(&completion.stop_reason),
898                        total_iterations: 1,
899                        final_answer: completion.final_answer,
900                        usage: completion.usage,
901                    },
902                )
903                .await?;
904                emitted += 1;
905            }
906            Err(error) => {
907                mode = OperatingMode::Recover;
908                state.error_streak += 1;
909                state.uncertainty = (state.uncertainty + 0.15).min(1.0);
910                state.budget.error_budget_remaining =
911                    state.budget.error_budget_remaining.saturating_sub(1);
912                self.append_event(
913                    session_id,
914                    branch_id,
915                    EventKind::RunErrored {
916                        error: error.to_string(),
917                    },
918                )
919                .await?;
920                emitted += 1;
921            }
922        }
923
924        if state.error_streak >= self.config.circuit_breaker_errors {
925            mode = OperatingMode::Recover;
926            warn!(
927                error_streak = state.error_streak,
928                threshold = self.config.circuit_breaker_errors,
929                "circuit breaker tripped"
930            );
931            self.append_event(
932                session_id,
933                branch_id,
934                EventKind::CircuitBreakerTripped {
935                    reason: "error streak exceeded threshold".to_owned(),
936                    error_streak: state.error_streak,
937                },
938            )
939            .await?;
940            emitted += 1;
941        }
942
943        emitted += self
944            .finalize_tick(session_id, branch_id, manifest, state, &mode)
945            .await?;
946        ctx.mode = mode;
947        info!(mode = ?mode, emitted, "tick finalized");
948        self.current_tick_output(session_id, branch_id, mode, (*state).clone(), emitted)
949            .await
950    }
951
952    #[instrument(
953        skip(self),
954        fields(
955            session_id = %session_id,
956            branch = %branch_id.as_str(),
957            from_branch = ?from_branch.as_ref().map(|branch| branch.as_str())
958        )
959    )]
960    pub async fn create_branch(
961        &self,
962        session_id: &SessionId,
963        branch_id: BranchId,
964        from_branch: Option<BranchId>,
965        fork_sequence: Option<u64>,
966    ) -> Result<BranchInfo> {
967        let from_branch = from_branch.unwrap_or_else(BranchId::main);
968        let fork_sequence_value = {
969            let mut sessions = self.sessions.lock();
970            let session = sessions
971                .get_mut(session_id.as_str())
972                .with_context(|| format!("session not found: {session_id}"))?;
973            if session.branches.contains_key(&branch_id) {
974                bail!("branch already exists: {}", branch_id.as_str());
975            }
976            let parent = session
977                .branches
978                .get(&from_branch)
979                .with_context(|| format!("source branch not found: {}", from_branch.as_str()))?;
980            if let Some(target) = &parent.merged_into {
981                bail!(
982                    "source branch {} is merged into {} and is read-only",
983                    from_branch.as_str(),
984                    target.as_str()
985                );
986            }
987            let fork = fork_sequence.unwrap_or(parent.head_sequence);
988            if fork > parent.head_sequence {
989                bail!(
990                    "fork sequence {} exceeds source branch head {}",
991                    fork,
992                    parent.head_sequence
993                );
994            }
995
996            session.next_sequence_by_branch.insert(branch_id.clone(), 1);
997            session.branches.insert(
998                branch_id.clone(),
999                BranchRuntimeState {
1000                    parent_branch: Some(from_branch.clone()),
1001                    fork_sequence: fork,
1002                    head_sequence: 0,
1003                    merged_into: None,
1004                },
1005            );
1006            fork
1007        };
1008
1009        self.append_event(
1010            session_id,
1011            &branch_id,
1012            EventKind::BranchCreated {
1013                new_branch_id: branch_id.clone(),
1014                fork_point_seq: fork_sequence_value,
1015                name: branch_id.as_str().to_owned(),
1016            },
1017        )
1018        .await?;
1019        info!(
1020            branch = %branch_id.as_str(),
1021            from_branch = %from_branch.as_str(),
1022            fork_sequence = fork_sequence_value,
1023            "branch created"
1024        );
1025
1026        self.branch_info(session_id, &branch_id)
1027    }
1028
1029    pub async fn list_branches(&self, session_id: &SessionId) -> Result<Vec<BranchInfo>> {
1030        let sessions = self.sessions.lock();
1031        let session = sessions
1032            .get(session_id.as_str())
1033            .with_context(|| format!("session not found: {session_id}"))?;
1034
1035        let mut branches: Vec<_> = session
1036            .branches
1037            .iter()
1038            .map(|(branch_id, state)| BranchInfo {
1039                branch_id: branch_id.clone(),
1040                parent_branch: state.parent_branch.clone(),
1041                fork_sequence: state.fork_sequence,
1042                head_sequence: state.head_sequence,
1043                merged_into: state.merged_into.clone(),
1044            })
1045            .collect();
1046        branches.sort_by(|a, b| a.branch_id.as_str().cmp(b.branch_id.as_str()));
1047        Ok(branches)
1048    }
1049
1050    #[instrument(
1051        skip(self),
1052        fields(
1053            session_id = %session_id,
1054            source_branch = %source_branch.as_str(),
1055            target_branch = %target_branch.as_str()
1056        )
1057    )]
1058    pub async fn merge_branch(
1059        &self,
1060        session_id: &SessionId,
1061        source_branch: BranchId,
1062        target_branch: BranchId,
1063    ) -> Result<BranchMergeResult> {
1064        if source_branch == target_branch {
1065            bail!("source and target branch must differ");
1066        }
1067        if source_branch == BranchId::main() {
1068            bail!("main branch cannot be used as a merge source");
1069        }
1070
1071        let source_head =
1072            {
1073                let sessions = self.sessions.lock();
1074                let session = sessions
1075                    .get(session_id.as_str())
1076                    .with_context(|| format!("session not found: {session_id}"))?;
1077                let source = session.branches.get(&source_branch).with_context(|| {
1078                    format!("source branch not found: {}", source_branch.as_str())
1079                })?;
1080                if let Some(merged_into) = &source.merged_into {
1081                    bail!(
1082                        "source branch {} already merged into {}",
1083                        source_branch.as_str(),
1084                        merged_into.as_str()
1085                    );
1086                }
1087                let target = session.branches.get(&target_branch).with_context(|| {
1088                    format!("target branch not found: {}", target_branch.as_str())
1089                })?;
1090                if let Some(merged_into) = &target.merged_into {
1091                    bail!(
1092                        "target branch {} is merged into {} and is read-only",
1093                        target_branch.as_str(),
1094                        merged_into.as_str()
1095                    );
1096                }
1097                source.head_sequence
1098            };
1099
1100        self.append_event(
1101            session_id,
1102            &target_branch,
1103            EventKind::BranchMerged {
1104                source_branch_id: source_branch.clone(),
1105                merge_seq: source_head,
1106            },
1107        )
1108        .await?;
1109
1110        let target_head = self.peek_last_sequence(session_id, &target_branch)?;
1111        {
1112            let mut sessions = self.sessions.lock();
1113            let session = sessions
1114                .get_mut(session_id.as_str())
1115                .with_context(|| format!("session not found: {session_id}"))?;
1116            let source = session
1117                .branches
1118                .get_mut(&source_branch)
1119                .with_context(|| format!("source branch not found: {}", source_branch.as_str()))?;
1120            source.merged_into = Some(target_branch.clone());
1121        }
1122        info!(
1123            source_head_sequence = source_head,
1124            target_head_sequence = target_head,
1125            "branch merged"
1126        );
1127
1128        Ok(BranchMergeResult {
1129            source_branch,
1130            target_branch,
1131            source_head_sequence: source_head,
1132            target_head_sequence: target_head,
1133        })
1134    }
1135
1136    pub async fn resolve_approval(
1137        &self,
1138        session_id: &SessionId,
1139        approval_id: uuid::Uuid,
1140        approved: bool,
1141        actor: impl Into<String>,
1142    ) -> Result<()> {
1143        let actor = actor.into();
1144        let resolution = self
1145            .approvals
1146            .resolve(
1147                ApprovalId::from_string(approval_id.to_string()),
1148                approved,
1149                actor.clone(),
1150            )
1151            .await
1152            .map_err(|error| anyhow::anyhow!(error.to_string()))
1153            .with_context(|| format!("approval not pending: {approval_id}"))?;
1154
1155        let decision = if resolution.approved {
1156            ApprovalDecision::Approved
1157        } else {
1158            ApprovalDecision::Denied
1159        };
1160
1161        self.append_event(
1162            session_id,
1163            &BranchId::main(),
1164            EventKind::ApprovalResolved {
1165                approval_id: ApprovalId::from_string(approval_id.to_string()),
1166                decision,
1167                reason: Some(actor),
1168            },
1169        )
1170        .await?;
1171        Ok(())
1172    }
1173
1174    pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<EventRecord> {
1175        self.stream.subscribe()
1176    }
1177
1178    /// Get a clone of the broadcast sender for injecting ephemeral events
1179    /// (e.g., streaming text deltas from the provider).
1180    pub fn event_sender(&self) -> broadcast::Sender<EventRecord> {
1181        self.stream.clone()
1182    }
1183
1184    pub async fn record_external_event(
1185        &self,
1186        session_id: &SessionId,
1187        kind: EventKind,
1188    ) -> Result<()> {
1189        self.record_external_event_on_branch(session_id, &BranchId::main(), kind)
1190            .await
1191    }
1192
1193    #[instrument(
1194        skip(self, kind),
1195        fields(session_id = %session_id, branch = %branch_id.as_str())
1196    )]
1197    pub async fn record_external_event_on_branch(
1198        &self,
1199        session_id: &SessionId,
1200        branch_id: &BranchId,
1201        kind: EventKind,
1202    ) -> Result<()> {
1203        {
1204            let sessions = self.sessions.lock();
1205            if !sessions.contains_key(session_id.as_str()) {
1206                bail!("session not found: {session_id}");
1207            }
1208        }
1209        self.append_event(session_id, branch_id, kind).await
1210    }
1211
1212    pub async fn read_events(
1213        &self,
1214        session_id: &SessionId,
1215        from_sequence: u64,
1216        limit: usize,
1217    ) -> Result<Vec<EventRecord>> {
1218        self.read_events_on_branch(session_id, &BranchId::main(), from_sequence, limit)
1219            .await
1220    }
1221
1222    pub async fn read_events_on_branch(
1223        &self,
1224        session_id: &SessionId,
1225        branch_id: &BranchId,
1226        from_sequence: u64,
1227        limit: usize,
1228    ) -> Result<Vec<EventRecord>> {
1229        self.event_store
1230            .read(session_id.clone(), branch_id.clone(), from_sequence, limit)
1231            .await
1232            .map_err(|error| anyhow::anyhow!(error.to_string()))
1233    }
1234
1235    /// Build conversation history from the session's event journal.
1236    ///
1237    /// Reads prior events and extracts user objectives (from `DeliberationProposed`)
1238    /// and assistant responses (from `Message` with role=assistant, or aggregated
1239    /// `TextDelta` events). Returns a list of `ConversationTurn` entries in
1240    /// chronological order, capped at the most recent 50 turns to avoid
1241    /// context overflow.
1242    async fn build_conversation_history(
1243        &self,
1244        session_id: &SessionId,
1245        branch_id: &BranchId,
1246    ) -> Vec<aios_protocol::ConversationTurn> {
1247        let events = match self
1248            .event_store
1249            .read(session_id.clone(), branch_id.clone(), 0, 10_000)
1250            .await
1251        {
1252            Ok(events) => events,
1253            Err(err) => {
1254                debug!(%err, "failed to read events for conversation history");
1255                return Vec::new();
1256            }
1257        };
1258
1259        let mut turns = Vec::new();
1260        let mut current_assistant_text = String::new();
1261
1262        for record in &events {
1263            match &record.kind {
1264                EventKind::DeliberationProposed { summary, .. } => {
1265                    // Flush any pending assistant text before the next user turn.
1266                    if !current_assistant_text.is_empty() {
1267                        turns.push(aios_protocol::ConversationTurn {
1268                            role: "assistant".to_owned(),
1269                            content: std::mem::take(&mut current_assistant_text),
1270                        });
1271                    }
1272                    if !summary.is_empty() {
1273                        turns.push(aios_protocol::ConversationTurn {
1274                            role: "user".to_owned(),
1275                            content: summary.clone(),
1276                        });
1277                    }
1278                }
1279                EventKind::Message { role, content, .. } if role == "assistant" => {
1280                    current_assistant_text.push_str(content);
1281                }
1282                EventKind::TextDelta { delta, .. } => {
1283                    current_assistant_text.push_str(delta);
1284                }
1285                EventKind::RunFinished { final_answer, .. } => {
1286                    // If we have a final answer and no accumulated text, use it.
1287                    if current_assistant_text.is_empty()
1288                        && let Some(answer) = final_answer
1289                    {
1290                        current_assistant_text = answer.clone();
1291                    }
1292                    // Flush assistant text at run boundary.
1293                    if !current_assistant_text.is_empty() {
1294                        turns.push(aios_protocol::ConversationTurn {
1295                            role: "assistant".to_owned(),
1296                            content: std::mem::take(&mut current_assistant_text),
1297                        });
1298                    }
1299                }
1300                _ => {}
1301            }
1302        }
1303
1304        // Flush any remaining assistant text.
1305        if !current_assistant_text.is_empty() {
1306            turns.push(aios_protocol::ConversationTurn {
1307                role: "assistant".to_owned(),
1308                content: current_assistant_text,
1309            });
1310        }
1311
1312        // Cap to most recent 50 turns to avoid context overflow.
1313        let max_turns = 50;
1314        if turns.len() > max_turns {
1315            turns.drain(..turns.len() - max_turns);
1316        }
1317
1318        turns
1319    }
1320
1321    fn estimate_mode(&self, state: &AgentStateVector, pending_approvals: usize) -> OperatingMode {
1322        if pending_approvals > 0 {
1323            return OperatingMode::AskHuman;
1324        }
1325
1326        if state.error_streak >= self.config.circuit_breaker_errors {
1327            return OperatingMode::Recover;
1328        }
1329
1330        if state.progress >= 0.98 {
1331            return OperatingMode::Sleep;
1332        }
1333
1334        if state.context_pressure > 0.8 || state.uncertainty > 0.65 {
1335            return OperatingMode::Explore;
1336        }
1337
1338        if state.side_effect_pressure > 0.6 {
1339            return OperatingMode::Verify;
1340        }
1341
1342        OperatingMode::Execute
1343    }
1344
1345    fn apply_homeostasis_controllers(
1346        &self,
1347        state: &mut AgentStateVector,
1348        report: &ToolExecutionReport,
1349    ) {
1350        state.budget.tool_calls_remaining = state.budget.tool_calls_remaining.saturating_sub(1);
1351        state.budget.tokens_remaining = state.budget.tokens_remaining.saturating_sub(750);
1352        state.budget.time_remaining_ms = state.budget.time_remaining_ms.saturating_sub(1200);
1353
1354        if report.exit_status == 0 {
1355            state.progress = (state.progress + 0.12).min(1.0);
1356            state.uncertainty = (state.uncertainty * 0.85).max(0.05);
1357            state.error_streak = 0;
1358            state.side_effect_pressure = (state.side_effect_pressure + 0.2).min(1.0);
1359        } else {
1360            state.error_streak += 1;
1361            state.uncertainty = (state.uncertainty + 0.18).min(1.0);
1362            state.budget.error_budget_remaining =
1363                state.budget.error_budget_remaining.saturating_sub(1);
1364            state.side_effect_pressure = (state.side_effect_pressure * 0.5).max(0.1);
1365        }
1366
1367        state.context_pressure = (state.context_pressure + 0.03).min(1.0);
1368        state.human_dependency = if state.error_streak >= 2 { 0.6 } else { 0.0 };
1369
1370        state.risk_level = if state.uncertainty > 0.75 || state.side_effect_pressure > 0.7 {
1371            RiskLevel::High
1372        } else if state.uncertainty > 0.45 || state.side_effect_pressure > 0.4 {
1373            RiskLevel::Medium
1374        } else {
1375            RiskLevel::Low
1376        };
1377    }
1378
1379    async fn finalize_tick(
1380        &self,
1381        session_id: &SessionId,
1382        branch_id: &BranchId,
1383        manifest: &SessionManifest,
1384        state: &mut AgentStateVector,
1385        mode: &OperatingMode,
1386    ) -> Result<u64> {
1387        let mut emitted = 0_u64;
1388
1389        emitted += self
1390            .emit_phase(session_id, branch_id, LoopPhase::Reflect)
1391            .await?;
1392
1393        self.append_event(
1394            session_id,
1395            branch_id,
1396            EventKind::BudgetUpdated {
1397                budget: state.budget.clone(),
1398                reason: "tick accounting".to_owned(),
1399            },
1400        )
1401        .await?;
1402        emitted += 1;
1403
1404        // Record budget metrics via Vigil GenAI metrics.
1405        {
1406            let metrics = life_vigil::metrics::GenAiMetrics::new("arcan");
1407            metrics.record_budget(
1408                state.budget.tokens_remaining,
1409                state.budget.cost_remaining_usd,
1410            );
1411
1412            // Detect and record mode transitions in the Reflect phase.
1413            let previous_mode = {
1414                let sessions = self.sessions.lock();
1415                sessions
1416                    .get(session_id.as_str())
1417                    .map(|s| s.mode)
1418                    .unwrap_or(OperatingMode::Explore)
1419            };
1420            if previous_mode != *mode {
1421                let from_str = operating_mode_str(&previous_mode);
1422                let to_str = operating_mode_str(mode);
1423                metrics.record_mode_transition(from_str, to_str);
1424                debug!(from = from_str, to = to_str, "operating mode transition");
1425            }
1426        }
1427
1428        self.append_event(
1429            session_id,
1430            branch_id,
1431            EventKind::StateEstimated {
1432                state: state.clone(),
1433                mode: *mode,
1434            },
1435        )
1436        .await?;
1437        emitted += 1;
1438
1439        let checkpoint_id = if self.should_checkpoint(session_id)? {
1440            let checkpoint = self
1441                .create_checkpoint(session_id, branch_id, manifest, state)
1442                .await?;
1443            self.append_event(
1444                session_id,
1445                branch_id,
1446                EventKind::CheckpointCreated {
1447                    checkpoint_id: checkpoint.checkpoint_id.clone(),
1448                    event_sequence: checkpoint.event_sequence,
1449                    state_hash: checkpoint.state_hash.clone(),
1450                },
1451            )
1452            .await?;
1453            emitted += 1;
1454            Some(checkpoint.checkpoint_id)
1455        } else {
1456            None
1457        };
1458
1459        self.write_heartbeat(session_id, state, mode).await?;
1460        self.append_event(
1461            session_id,
1462            branch_id,
1463            EventKind::Heartbeat {
1464                summary: "tick complete".to_owned(),
1465                checkpoint_id,
1466            },
1467        )
1468        .await?;
1469        emitted += 1;
1470
1471        emitted += self
1472            .emit_phase(session_id, branch_id, LoopPhase::Sleep)
1473            .await?;
1474
1475        self.persist_runtime_state(session_id, state.clone(), *mode)?;
1476
1477        Ok(emitted)
1478    }
1479
1480    async fn record_tool_report(
1481        &self,
1482        session_id: &SessionId,
1483        branch_id: &BranchId,
1484        manifest: &SessionManifest,
1485        report: &ToolExecutionReport,
1486        call_id: Option<String>,
1487    ) -> Result<u64> {
1488        let mut emitted = 0;
1489
1490        self.append_event(
1491            session_id,
1492            branch_id,
1493            EventKind::ToolCallStarted {
1494                tool_run_id: report.tool_run_id.clone(),
1495                tool_name: report.tool_name.clone(),
1496            },
1497        )
1498        .await?;
1499        emitted += 1;
1500
1501        let status = if report.exit_status == 0 {
1502            SpanStatus::Ok
1503        } else {
1504            SpanStatus::Error
1505        };
1506        let result_value = serde_json::to_value(&report.outcome).unwrap_or_default();
1507
1508        self.append_event(
1509            session_id,
1510            branch_id,
1511            EventKind::ToolCallCompleted {
1512                tool_run_id: report.tool_run_id.clone(),
1513                call_id,
1514                tool_name: report.tool_name.clone(),
1515                result: result_value,
1516                duration_ms: 0,
1517                status,
1518            },
1519        )
1520        .await?;
1521        emitted += 1;
1522
1523        if let ToolOutcome::Success { output } = &report.outcome
1524            && let Some(path) = output.get("path").and_then(|v| v.as_str())
1525        {
1526            let full_path =
1527                PathBuf::from(&manifest.workspace_root).join(path.trim_start_matches('/'));
1528            let content_hash = if fs::try_exists(&full_path).await.unwrap_or(false) {
1529                let data = fs::read(&full_path).await?;
1530                sha256_bytes(&data)
1531            } else {
1532                "deleted".to_owned()
1533            };
1534
1535            self.append_event(
1536                session_id,
1537                branch_id,
1538                EventKind::FileMutated {
1539                    path: path.to_owned(),
1540                    content_hash,
1541                },
1542            )
1543            .await?;
1544            emitted += 1;
1545        }
1546
1547        let run_dir = PathBuf::from(&manifest.workspace_root)
1548            .join("tools")
1549            .join("runs")
1550            .join(report.tool_run_id.as_str());
1551
1552        fs::create_dir_all(&run_dir).await?;
1553        self.write_pretty_json(run_dir.join("report.json"), report)
1554            .await?;
1555
1556        let observation = extract_observation(&EventRecord::new(
1557            session_id.clone(),
1558            branch_id.clone(),
1559            self.peek_last_sequence(session_id, branch_id)?,
1560            EventKind::ToolCallCompleted {
1561                tool_run_id: report.tool_run_id.clone(),
1562                call_id: None,
1563                tool_name: report.tool_name.clone(),
1564                result: serde_json::to_value(&report.outcome).unwrap_or_default(),
1565                duration_ms: 0,
1566                status,
1567            },
1568        ));
1569
1570        if let Some(observation) = observation {
1571            self.append_event(
1572                session_id,
1573                branch_id,
1574                EventKind::Custom {
1575                    event_type: "ObservationExtracted".to_owned(),
1576                    data: serde_json::json!({
1577                        "observation_id": observation.observation_id.to_string(),
1578                    }),
1579                },
1580            )
1581            .await?;
1582            emitted += 1;
1583        }
1584
1585        Ok(emitted)
1586    }
1587
1588    async fn evaluate_tool_call_guards(
1589        &self,
1590        ctx: &TurnContext,
1591        call: &ToolCall,
1592    ) -> Result<Option<ToolCallGuardDecision>> {
1593        let mut warning: Option<ToolCallGuardDecision> = None;
1594
1595        for guard in &ctx.tool_call_guards {
1596            match guard.on_tool_call(ctx, call).await? {
1597                ToolCallGuardDecision::Allow => {}
1598                decision @ ToolCallGuardDecision::Warn { .. } => {
1599                    warning = Some(decision);
1600                }
1601                decision @ ToolCallGuardDecision::Block { .. } => {
1602                    return Ok(Some(decision));
1603                }
1604            }
1605        }
1606
1607        Ok(warning)
1608    }
1609
1610    async fn persist_loop_guard_event(
1611        &self,
1612        session_id: &SessionId,
1613        branch_id: &BranchId,
1614        call: &ToolCall,
1615        decision: &ToolCallGuardDecision,
1616    ) -> Result<u64> {
1617        let (event_type, message, repetitions, signature) = match decision {
1618            ToolCallGuardDecision::Warn {
1619                message,
1620                repetitions,
1621                signature,
1622            } => (
1623                "loop_detection.warning",
1624                message.as_str(),
1625                *repetitions as u64,
1626                signature.as_str(),
1627            ),
1628            ToolCallGuardDecision::Block {
1629                message,
1630                repetitions,
1631                signature,
1632            } => (
1633                "loop_detection.hard_stop",
1634                message.as_str(),
1635                *repetitions as u64,
1636                signature.as_str(),
1637            ),
1638            ToolCallGuardDecision::Allow => return Ok(0),
1639        };
1640
1641        self.append_event(
1642            session_id,
1643            branch_id,
1644            EventKind::Custom {
1645                event_type: event_type.to_owned(),
1646                data: serde_json::json!({
1647                    "tool_name": call.tool_name,
1648                    "call_id": call.call_id,
1649                    "signature": signature,
1650                    "message": message,
1651                    "repetitions": repetitions,
1652                }),
1653            },
1654        )
1655        .await?;
1656
1657        Ok(1)
1658    }
1659
1660    async fn emit_guard_message(
1661        &self,
1662        session_id: &SessionId,
1663        branch_id: &BranchId,
1664        decision: &ToolCallGuardDecision,
1665        model: Option<String>,
1666    ) -> Result<u64> {
1667        let (role, content) = match decision {
1668            ToolCallGuardDecision::Warn { message, .. } => ("system".to_owned(), message.clone()),
1669            ToolCallGuardDecision::Block { message, .. } => {
1670                ("assistant".to_owned(), message.clone())
1671            }
1672            ToolCallGuardDecision::Allow => return Ok(0),
1673        };
1674
1675        self.append_event(
1676            session_id,
1677            branch_id,
1678            EventKind::Message {
1679                role,
1680                content,
1681                model,
1682                token_usage: None,
1683            },
1684        )
1685        .await?;
1686
1687        Ok(1)
1688    }
1689
1690    async fn emit_phase(
1691        &self,
1692        session_id: &SessionId,
1693        branch_id: &BranchId,
1694        phase: LoopPhase,
1695    ) -> Result<u64> {
1696        let phase_span = life_vigil::spans::phase_span(phase);
1697        async {
1698            self.append_event(session_id, branch_id, EventKind::PhaseEntered { phase })
1699                .await?;
1700            Ok(1)
1701        }
1702        .instrument(phase_span)
1703        .await
1704    }
1705
    /// Persists one event on `branch_id` and publishes it to live subscribers.
    ///
    /// Steps:
    /// 1. Reserves the next sequence number from the in-memory counter
    ///    (`next_sequence`). This fails for an unknown session or branch, or a
    ///    merged (read-only) branch.
    /// 2. Stamps the current OTel trace/span ids onto the record.
    /// 3. Appends it to the event store.
    /// 4. On success, broadcasts the persisted record and advances the branch
    ///    head.
    ///
    /// # Errors
    /// If the store append fails, the in-memory counter is resynchronised from
    /// the store's head so the reserved-but-unused sequence is not skipped. The
    /// append error is then returned either way, with context saying whether
    /// the resync also failed. The store error is converted via `to_string()`,
    /// so its source chain is not preserved.
    async fn append_event(
        &self,
        session_id: &SessionId,
        branch_id: &BranchId,
        kind: EventKind,
    ) -> Result<()> {
        // Label computed before `kind` is moved into the record.
        let event_kind = event_kind_name(&kind);
        let sequence = self.next_sequence(session_id, branch_id)?;
        debug!(
            session_id = %session_id,
            branch = %branch_id.as_str(),
            sequence,
            event_kind,
            "appending event"
        );
        let mut event = EventRecord::new(session_id.clone(), branch_id.clone(), sequence, kind);

        // Dual-write: embed OTel trace/span IDs into the event for post-hoc correlation.
        write_trace_context_on_record(&mut event);

        let persisted = match self.event_store.append(event).await {
            Ok(persisted) => persisted,
            Err(append_error) => {
                // The sequence was already reserved above; realign the counter with
                // what the store actually holds before reporting the failure.
                if let Err(resync_error) = self.resync_next_sequence(session_id, branch_id).await {
                    warn!(
                        session_id = %session_id,
                        branch = %branch_id.as_str(),
                        error = %append_error,
                        resync_error = %resync_error,
                        "event append failed and sequence resync failed"
                    );
                    return Err(anyhow::anyhow!(append_error.to_string())).context(format!(
                        "failed appending event and failed sequence resync: {resync_error}"
                    ));
                }
                warn!(
                    session_id = %session_id,
                    branch = %branch_id.as_str(),
                    error = %append_error,
                    "event append failed; sequence resynced"
                );
                return Err(anyhow::anyhow!(append_error.to_string()))
                    .context("failed appending event; sequence was resynced");
            }
        };
        // Publishing is best-effort: the send result is ignored. NOTE(review):
        // presumably `stream` is a tokio broadcast sender, whose `send` only
        // errors when there are no receivers. Confirm.
        let _ = self.stream.send(persisted.clone());
        self.mark_branch_head(session_id, branch_id, persisted.sequence)?;
        Ok(())
    }
1755
1756    fn next_sequence(&self, session_id: &SessionId, branch_id: &BranchId) -> Result<u64> {
1757        let mut sessions = self.sessions.lock();
1758        let session = sessions
1759            .get_mut(session_id.as_str())
1760            .with_context(|| format!("session not found: {session_id}"))?;
1761        if !session.branches.contains_key(branch_id) {
1762            bail!("branch not found: {}", branch_id.as_str());
1763        }
1764        if let Some(merged_into) = session
1765            .branches
1766            .get(branch_id)
1767            .and_then(|branch| branch.merged_into.as_ref())
1768        {
1769            bail!(
1770                "branch {} is merged into {} and is read-only",
1771                branch_id.as_str(),
1772                merged_into.as_str()
1773            );
1774        }
1775        let sequence = *session
1776            .next_sequence_by_branch
1777            .entry(branch_id.clone())
1778            .or_insert(1);
1779        session
1780            .next_sequence_by_branch
1781            .insert(branch_id.clone(), sequence.saturating_add(1));
1782        Ok(sequence)
1783    }
1784
1785    fn peek_last_sequence(&self, session_id: &SessionId, branch_id: &BranchId) -> Result<u64> {
1786        let sessions = self.sessions.lock();
1787        let session = sessions
1788            .get(session_id.as_str())
1789            .with_context(|| format!("session not found: {session_id}"))?;
1790        if !session.branches.contains_key(branch_id) {
1791            bail!("branch not found: {}", branch_id.as_str());
1792        }
1793        Ok(session
1794            .next_sequence_by_branch
1795            .get(branch_id)
1796            .copied()
1797            .unwrap_or(1)
1798            .saturating_sub(1))
1799    }
1800
1801    async fn resync_next_sequence(
1802        &self,
1803        session_id: &SessionId,
1804        branch_id: &BranchId,
1805    ) -> Result<()> {
1806        let latest = self
1807            .event_store
1808            .head(session_id.clone(), branch_id.clone())
1809            .await
1810            .map_err(|error| anyhow::anyhow!(error.to_string()))
1811            .context("failed loading latest sequence for resync")?;
1812        let mut sessions = self.sessions.lock();
1813        let session = sessions
1814            .get_mut(session_id.as_str())
1815            .with_context(|| format!("session not found: {session_id}"))?;
1816        if !session.branches.contains_key(branch_id) {
1817            bail!("branch not found: {}", branch_id.as_str());
1818        }
1819        session
1820            .next_sequence_by_branch
1821            .insert(branch_id.clone(), latest.saturating_add(1));
1822        Ok(())
1823    }
1824
1825    fn mark_branch_head(
1826        &self,
1827        session_id: &SessionId,
1828        branch_id: &BranchId,
1829        sequence: u64,
1830    ) -> Result<()> {
1831        let mut sessions = self.sessions.lock();
1832        let session = sessions
1833            .get_mut(session_id.as_str())
1834            .with_context(|| format!("session not found: {session_id}"))?;
1835        let branch = session
1836            .branches
1837            .get_mut(branch_id)
1838            .with_context(|| format!("branch not found: {}", branch_id.as_str()))?;
1839        branch.head_sequence = branch.head_sequence.max(sequence);
1840        Ok(())
1841    }
1842
1843    fn branch_info(&self, session_id: &SessionId, branch_id: &BranchId) -> Result<BranchInfo> {
1844        let sessions = self.sessions.lock();
1845        let session = sessions
1846            .get(session_id.as_str())
1847            .with_context(|| format!("session not found: {session_id}"))?;
1848        let state = session
1849            .branches
1850            .get(branch_id)
1851            .with_context(|| format!("branch not found: {}", branch_id.as_str()))?;
1852        Ok(BranchInfo {
1853            branch_id: branch_id.clone(),
1854            parent_branch: state.parent_branch.clone(),
1855            fork_sequence: state.fork_sequence,
1856            head_sequence: state.head_sequence,
1857            merged_into: state.merged_into.clone(),
1858        })
1859    }
1860
1861    fn should_checkpoint(&self, session_id: &SessionId) -> Result<bool> {
1862        let mut sessions = self.sessions.lock();
1863        let session = sessions
1864            .get_mut(session_id.as_str())
1865            .with_context(|| format!("session not found: {session_id}"))?;
1866        session.tick_count += 1;
1867        Ok(session.tick_count % self.config.checkpoint_every_ticks == 0)
1868    }
1869
1870    async fn create_checkpoint(
1871        &self,
1872        session_id: &SessionId,
1873        branch_id: &BranchId,
1874        manifest: &SessionManifest,
1875        state: &AgentStateVector,
1876    ) -> Result<CheckpointManifest> {
1877        let checkpoint_id = CheckpointId::default();
1878        let state_hash = sha256_json(state)?;
1879        let checkpoint = CheckpointManifest {
1880            checkpoint_id: checkpoint_id.clone(),
1881            session_id: session_id.clone(),
1882            branch_id: branch_id.clone(),
1883            created_at: Utc::now(),
1884            event_sequence: self.peek_last_sequence(session_id, branch_id)?,
1885            state_hash,
1886            note: "automatic heartbeat checkpoint".to_owned(),
1887        };
1888
1889        let checkpoint_dir = PathBuf::from(&manifest.workspace_root)
1890            .join("checkpoints")
1891            .join(checkpoint_id.as_str());
1892        fs::create_dir_all(&checkpoint_dir).await?;
1893        self.write_pretty_json(checkpoint_dir.join("manifest.json"), &checkpoint)
1894            .await?;
1895        Ok(checkpoint)
1896    }
1897
1898    async fn write_heartbeat(
1899        &self,
1900        session_id: &SessionId,
1901        state: &AgentStateVector,
1902        mode: &OperatingMode,
1903    ) -> Result<()> {
1904        let workspace_root = {
1905            let sessions = self.sessions.lock();
1906            let session = sessions
1907                .get(session_id.as_str())
1908                .with_context(|| format!("session not found: {session_id}"))?;
1909            session.manifest.workspace_root.clone()
1910        };
1911
1912        let payload = serde_json::json!({
1913            "at": Utc::now(),
1914            "mode": mode,
1915            "state": state,
1916        });
1917        self.write_pretty_json(
1918            PathBuf::from(workspace_root).join("state/heartbeat.json"),
1919            &payload,
1920        )
1921        .await
1922    }
1923
1924    fn persist_runtime_state(
1925        &self,
1926        session_id: &SessionId,
1927        state: AgentStateVector,
1928        mode: OperatingMode,
1929    ) -> Result<()> {
1930        let mut sessions = self.sessions.lock();
1931        let session = sessions
1932            .get_mut(session_id.as_str())
1933            .with_context(|| format!("session not found: {session_id}"))?;
1934        session.state_vector = state.clone();
1935        session.mode = mode;
1936
1937        // Sync lakebase at the workspace root
1938        if let Some(parent) = self.config.root.parent() {
1939            let lake_dir = parent.join(".lake");
1940            let state_json = serde_json::to_string_pretty(&state).unwrap_or_default();
1941            let mode_str = match mode {
1942                OperatingMode::Explore => "explore",
1943                OperatingMode::Execute => "execute",
1944                OperatingMode::Verify => "verify",
1945                OperatingMode::AskHuman => "ask_human",
1946                OperatingMode::Recover => "recover",
1947                OperatingMode::Sleep => "sleep",
1948            };
1949
1950            // Fire and forget IO, but in an async context we need to spawn it or do it blocking
1951            // Since this is a synchronous function, we can't await. Let's spawn it.
1952            let lake_dir_clone = lake_dir.clone();
1953            tokio::spawn(async move {
1954                let _ = fs::create_dir_all(&lake_dir_clone).await;
1955                let _ = fs::write(lake_dir_clone.join("state.json"), state_json).await;
1956                let _ = fs::write(lake_dir_clone.join("mode.txt"), mode_str).await;
1957            });
1958        }
1959
1960        Ok(())
1961    }
1962
1963    async fn current_tick_output(
1964        &self,
1965        session_id: &SessionId,
1966        branch_id: &BranchId,
1967        mode: OperatingMode,
1968        state: AgentStateVector,
1969        events_emitted: u64,
1970    ) -> Result<TickOutput> {
1971        Ok(TickOutput {
1972            session_id: session_id.clone(),
1973            mode,
1974            state,
1975            events_emitted,
1976            last_sequence: self.peek_last_sequence(session_id, branch_id)?,
1977        })
1978    }
1979
1980    async fn initialize_workspace(&self, root: &Path) -> Result<()> {
1981        let directories = [
1982            "events",
1983            "checkpoints",
1984            "state",
1985            "tools/runs",
1986            "artifacts/build",
1987            "artifacts/reports",
1988            "memory",
1989            "inbox/human_requests",
1990            "outbox/ui_stream",
1991        ];
1992
1993        for directory in directories {
1994            fs::create_dir_all(root.join(directory)).await?;
1995        }
1996
1997        let thread_path = root.join("state/thread.md");
1998        if !fs::try_exists(&thread_path).await.unwrap_or(false) {
1999            fs::write(&thread_path, "# Session Thread\n\n- Session created\n").await?;
2000        }
2001
2002        let plan_path = root.join("state/plan.yaml");
2003        if !fs::try_exists(&plan_path).await.unwrap_or(false) {
2004            fs::write(
2005                &plan_path,
2006                "version: 1\nmode: explore\nsteps:\n  - id: bootstrap\n    status: pending\n",
2007            )
2008            .await?;
2009        }
2010
2011        let task_graph_path = root.join("state/task_graph.json");
2012        if !fs::try_exists(&task_graph_path).await.unwrap_or(false) {
2013            fs::write(
2014                &task_graph_path,
2015                serde_json::to_string_pretty(&serde_json::json!({
2016                    "nodes": [{"id": "bootstrap", "type": "task"}],
2017                    "edges": [],
2018                }))?,
2019            )
2020            .await?;
2021        }
2022
2023        Ok(())
2024    }
2025
2026    fn session_root(&self, session_id: &SessionId) -> PathBuf {
2027        self.config.root.join("sessions").join(session_id.as_str())
2028    }
2029
2030    async fn write_pretty_json<T: Serialize>(&self, path: PathBuf, value: &T) -> Result<()> {
2031        if let Some(parent) = path.parent() {
2032            fs::create_dir_all(parent).await?;
2033        }
2034        let payload = serde_json::to_string_pretty(value)?;
2035        fs::write(path, payload).await?;
2036        Ok(())
2037    }
2038}
2039
2040fn sha256_json<T: Serialize>(value: &T) -> Result<String> {
2041    let payload = serde_json::to_vec(value)?;
2042    Ok(sha256_bytes(&payload))
2043}
2044
2045/// Write the current OTel trace context (trace_id, span_id) into an EventRecord.
2046///
2047/// Enables dual-write: persisted events carry OTel correlation IDs so they
2048/// can be linked back to their distributed traces for post-hoc analysis.
2049fn write_trace_context_on_record(event: &mut EventRecord) {
2050    use opentelemetry::trace::TraceContextExt;
2051    use tracing_opentelemetry::OpenTelemetrySpanExt;
2052
2053    let current_span = tracing::Span::current();
2054    let otel_context = current_span.context();
2055    let span_ref = otel_context.span();
2056    let span_context = span_ref.span_context();
2057
2058    if span_context.is_valid() {
2059        event.trace_id = Some(span_context.trace_id().to_string());
2060        event.span_id = Some(span_context.span_id().to_string());
2061    }
2062}
2063
2064fn operating_mode_str(mode: &OperatingMode) -> &'static str {
2065    match mode {
2066        OperatingMode::Explore => "explore",
2067        OperatingMode::Execute => "execute",
2068        OperatingMode::Verify => "verify",
2069        OperatingMode::AskHuman => "ask_human",
2070        OperatingMode::Recover => "recover",
2071        OperatingMode::Sleep => "sleep",
2072    }
2073}
2074
2075fn model_stop_reason_string(stop_reason: &aios_protocol::ModelStopReason) -> String {
2076    match stop_reason {
2077        aios_protocol::ModelStopReason::Completed => "completed".to_owned(),
2078        aios_protocol::ModelStopReason::ToolCall => "tool_call".to_owned(),
2079        aios_protocol::ModelStopReason::MaxIterations => "max_iterations".to_owned(),
2080        aios_protocol::ModelStopReason::Cancelled => "cancelled".to_owned(),
2081        aios_protocol::ModelStopReason::Error => "error".to_owned(),
2082        aios_protocol::ModelStopReason::Other(reason) => reason.clone(),
2083    }
2084}
2085
2086fn extract_observation(event: &EventRecord) -> Option<aios_protocol::Observation> {
2087    let text = match &event.kind {
2088        EventKind::ToolCallCompleted {
2089            tool_name,
2090            result,
2091            status,
2092            ..
2093        } => format!("tool call completed ({tool_name}): {result} [status={status:?}]"),
2094        EventKind::ErrorRaised { message } => format!("error observed: {message}"),
2095        EventKind::CheckpointCreated { checkpoint_id, .. } => {
2096            format!("checkpoint created: {checkpoint_id}")
2097        }
2098        _ => return None,
2099    };
2100
2101    Some(aios_protocol::Observation {
2102        observation_id: uuid::Uuid::new_v4(),
2103        created_at: event.timestamp,
2104        text,
2105        tags: vec!["auto".to_owned()],
2106        provenance: aios_protocol::Provenance {
2107            event_start: event.sequence,
2108            event_end: event.sequence,
2109            files: vec![FileProvenance {
2110                path: format!(
2111                    "events/{}.jsonl#branch={}",
2112                    event.session_id.as_str(),
2113                    event.branch_id.as_str()
2114                ),
2115                sha256: "pending".to_owned(),
2116            }],
2117        },
2118    })
2119}
2120
2121fn sha256_bytes(bytes: &[u8]) -> String {
2122    let digest = Sha256::digest(bytes);
2123    hex::encode(digest)
2124}
2125
2126fn hash_tool_call_signature(call: &ToolCall) -> String {
2127    let payload = serde_json::json!({
2128        "tool_name": call.tool_name,
2129        "input": normalize_json_value(&call.input),
2130    });
2131    let serialized = serde_json::to_vec(&payload).unwrap_or_default();
2132    let mut hasher = Hasher::new();
2133    hasher.update(&serialized);
2134    hasher.finalize().to_hex().to_string()
2135}
2136
2137fn normalize_json_value(value: &Value) -> Value {
2138    match value {
2139        Value::Object(map) => {
2140            let mut keys = map.keys().cloned().collect::<Vec<_>>();
2141            keys.sort();
2142            let normalized = keys
2143                .into_iter()
2144                .map(|key| {
2145                    let normalized_value = map
2146                        .get(&key)
2147                        .map(normalize_json_value)
2148                        .unwrap_or(Value::Null);
2149                    (key, normalized_value)
2150                })
2151                .collect::<Map<String, Value>>();
2152            Value::Object(normalized)
2153        }
2154        Value::Array(items) => Value::Array(items.iter().map(normalize_json_value).collect()),
2155        _ => value.clone(),
2156    }
2157}
2158
2159fn event_kind_name(kind: &EventKind) -> &'static str {
2160    match kind {
2161        EventKind::SessionCreated { .. } => "session_created",
2162        EventKind::BranchCreated { .. } => "branch_created",
2163        EventKind::BranchMerged { .. } => "branch_merged",
2164        EventKind::PhaseEntered { .. } => "phase_entered",
2165        EventKind::DeliberationProposed { .. } => "deliberation_proposed",
2166        EventKind::ApprovalRequested { .. } => "approval_requested",
2167        EventKind::ApprovalResolved { .. } => "approval_resolved",
2168        EventKind::ToolCallRequested { .. } => "tool_call_requested",
2169        EventKind::ToolCallStarted { .. } => "tool_call_started",
2170        EventKind::ToolCallCompleted { .. } => "tool_call_completed",
2171        EventKind::VoiceSessionStarted { .. } => "voice_session_started",
2172        EventKind::VoiceInputChunk { .. } => "voice_input_chunk",
2173        EventKind::VoiceOutputChunk { .. } => "voice_output_chunk",
2174        EventKind::VoiceSessionStopped { .. } => "voice_session_stopped",
2175        EventKind::VoiceAdapterError { .. } => "voice_adapter_error",
2176        EventKind::FileMutated { .. } => "file_mutated",
2177        EventKind::Heartbeat { .. } => "heartbeat",
2178        EventKind::CheckpointCreated { .. } => "checkpoint_created",
2179        EventKind::StateEstimated { .. } => "state_estimated",
2180        EventKind::BudgetUpdated { .. } => "budget_updated",
2181        EventKind::CircuitBreakerTripped { .. } => "circuit_breaker_tripped",
2182        EventKind::ErrorRaised { .. } => "error_raised",
2183        _ => "custom",
2184    }
2185}
2186
2187#[allow(dead_code)]
2188fn _budget_sanity(budget: &BudgetState) -> Result<()> {
2189    if budget.cost_remaining_usd < 0.0 {
2190        bail!("budget cannot be negative");
2191    }
2192    Ok(())
2193}