1use std::collections::{HashMap, VecDeque};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use aios_protocol::{
6 AgentStateVector, ApprovalDecision, ApprovalId, ApprovalPort, ApprovalRequest, ApprovalTicket,
7 BranchId, BranchInfo, BranchMergeResult, BudgetState, CheckpointId, CheckpointManifest,
8 EventKind, EventRecord, EventStorePort, FileProvenance, LoopPhase, ModelCompletionRequest,
9 ModelDirective, ModelProviderPort, ModelRouting, OperatingMode, PolicyGatePort, PolicySet,
10 RiskLevel, RunId, SessionId, SessionManifest, SpanStatus, ToolCall, ToolExecutionReport,
11 ToolExecutionRequest, ToolHarnessPort, ToolOutcome,
12};
13use anyhow::{Context, Result, bail};
14use async_trait::async_trait;
15use blake3::Hasher;
16use chrono::Utc;
17use parking_lot::Mutex;
18use serde::Serialize;
19use serde_json::{Map, Value};
20use sha2::{Digest, Sha256};
21use tokio::fs;
22use tokio::sync::broadcast;
23use tracing::{Instrument, debug, info, instrument, warn};
24
/// Static configuration for a [`KernelRuntime`].
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    /// Root directory of the runtime (exposed via `KernelRuntime::root_path`).
    pub root: PathBuf,
    /// Checkpoint cadence in ticks.
    pub checkpoint_every_ticks: u64,
    /// Error-streak length at which the circuit breaker trips.
    pub circuit_breaker_errors: u32,
}

impl RuntimeConfig {
    /// Builds a config rooted at `root` with `checkpoint_every_ticks = 1` and
    /// `circuit_breaker_errors = 3`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root: PathBuf = root.into();
        Self {
            circuit_breaker_errors: 3,
            checkpoint_every_ticks: 1,
            root,
        }
    }
}
41
/// Caller-supplied input for a single kernel tick.
#[derive(Debug, Clone)]
pub struct TickInput {
    /// Objective text; recorded as the `DeliberationProposed` summary and sent to the
    /// model provider as the request objective.
    pub objective: String,
    /// When set, this call is executed inline as the only directive and the model
    /// provider is not queried for this tick.
    pub proposed_tool: Option<ToolCall>,
    /// Optional system prompt forwarded to the model provider.
    pub system_prompt: Option<String>,
    /// Optional tool allow-list forwarded to the model provider.
    pub allowed_tools: Option<Vec<String>>,
}
51
/// Result of a tick, produced by `current_tick_output` at the end of the core turn.
#[derive(Debug, Clone)]
pub struct TickOutput {
    /// Session the tick ran against.
    pub session_id: SessionId,
    /// Operating mode at the end of the tick.
    pub mode: OperatingMode,
    /// Agent state vector at the end of the tick.
    pub state: AgentStateVector,
    /// Number of events appended during the tick.
    pub events_emitted: u64,
    /// Sequence number of the last event — presumably on the ticked branch; TODO confirm
    /// in `current_tick_output`.
    pub last_sequence: u64,
}
60
/// Per-turn context threaded through the turn-middleware chain.
///
/// Built by `tick_on_branch`. Middlewares may mutate it (e.g. register tool-call guards),
/// and the core turn updates `state` and `mode` in place.
#[derive(Clone)]
pub struct TurnContext {
    pub session_id: SessionId,
    pub branch_id: BranchId,
    /// Snapshot of the session manifest taken at tick start.
    pub manifest: SessionManifest,
    pub input: TickInput,
    /// Agent state vector; mutated by the core turn.
    pub state: AgentStateVector,
    /// Approvals that were pending when the tick started.
    pub pending_approvals: Vec<ApprovalTicket>,
    /// Mode estimated at tick start; overwritten with the final mode by the core turn.
    pub mode: OperatingMode,
    /// Guards consulted before each tool call; middlewares append to this list.
    pub tool_call_guards: Vec<Arc<dyn ToolCallGuard>>,
}
72
/// Verdict a [`ToolCallGuard`] returns for a proposed tool call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolCallGuardDecision {
    /// Let the call proceed with no extra events.
    Allow,
    /// Let the call proceed, but record a guard event and emit `message` to the model.
    Warn {
        message: String,
        /// How many times in a row the call has been seen (including this one).
        repetitions: usize,
        /// Signature identifying the repeated call.
        signature: String,
    },
    /// Skip the call, recording a guard event and emitting `message` to the model.
    Block {
        message: String,
        /// How many times in a row the call has been seen (including this one).
        repetitions: usize,
        /// Signature identifying the repeated call.
        signature: String,
    },
}
87
/// Hook consulted for every tool-call directive before it reaches the policy gate.
///
/// The core turn passes the guards registered on [`TurnContext::tool_call_guards`] to
/// `evaluate_tool_call_guards`. How multiple guards' decisions are combined is decided
/// there — TODO confirm.
#[async_trait]
pub trait ToolCallGuard: Send + Sync {
    /// Inspects `call` in the context of the current turn and returns a decision.
    async fn on_tool_call(
        &self,
        ctx: &TurnContext,
        call: &ToolCall,
    ) -> Result<ToolCallGuardDecision>;
}
96
/// A link in the turn pipeline that wraps the kernel's core turn.
///
/// Implementations may inspect or mutate `ctx` and must call `next.run(ctx)` to continue
/// the chain; not calling it short-circuits the turn.
#[async_trait]
pub trait TurnMiddleware: Send + Sync {
    async fn process(&self, ctx: &mut TurnContext, next: TurnNext<'_>) -> Result<TickOutput>;
}
101
102pub struct TurnNext<'a> {
103 runtime: &'a KernelRuntime,
104 middlewares: &'a [Arc<dyn TurnMiddleware>],
105}
106
107impl<'a> TurnNext<'a> {
108 fn new(runtime: &'a KernelRuntime, middlewares: &'a [Arc<dyn TurnMiddleware>]) -> Self {
109 Self {
110 runtime,
111 middlewares,
112 }
113 }
114
115 pub async fn run(self, ctx: &mut TurnContext) -> Result<TickOutput> {
116 match self.middlewares.split_first() {
117 Some((middleware, remaining)) => {
118 middleware
119 .process(ctx, TurnNext::new(self.runtime, remaining))
120 .await
121 }
122 None => self.runtime.execute_turn(ctx).await,
123 }
124 }
125}
126
/// No-op middleware that forwards the turn unchanged to the rest of the chain.
#[derive(Debug, Default)]
pub struct PassthroughTurnMiddleware;
129
130#[async_trait]
131impl TurnMiddleware for PassthroughTurnMiddleware {
132 async fn process(&self, ctx: &mut TurnContext, next: TurnNext<'_>) -> Result<TickOutput> {
133 next.run(ctx).await
134 }
135}
136
/// Thresholds used by [`LoopDetectionMiddleware`].
#[derive(Clone, Debug)]
pub struct LoopDetectionConfig {
    /// Consecutive identical tool calls at which a warning is produced.
    pub warning_threshold: usize,
    /// Consecutive identical tool calls at which the call is blocked.
    pub hard_stop_limit: usize,
    /// Maximum number of recent call signatures remembered per session.
    pub window_size: usize,
}

impl Default for LoopDetectionConfig {
    /// Warn at 3 identical calls in a row, block at 5, and remember the last 20.
    fn default() -> Self {
        LoopDetectionConfig {
            window_size: 20,
            hard_stop_limit: 5,
            warning_threshold: 3,
        }
    }
}
153
/// Middleware and tool-call guard that detects a model repeating the same tool call.
///
/// Cloning is cheap and clones share the same history map (it sits behind an `Arc`), so
/// the clone registered as a guard each turn sees the history of previous turns.
#[derive(Clone)]
pub struct LoopDetectionMiddleware {
    config: LoopDetectionConfig,
    // Recent call signatures per session, keyed by `SessionId::as_str()`.
    // NOTE(review): entries are never removed for finished sessions — confirm acceptable.
    history_by_session: Arc<Mutex<HashMap<String, VecDeque<String>>>>,
}
159
160impl LoopDetectionMiddleware {
161 pub fn new(config: LoopDetectionConfig) -> Self {
162 Self {
163 config,
164 history_by_session: Arc::new(Mutex::new(HashMap::new())),
165 }
166 }
167
168 fn record_and_classify(&self, ctx: &TurnContext, call: &ToolCall) -> LoopObservation {
169 let signature = hash_tool_call_signature(call);
170 let session_key = ctx.session_id.as_str().to_owned();
171 let mut history_by_session = self.history_by_session.lock();
172 let history = history_by_session.entry(session_key).or_default();
173
174 let prior_repetitions = history
175 .iter()
176 .rev()
177 .take(self.config.window_size)
178 .take_while(|previous| *previous == &signature)
179 .count();
180 let repetitions = prior_repetitions + 1;
181
182 history.push_back(signature.clone());
183 while history.len() > self.config.window_size {
184 history.pop_front();
185 }
186
187 LoopObservation {
188 signature,
189 repetitions,
190 }
191 }
192}
193
194impl Default for LoopDetectionMiddleware {
195 fn default() -> Self {
196 Self::new(LoopDetectionConfig::default())
197 }
198}
199
200impl std::fmt::Debug for LoopDetectionMiddleware {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 f.debug_struct("LoopDetectionMiddleware")
203 .field("config", &self.config)
204 .finish()
205 }
206}
207
208#[async_trait]
209impl TurnMiddleware for LoopDetectionMiddleware {
210 async fn process(&self, ctx: &mut TurnContext, next: TurnNext<'_>) -> Result<TickOutput> {
211 ctx.tool_call_guards.push(Arc::new(self.clone()));
212 next.run(ctx).await
213 }
214}
215
216#[async_trait]
217impl ToolCallGuard for LoopDetectionMiddleware {
218 async fn on_tool_call(
219 &self,
220 ctx: &TurnContext,
221 call: &ToolCall,
222 ) -> Result<ToolCallGuardDecision> {
223 let observation = self.record_and_classify(ctx, call);
224
225 if observation.repetitions >= self.config.hard_stop_limit {
226 return Ok(ToolCallGuardDecision::Block {
227 message: format!(
228 "Loop detection stopped repeated tool call `{}` after {} identical turns. Respond with text only and change approach.",
229 call.tool_name, observation.repetitions
230 ),
231 repetitions: observation.repetitions,
232 signature: observation.signature,
233 });
234 }
235
236 if observation.repetitions >= self.config.warning_threshold {
237 return Ok(ToolCallGuardDecision::Warn {
238 message: format!(
239 "Loop detection warning: tool call `{}` has repeated {} times in a row. Stop repeating it and explain the next step in text.",
240 call.tool_name, observation.repetitions
241 ),
242 repetitions: observation.repetitions,
243 signature: observation.signature,
244 });
245 }
246
247 Ok(ToolCallGuardDecision::Allow)
248 }
249}
250
/// Outcome of recording one tool call in the loop-detection history.
#[derive(Debug, Clone)]
struct LoopObservation {
    /// Signature computed by `hash_tool_call_signature` for the call.
    signature: String,
    /// Consecutive occurrences of `signature`, including the current call (always >= 1).
    repetitions: usize,
}
256
/// In-memory state the runtime keeps for each registered session.
#[derive(Debug, Clone)]
struct SessionRuntimeState {
    /// Manifest written to the session workspace at creation.
    manifest: SessionManifest,
    /// Per-branch sequence counter, presumably the next sequence to assign. It is
    /// initialized to head + 1 for main and to 1 for new branches.
    next_sequence_by_branch: HashMap<BranchId, u64>,
    /// Branch topology and heads.
    branches: HashMap<BranchId, BranchRuntimeState>,
    /// Tick counter; starts at 0 (advanced outside this view — TODO confirm in `finalize_tick`).
    tick_count: u64,
    /// Stored operating mode; initialized to `Explore`.
    mode: OperatingMode,
    /// Agent state vector; snapshotted into the turn context at the start of each tick.
    state_vector: AgentStateVector,
}
266
/// In-memory bookkeeping for one branch of a session.
#[derive(Debug, Clone)]
struct BranchRuntimeState {
    /// Branch this one was forked from; `None` for main.
    parent_branch: Option<BranchId>,
    /// Sequence on the parent branch at which the fork happened (0 for main).
    fork_sequence: u64,
    /// Latest sequence on this branch; new branches start at 0.
    head_sequence: u64,
    /// Target branch once merged. `create_branch` and `merge_branch` treat merged
    /// branches as read-only.
    merged_into: Option<BranchId>,
}
274
/// The agent kernel runtime.
///
/// Owns the ports (event store, model provider, tool harness, approvals, policy gate),
/// the turn-middleware chain, an event broadcast channel and per-session in-memory state.
/// Cloning shares the session map (it sits behind an `Arc`).
#[derive(Clone)]
pub struct KernelRuntime {
    config: RuntimeConfig,
    event_store: Arc<dyn EventStorePort>,
    provider: Arc<dyn ModelProviderPort>,
    tool_harness: Arc<dyn ToolHarnessPort>,
    approvals: Arc<dyn ApprovalPort>,
    policy_gate: Arc<dyn PolicyGatePort>,
    /// Middlewares run, in order, around the core turn of every tick.
    turn_middlewares: Vec<Arc<dyn TurnMiddleware>>,
    /// Broadcast sender for event records (capacity 2048). Presumably fed by
    /// `append_event` — TODO confirm.
    stream: broadcast::Sender<EventRecord>,
    /// In-memory session state keyed by `SessionId::as_str()`.
    sessions: Arc<Mutex<HashMap<String, SessionRuntimeState>>>,
}
287
288impl KernelRuntime {
289 pub fn new(
290 config: RuntimeConfig,
291 event_store: Arc<dyn EventStorePort>,
292 provider: Arc<dyn ModelProviderPort>,
293 tool_harness: Arc<dyn ToolHarnessPort>,
294 approvals: Arc<dyn ApprovalPort>,
295 policy_gate: Arc<dyn PolicyGatePort>,
296 ) -> Self {
297 Self::with_turn_middlewares(
298 config,
299 event_store,
300 provider,
301 tool_harness,
302 approvals,
303 policy_gate,
304 Vec::new(),
305 )
306 }
307
308 pub fn with_turn_middlewares(
309 config: RuntimeConfig,
310 event_store: Arc<dyn EventStorePort>,
311 provider: Arc<dyn ModelProviderPort>,
312 tool_harness: Arc<dyn ToolHarnessPort>,
313 approvals: Arc<dyn ApprovalPort>,
314 policy_gate: Arc<dyn PolicyGatePort>,
315 turn_middlewares: Vec<Arc<dyn TurnMiddleware>>,
316 ) -> Self {
317 let (stream, _) = broadcast::channel(2048);
318 Self {
319 config,
320 event_store,
321 provider,
322 tool_harness,
323 approvals,
324 policy_gate,
325 turn_middlewares,
326 stream,
327 sessions: Arc::new(Mutex::new(HashMap::new())),
328 }
329 }
330
331 #[instrument(skip(self, owner, policy, model_routing))]
332 pub async fn create_session(
333 &self,
334 owner: impl Into<String>,
335 policy: PolicySet,
336 model_routing: ModelRouting,
337 ) -> Result<SessionManifest> {
338 self.create_session_with_id(SessionId::default(), owner, policy, model_routing)
339 .await
340 }
341
    /// Creates, or re-attaches to, the session `session_id` and returns its manifest.
    ///
    /// If the session is already registered in memory, its manifest is returned and nothing
    /// else happens. Otherwise this:
    /// 1. initializes the session workspace and writes `manifest.json` into it;
    /// 2. reads the event store's head for the session's main branch;
    /// 3. registers in-memory state: a main branch whose next sequence is head + 1, mode
    ///    `Explore`, and a default state vector;
    /// 4. installs `policy` in the policy gate;
    /// 5. appends `SessionCreated` plus a `Sleep` phase event if the main branch was
    ///    empty (head == 0); otherwise it only logs that it attached to the existing stream.
    ///
    /// # Errors
    /// Fails if workspace initialization, the manifest write or hash, policy installation,
    /// or an event append fails.
    #[instrument(skip(self, owner, policy, model_routing), fields(session_id = %session_id))]
    pub async fn create_session_with_id(
        &self,
        session_id: SessionId,
        owner: impl Into<String>,
        policy: PolicySet,
        model_routing: ModelRouting,
    ) -> Result<SessionManifest> {
        // Idempotent fast path for an already-registered session.
        // NOTE(review): this check and the `insert` below use separate lock scopes, so two
        // concurrent calls for the same id can both get past it — confirm callers serialize.
        if let Some(existing) = self.sessions.lock().get(session_id.as_str()) {
            return Ok(existing.manifest.clone());
        }

        let owner = owner.into();
        let session_root = self.session_root(&session_id);
        self.initialize_workspace(session_root.as_path()).await?;

        let manifest = SessionManifest {
            session_id: session_id.clone(),
            owner,
            created_at: Utc::now(),
            workspace_root: session_root.to_string_lossy().into_owned(),
            model_routing,
            // A serialization failure is silently stored as `Value::Null`.
            policy: serde_json::to_value(&policy).unwrap_or_default(),
        };

        self.write_pretty_json(session_root.join("manifest.json"), &manifest)
            .await?;

        let manifest_hash = sha256_json(&manifest)?;

        let main_branch = BranchId::main();
        // A failing head lookup is treated as an empty stream (0), which makes this call
        // emit a fresh `SessionCreated` below.
        let latest_sequence = self
            .event_store
            .head(session_id.clone(), main_branch.clone())
            .await
            .unwrap_or(0);
        let mut next_sequence_by_branch = HashMap::new();
        next_sequence_by_branch.insert(main_branch.clone(), latest_sequence + 1);
        let mut branches = HashMap::new();
        branches.insert(
            main_branch.clone(),
            BranchRuntimeState {
                parent_branch: None,
                fork_sequence: 0,
                head_sequence: latest_sequence,
                merged_into: None,
            },
        );
        // Registered before the policy is installed: if `set_policy` fails below, the
        // session stays registered without a policy — NOTE(review): confirm intended.
        self.sessions.lock().insert(
            session_id.as_str().to_owned(),
            SessionRuntimeState {
                manifest: manifest.clone(),
                next_sequence_by_branch,
                branches,
                tick_count: 0,
                mode: OperatingMode::Explore,
                state_vector: AgentStateVector::default(),
            },
        );
        self.policy_gate
            .set_policy(session_id.clone(), policy)
            .await
            .map_err(|error| anyhow::anyhow!(error.to_string()))?;

        // Only a brand-new stream gets the creation events; re-attaching keeps history as-is.
        if latest_sequence == 0 {
            self.append_event(
                &session_id,
                &main_branch,
                EventKind::SessionCreated {
                    name: manifest_hash.clone(),
                    config: serde_json::json!({ "manifest_hash": manifest_hash }),
                },
            )
            .await?;

            self.emit_phase(&session_id, &main_branch, LoopPhase::Sleep)
                .await?;

            info!(
                session_id = %session_id,
                workspace_root = %manifest.workspace_root,
                "session created"
            );
        } else {
            info!(
                session_id = %session_id,
                workspace_root = %manifest.workspace_root,
                latest_sequence,
                "session attached to existing event stream"
            );
        }

        Ok(manifest)
    }
436
437 pub fn session_exists(&self, session_id: &SessionId) -> bool {
438 self.sessions.lock().contains_key(session_id.as_str())
439 }
440
441 pub fn list_sessions(&self) -> Vec<SessionManifest> {
443 let sessions = self.sessions.lock();
444 sessions
445 .values()
446 .map(|state| state.manifest.clone())
447 .collect()
448 }
449
450 pub fn root_path(&self) -> &Path {
451 &self.config.root
452 }
453
454 pub async fn tick(&self, session_id: &SessionId, input: TickInput) -> Result<TickOutput> {
455 self.tick_on_branch(session_id, &BranchId::main(), input)
456 .await
457 }
458
459 #[instrument(
460 skip(self, input),
461 fields(
462 session_id = %session_id,
463 branch = %branch_id.as_str(),
464 objective_len = input.objective.len(),
465 has_tool = input.proposed_tool.is_some()
466 )
467 )]
468 pub async fn tick_on_branch(
469 &self,
470 session_id: &SessionId,
471 branch_id: &BranchId,
472 input: TickInput,
473 ) -> Result<TickOutput> {
474 let (manifest, state) = {
475 let sessions = self.sessions.lock();
476 let session = sessions
477 .get(session_id.as_str())
478 .with_context(|| format!("session not found: {session_id}"))?;
479 (session.manifest.clone(), session.state_vector.clone())
480 };
481
482 let pending_approvals = self
483 .approvals
484 .list_pending(session_id.clone())
485 .await
486 .unwrap_or_default();
487 let mode = self.estimate_mode(&state, pending_approvals.len());
488
489 let mut ctx = TurnContext {
490 session_id: session_id.clone(),
491 branch_id: branch_id.clone(),
492 manifest,
493 input,
494 state,
495 pending_approvals,
496 mode,
497 tool_call_guards: Vec::new(),
498 };
499
500 TurnNext::new(self, &self.turn_middlewares)
501 .run(&mut ctx)
502 .await
503 }
504
505 async fn execute_turn(&self, ctx: &mut TurnContext) -> Result<TickOutput> {
506 let session_id = &ctx.session_id;
507 let branch_id = &ctx.branch_id;
508 let manifest = &ctx.manifest;
509 let input = &ctx.input;
510 let guard_context_template = TurnContext {
511 session_id: ctx.session_id.clone(),
512 branch_id: ctx.branch_id.clone(),
513 manifest: ctx.manifest.clone(),
514 input: ctx.input.clone(),
515 state: ctx.state.clone(),
516 pending_approvals: ctx.pending_approvals.clone(),
517 mode: ctx.mode,
518 tool_call_guards: ctx.tool_call_guards.clone(),
519 };
520 let state = &mut ctx.state;
521
522 let mut emitted = 0_u64;
523 let mut previous_mode = Some(ctx.mode);
524 let mut _tool_calls_this_tick = 0_u32;
525 let mut _file_mutations_this_tick = 0_u32;
526 let mut mode = ctx.mode;
527
528 emitted += self
529 .emit_phase(session_id, branch_id, LoopPhase::Perceive)
530 .await?;
531
532 {
534 let metrics = life_vigil::metrics::GenAiMetrics::new("arcan");
535 metrics.record_budget(
536 state.budget.tokens_remaining,
537 state.budget.cost_remaining_usd,
538 );
539 }
540
541 emitted += self
542 .emit_phase(session_id, branch_id, LoopPhase::Deliberate)
543 .await?;
544
545 self.append_event(
546 session_id,
547 branch_id,
548 EventKind::DeliberationProposed {
549 summary: input.objective.clone(),
550 proposed_tool: input.proposed_tool.as_ref().map(|c| c.tool_name.clone()),
551 },
552 )
553 .await?;
554 emitted += 1;
555
556 self.append_event(
557 session_id,
558 branch_id,
559 EventKind::StateEstimated {
560 state: (*state).clone(),
561 mode,
562 },
563 )
564 .await?;
565 emitted += 1;
566 debug!(mode = ?mode, uncertainty = state.uncertainty, "state estimated");
567
568 if matches!(mode, OperatingMode::AskHuman | OperatingMode::Sleep) {
569 emitted += self
570 .finalize_tick(session_id, branch_id, manifest, state, &mode)
571 .await?;
572 ctx.mode = mode;
573 return self
574 .current_tick_output(session_id, branch_id, mode, (*state).clone(), emitted)
575 .await;
576 }
577
578 let run_id = RunId::default();
579 self.append_event(
580 session_id,
581 branch_id,
582 EventKind::RunStarted {
583 provider: "canonical".to_owned(),
584 max_iterations: 1,
585 },
586 )
587 .await?;
588 emitted += 1;
589 self.append_event(session_id, branch_id, EventKind::StepStarted { index: 0 })
590 .await?;
591 emitted += 1;
592
593 let conversation_history = self.build_conversation_history(session_id, branch_id).await;
597
598 let completion = if let Some(call) = input.proposed_tool.clone() {
599 Ok(aios_protocol::ModelCompletion {
600 provider: "inline-proposed-tool".to_owned(),
601 model: "inline".to_owned(),
602 directives: vec![ModelDirective::ToolCall { call }],
603 stop_reason: aios_protocol::ModelStopReason::ToolCall,
604 usage: None,
605 final_answer: None,
606 })
607 } else {
608 self.provider
609 .complete(ModelCompletionRequest {
610 session_id: session_id.clone(),
611 branch_id: branch_id.clone(),
612 run_id: run_id.clone(),
613 step_index: 0,
614 objective: input.objective.clone(),
615 proposed_tool: None,
616 system_prompt: input.system_prompt.clone(),
617 allowed_tools: input.allowed_tools.clone(),
618 conversation_history,
619 })
620 .await
621 .map_err(|error| anyhow::anyhow!(error.to_string()))
622 };
623
624 match completion {
625 Ok(completion) => {
626 let mut directive_count = 0_usize;
627 for directive in completion.directives {
628 directive_count += 1;
629 match directive {
630 ModelDirective::TextDelta { delta, index } => {
631 self.append_event(
632 session_id,
633 branch_id,
634 EventKind::TextDelta { delta, index },
635 )
636 .await?;
637 emitted += 1;
638 }
639 ModelDirective::Message { role, content } => {
640 self.append_event(
641 session_id,
642 branch_id,
643 EventKind::Message {
644 role,
645 content,
646 model: Some(completion.model.clone()),
647 token_usage: completion.usage,
648 },
649 )
650 .await?;
651 emitted += 1;
652 }
653 ModelDirective::ToolCall { call } => {
654 let guard_ctx = TurnContext {
655 session_id: guard_context_template.session_id.clone(),
656 branch_id: guard_context_template.branch_id.clone(),
657 manifest: guard_context_template.manifest.clone(),
658 input: guard_context_template.input.clone(),
659 state: state.clone(),
660 pending_approvals: guard_context_template.pending_approvals.clone(),
661 mode,
662 tool_call_guards: guard_context_template.tool_call_guards.clone(),
663 };
664 if let Some(decision) =
665 self.evaluate_tool_call_guards(&guard_ctx, &call).await?
666 {
667 emitted += self
668 .persist_loop_guard_event(
669 session_id, branch_id, &call, &decision,
670 )
671 .await?;
672 emitted += self
673 .emit_guard_message(
674 session_id,
675 branch_id,
676 &decision,
677 Some(completion.model.clone()),
678 )
679 .await?;
680
681 if matches!(decision, ToolCallGuardDecision::Block { .. }) {
682 continue;
683 }
684 }
685
686 emitted += self
687 .emit_phase(session_id, branch_id, LoopPhase::Gate)
688 .await?;
689 self.append_event(
690 session_id,
691 branch_id,
692 EventKind::ToolCallRequested {
693 call_id: call.call_id.clone(),
694 tool_name: call.tool_name.clone(),
695 arguments: call.input.clone(),
696 category: None,
697 },
698 )
699 .await?;
700 emitted += 1;
701
702 let policy = self
703 .policy_gate
704 .evaluate(session_id.clone(), call.requested_capabilities.clone())
705 .await
706 .map_err(|error| anyhow::anyhow!(error.to_string()))?;
707
708 _tool_calls_this_tick += 1;
710 if !policy.denied.is_empty() {
711 mode = OperatingMode::Recover;
712 state.error_streak += 1;
713 state.uncertainty = (state.uncertainty + 0.15).min(1.0);
714 state.budget.error_budget_remaining =
715 state.budget.error_budget_remaining.saturating_sub(1);
716 self.append_event(
717 session_id,
718 branch_id,
719 EventKind::ToolCallFailed {
720 call_id: call.call_id.clone(),
721 tool_name: call.tool_name.clone(),
722 error: format!(
723 "capabilities denied: {}",
724 policy
725 .denied
726 .iter()
727 .map(|capability| capability.as_str())
728 .collect::<Vec<_>>()
729 .join(",")
730 ),
731 },
732 )
733 .await?;
734 emitted += 1;
735 continue;
736 }
737
738 if !policy.requires_approval.is_empty() {
739 mode = OperatingMode::AskHuman;
740 for capability in policy.requires_approval {
741 let ticket = self
742 .approvals
743 .enqueue(ApprovalRequest {
744 session_id: session_id.clone(),
745 call_id: call.call_id.clone(),
746 tool_name: call.tool_name.clone(),
747 capability: capability.clone(),
748 reason: format!(
749 "approval required for tool {}",
750 call.tool_name
751 ),
752 })
753 .await
754 .map_err(|error| anyhow::anyhow!(error.to_string()))?;
755 self.append_event(
756 session_id,
757 branch_id,
758 EventKind::ApprovalRequested {
759 approval_id: ticket.approval_id,
760 call_id: call.call_id.clone(),
761 tool_name: call.tool_name.clone(),
762 arguments: call.input.clone(),
763 risk: RiskLevel::Medium,
764 },
765 )
766 .await?;
767 emitted += 1;
768 }
769 continue;
770 }
771
772 emitted += self
773 .emit_phase(session_id, branch_id, LoopPhase::Execute)
774 .await?;
775 let report = self
776 .tool_harness
777 .execute(ToolExecutionRequest {
778 session_id: session_id.clone(),
779 workspace_root: manifest.workspace_root.clone(),
780 call: call.clone(),
781 })
782 .await
783 .map_err(|error| anyhow::anyhow!(error.to_string()));
784 match report {
785 Ok(report) => {
786 emitted += self
787 .record_tool_report(
788 session_id,
789 branch_id,
790 manifest,
791 &report,
792 Some(call.call_id.clone()),
793 )
794 .await?;
795 if let ToolOutcome::Success { output } = &report.outcome
796 && output.get("path").is_some()
797 {
798 _file_mutations_this_tick += 1;
799 }
800 self.apply_homeostasis_controllers(state, &report);
801 let new_mode = self.estimate_mode(state, 0);
802 if let Some(prev) = previous_mode
803 && prev != new_mode
804 {
805 self.append_event(
806 session_id,
807 branch_id,
808 EventKind::ModeChanged {
809 from: prev,
810 to: new_mode,
811 reason: format!(
812 "post-tool homeostasis: tool={} exit={}",
813 report.tool_name, report.exit_status
814 ),
815 },
816 )
817 .await?;
818 emitted += 1;
819 }
820 mode = new_mode;
821 previous_mode = Some(mode);
822 info!(
823 tool_name = %report.tool_name,
824 tool_run_id = %report.tool_run_id,
825 exit_status = report.exit_status,
826 mode = ?mode,
827 tool_calls = _tool_calls_this_tick,
828 file_mutations = _file_mutations_this_tick,
829 "tool execution completed"
830 );
831 }
832 Err(error) => {
833 state.error_streak += 1;
834 state.uncertainty = (state.uncertainty + 0.15).min(1.0);
835 state.budget.error_budget_remaining =
836 state.budget.error_budget_remaining.saturating_sub(1);
837 let new_mode = OperatingMode::Recover;
838 if let Some(prev) = previous_mode
839 && prev != new_mode
840 {
841 self.append_event(
842 session_id,
843 branch_id,
844 EventKind::ModeChanged {
845 from: prev,
846 to: new_mode,
847 reason: format!("tool execution error: {error}"),
848 },
849 )
850 .await?;
851 emitted += 1;
852 }
853 mode = new_mode;
854 previous_mode = Some(mode);
855 warn!(
856 error = %error,
857 error_streak = state.error_streak,
858 "tool execution failed"
859 );
860 self.append_event(
861 session_id,
862 branch_id,
863 EventKind::ToolCallFailed {
864 call_id: call.call_id.clone(),
865 tool_name: call.tool_name.clone(),
866 error: error.to_string(),
867 },
868 )
869 .await?;
870 emitted += 1;
871 }
872 }
873 }
874 }
875 }
876
877 emitted += self
878 .emit_phase(session_id, branch_id, LoopPhase::Commit)
879 .await?;
880
881 self.append_event(
882 session_id,
883 branch_id,
884 EventKind::StepFinished {
885 index: 0,
886 stop_reason: model_stop_reason_string(&completion.stop_reason),
887 directive_count,
888 },
889 )
890 .await?;
891 emitted += 1;
892
893 self.append_event(
894 session_id,
895 branch_id,
896 EventKind::RunFinished {
897 reason: model_stop_reason_string(&completion.stop_reason),
898 total_iterations: 1,
899 final_answer: completion.final_answer,
900 usage: completion.usage,
901 },
902 )
903 .await?;
904 emitted += 1;
905 }
906 Err(error) => {
907 mode = OperatingMode::Recover;
908 state.error_streak += 1;
909 state.uncertainty = (state.uncertainty + 0.15).min(1.0);
910 state.budget.error_budget_remaining =
911 state.budget.error_budget_remaining.saturating_sub(1);
912 self.append_event(
913 session_id,
914 branch_id,
915 EventKind::RunErrored {
916 error: error.to_string(),
917 },
918 )
919 .await?;
920 emitted += 1;
921 }
922 }
923
924 if state.error_streak >= self.config.circuit_breaker_errors {
925 mode = OperatingMode::Recover;
926 warn!(
927 error_streak = state.error_streak,
928 threshold = self.config.circuit_breaker_errors,
929 "circuit breaker tripped"
930 );
931 self.append_event(
932 session_id,
933 branch_id,
934 EventKind::CircuitBreakerTripped {
935 reason: "error streak exceeded threshold".to_owned(),
936 error_streak: state.error_streak,
937 },
938 )
939 .await?;
940 emitted += 1;
941 }
942
943 emitted += self
944 .finalize_tick(session_id, branch_id, manifest, state, &mode)
945 .await?;
946 ctx.mode = mode;
947 info!(mode = ?mode, emitted, "tick finalized");
948 self.current_tick_output(session_id, branch_id, mode, (*state).clone(), emitted)
949 .await
950 }
951
    /// Forks `branch_id` off `from_branch` and returns the new branch's info.
    ///
    /// `from_branch` defaults to main. `fork_sequence` defaults to the source branch's
    /// current head.
    ///
    /// The branch is first registered in memory (next sequence 1, head 0, no merge target).
    /// Then a `BranchCreated` event is appended on the new branch itself.
    ///
    /// # Errors
    /// Fails if:
    /// - the session or source branch is unknown;
    /// - `branch_id` already exists;
    /// - the source branch has been merged (merged branches are read-only);
    /// - `fork_sequence` exceeds the source head;
    /// - the event append fails.
    #[instrument(
        skip(self),
        fields(
            session_id = %session_id,
            branch = %branch_id.as_str(),
            from_branch = ?from_branch.as_ref().map(|branch| branch.as_str())
        )
    )]
    pub async fn create_branch(
        &self,
        session_id: &SessionId,
        branch_id: BranchId,
        from_branch: Option<BranchId>,
        fork_sequence: Option<u64>,
    ) -> Result<BranchInfo> {
        let from_branch = from_branch.unwrap_or_else(BranchId::main);
        // Validate and register under one lock scope; the lock is released before any await.
        let fork_sequence_value = {
            let mut sessions = self.sessions.lock();
            let session = sessions
                .get_mut(session_id.as_str())
                .with_context(|| format!("session not found: {session_id}"))?;
            if session.branches.contains_key(&branch_id) {
                bail!("branch already exists: {}", branch_id.as_str());
            }
            let parent = session
                .branches
                .get(&from_branch)
                .with_context(|| format!("source branch not found: {}", from_branch.as_str()))?;
            if let Some(target) = &parent.merged_into {
                bail!(
                    "source branch {} is merged into {} and is read-only",
                    from_branch.as_str(),
                    target.as_str()
                );
            }
            let fork = fork_sequence.unwrap_or(parent.head_sequence);
            if fork > parent.head_sequence {
                bail!(
                    "fork sequence {} exceeds source branch head {}",
                    fork,
                    parent.head_sequence
                );
            }

            session.next_sequence_by_branch.insert(branch_id.clone(), 1);
            session.branches.insert(
                branch_id.clone(),
                BranchRuntimeState {
                    parent_branch: Some(from_branch.clone()),
                    fork_sequence: fork,
                    head_sequence: 0,
                    merged_into: None,
                },
            );
            fork
        };

        // NOTE(review): if this append fails, the branch stays registered in memory without
        // a `BranchCreated` event — confirm acceptable.
        self.append_event(
            session_id,
            &branch_id,
            EventKind::BranchCreated {
                new_branch_id: branch_id.clone(),
                fork_point_seq: fork_sequence_value,
                name: branch_id.as_str().to_owned(),
            },
        )
        .await?;
        info!(
            branch = %branch_id.as_str(),
            from_branch = %from_branch.as_str(),
            fork_sequence = fork_sequence_value,
            "branch created"
        );

        self.branch_info(session_id, &branch_id)
    }
1028
1029 pub async fn list_branches(&self, session_id: &SessionId) -> Result<Vec<BranchInfo>> {
1030 let sessions = self.sessions.lock();
1031 let session = sessions
1032 .get(session_id.as_str())
1033 .with_context(|| format!("session not found: {session_id}"))?;
1034
1035 let mut branches: Vec<_> = session
1036 .branches
1037 .iter()
1038 .map(|(branch_id, state)| BranchInfo {
1039 branch_id: branch_id.clone(),
1040 parent_branch: state.parent_branch.clone(),
1041 fork_sequence: state.fork_sequence,
1042 head_sequence: state.head_sequence,
1043 merged_into: state.merged_into.clone(),
1044 })
1045 .collect();
1046 branches.sort_by(|a, b| a.branch_id.as_str().cmp(b.branch_id.as_str()));
1047 Ok(branches)
1048 }
1049
    /// Marks `source_branch` as merged into `target_branch`.
    ///
    /// Only a `BranchMerged` marker (carrying the source head) is appended to the target
    /// branch; no events are copied. Afterwards the source branch is recorded as
    /// `merged_into` the target, which makes it read-only for further forks and merges.
    ///
    /// # Errors
    /// Fails if:
    /// - source and target are equal, or the source is main;
    /// - the session or either branch is unknown;
    /// - the source is already merged, or the target is itself merged;
    /// - the event append or the head lookup fails.
    #[instrument(
        skip(self),
        fields(
            session_id = %session_id,
            source_branch = %source_branch.as_str(),
            target_branch = %target_branch.as_str()
        )
    )]
    pub async fn merge_branch(
        &self,
        session_id: &SessionId,
        source_branch: BranchId,
        target_branch: BranchId,
    ) -> Result<BranchMergeResult> {
        if source_branch == target_branch {
            bail!("source and target branch must differ");
        }
        if source_branch == BranchId::main() {
            bail!("main branch cannot be used as a merge source");
        }

        // Validate both branches and snapshot the source head under a short lock scope.
        let source_head =
            {
                let sessions = self.sessions.lock();
                let session = sessions
                    .get(session_id.as_str())
                    .with_context(|| format!("session not found: {session_id}"))?;
                let source = session.branches.get(&source_branch).with_context(|| {
                    format!("source branch not found: {}", source_branch.as_str())
                })?;
                if let Some(merged_into) = &source.merged_into {
                    bail!(
                        "source branch {} already merged into {}",
                        source_branch.as_str(),
                        merged_into.as_str()
                    );
                }
                let target = session.branches.get(&target_branch).with_context(|| {
                    format!("target branch not found: {}", target_branch.as_str())
                })?;
                if let Some(merged_into) = &target.merged_into {
                    bail!(
                        "target branch {} is merged into {} and is read-only",
                        target_branch.as_str(),
                        merged_into.as_str()
                    );
                }
                source.head_sequence
            };

        self.append_event(
            session_id,
            &target_branch,
            EventKind::BranchMerged {
                source_branch_id: source_branch.clone(),
                merge_seq: source_head,
            },
        )
        .await?;

        let target_head = self.peek_last_sequence(session_id, &target_branch)?;
        // NOTE(review): the lock was released across the append above, so two concurrent
        // merges of the same source can both pass validation — confirm callers serialize.
        {
            let mut sessions = self.sessions.lock();
            let session = sessions
                .get_mut(session_id.as_str())
                .with_context(|| format!("session not found: {session_id}"))?;
            let source = session
                .branches
                .get_mut(&source_branch)
                .with_context(|| format!("source branch not found: {}", source_branch.as_str()))?;
            source.merged_into = Some(target_branch.clone());
        }
        info!(
            source_head_sequence = source_head,
            target_head_sequence = target_head,
            "branch merged"
        );

        Ok(BranchMergeResult {
            source_branch,
            target_branch,
            source_head_sequence: source_head,
            target_head_sequence: target_head,
        })
    }
1135
1136 pub async fn resolve_approval(
1137 &self,
1138 session_id: &SessionId,
1139 approval_id: uuid::Uuid,
1140 approved: bool,
1141 actor: impl Into<String>,
1142 ) -> Result<()> {
1143 let actor = actor.into();
1144 let resolution = self
1145 .approvals
1146 .resolve(
1147 ApprovalId::from_string(approval_id.to_string()),
1148 approved,
1149 actor.clone(),
1150 )
1151 .await
1152 .map_err(|error| anyhow::anyhow!(error.to_string()))
1153 .with_context(|| format!("approval not pending: {approval_id}"))?;
1154
1155 let decision = if resolution.approved {
1156 ApprovalDecision::Approved
1157 } else {
1158 ApprovalDecision::Denied
1159 };
1160
1161 self.append_event(
1162 session_id,
1163 &BranchId::main(),
1164 EventKind::ApprovalResolved {
1165 approval_id: ApprovalId::from_string(approval_id.to_string()),
1166 decision,
1167 reason: Some(actor),
1168 },
1169 )
1170 .await?;
1171 Ok(())
1172 }
1173
1174 pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<EventRecord> {
1175 self.stream.subscribe()
1176 }
1177
1178 pub fn event_sender(&self) -> broadcast::Sender<EventRecord> {
1181 self.stream.clone()
1182 }
1183
1184 pub async fn record_external_event(
1185 &self,
1186 session_id: &SessionId,
1187 kind: EventKind,
1188 ) -> Result<()> {
1189 self.record_external_event_on_branch(session_id, &BranchId::main(), kind)
1190 .await
1191 }
1192
1193 #[instrument(
1194 skip(self, kind),
1195 fields(session_id = %session_id, branch = %branch_id.as_str())
1196 )]
1197 pub async fn record_external_event_on_branch(
1198 &self,
1199 session_id: &SessionId,
1200 branch_id: &BranchId,
1201 kind: EventKind,
1202 ) -> Result<()> {
1203 {
1204 let sessions = self.sessions.lock();
1205 if !sessions.contains_key(session_id.as_str()) {
1206 bail!("session not found: {session_id}");
1207 }
1208 }
1209 self.append_event(session_id, branch_id, kind).await
1210 }
1211
1212 pub async fn read_events(
1213 &self,
1214 session_id: &SessionId,
1215 from_sequence: u64,
1216 limit: usize,
1217 ) -> Result<Vec<EventRecord>> {
1218 self.read_events_on_branch(session_id, &BranchId::main(), from_sequence, limit)
1219 .await
1220 }
1221
1222 pub async fn read_events_on_branch(
1223 &self,
1224 session_id: &SessionId,
1225 branch_id: &BranchId,
1226 from_sequence: u64,
1227 limit: usize,
1228 ) -> Result<Vec<EventRecord>> {
1229 self.event_store
1230 .read(session_id.clone(), branch_id.clone(), from_sequence, limit)
1231 .await
1232 .map_err(|error| anyhow::anyhow!(error.to_string()))
1233 }
1234
1235 async fn build_conversation_history(
1243 &self,
1244 session_id: &SessionId,
1245 branch_id: &BranchId,
1246 ) -> Vec<aios_protocol::ConversationTurn> {
1247 let events = match self
1248 .event_store
1249 .read(session_id.clone(), branch_id.clone(), 0, 10_000)
1250 .await
1251 {
1252 Ok(events) => events,
1253 Err(err) => {
1254 debug!(%err, "failed to read events for conversation history");
1255 return Vec::new();
1256 }
1257 };
1258
1259 let mut turns = Vec::new();
1260 let mut current_assistant_text = String::new();
1261
1262 for record in &events {
1263 match &record.kind {
1264 EventKind::DeliberationProposed { summary, .. } => {
1265 if !current_assistant_text.is_empty() {
1267 turns.push(aios_protocol::ConversationTurn {
1268 role: "assistant".to_owned(),
1269 content: std::mem::take(&mut current_assistant_text),
1270 });
1271 }
1272 if !summary.is_empty() {
1273 turns.push(aios_protocol::ConversationTurn {
1274 role: "user".to_owned(),
1275 content: summary.clone(),
1276 });
1277 }
1278 }
1279 EventKind::Message { role, content, .. } if role == "assistant" => {
1280 current_assistant_text.push_str(content);
1281 }
1282 EventKind::TextDelta { delta, .. } => {
1283 current_assistant_text.push_str(delta);
1284 }
1285 EventKind::RunFinished { final_answer, .. } => {
1286 if current_assistant_text.is_empty()
1288 && let Some(answer) = final_answer
1289 {
1290 current_assistant_text = answer.clone();
1291 }
1292 if !current_assistant_text.is_empty() {
1294 turns.push(aios_protocol::ConversationTurn {
1295 role: "assistant".to_owned(),
1296 content: std::mem::take(&mut current_assistant_text),
1297 });
1298 }
1299 }
1300 _ => {}
1301 }
1302 }
1303
1304 if !current_assistant_text.is_empty() {
1306 turns.push(aios_protocol::ConversationTurn {
1307 role: "assistant".to_owned(),
1308 content: current_assistant_text,
1309 });
1310 }
1311
1312 let max_turns = 50;
1314 if turns.len() > max_turns {
1315 turns.drain(..turns.len() - max_turns);
1316 }
1317
1318 turns
1319 }
1320
1321 fn estimate_mode(&self, state: &AgentStateVector, pending_approvals: usize) -> OperatingMode {
1322 if pending_approvals > 0 {
1323 return OperatingMode::AskHuman;
1324 }
1325
1326 if state.error_streak >= self.config.circuit_breaker_errors {
1327 return OperatingMode::Recover;
1328 }
1329
1330 if state.progress >= 0.98 {
1331 return OperatingMode::Sleep;
1332 }
1333
1334 if state.context_pressure > 0.8 || state.uncertainty > 0.65 {
1335 return OperatingMode::Explore;
1336 }
1337
1338 if state.side_effect_pressure > 0.6 {
1339 return OperatingMode::Verify;
1340 }
1341
1342 OperatingMode::Execute
1343 }
1344
1345 fn apply_homeostasis_controllers(
1346 &self,
1347 state: &mut AgentStateVector,
1348 report: &ToolExecutionReport,
1349 ) {
1350 state.budget.tool_calls_remaining = state.budget.tool_calls_remaining.saturating_sub(1);
1351 state.budget.tokens_remaining = state.budget.tokens_remaining.saturating_sub(750);
1352 state.budget.time_remaining_ms = state.budget.time_remaining_ms.saturating_sub(1200);
1353
1354 if report.exit_status == 0 {
1355 state.progress = (state.progress + 0.12).min(1.0);
1356 state.uncertainty = (state.uncertainty * 0.85).max(0.05);
1357 state.error_streak = 0;
1358 state.side_effect_pressure = (state.side_effect_pressure + 0.2).min(1.0);
1359 } else {
1360 state.error_streak += 1;
1361 state.uncertainty = (state.uncertainty + 0.18).min(1.0);
1362 state.budget.error_budget_remaining =
1363 state.budget.error_budget_remaining.saturating_sub(1);
1364 state.side_effect_pressure = (state.side_effect_pressure * 0.5).max(0.1);
1365 }
1366
1367 state.context_pressure = (state.context_pressure + 0.03).min(1.0);
1368 state.human_dependency = if state.error_streak >= 2 { 0.6 } else { 0.0 };
1369
1370 state.risk_level = if state.uncertainty > 0.75 || state.side_effect_pressure > 0.7 {
1371 RiskLevel::High
1372 } else if state.uncertainty > 0.45 || state.side_effect_pressure > 0.4 {
1373 RiskLevel::Medium
1374 } else {
1375 RiskLevel::Low
1376 };
1377 }
1378
    /// Runs end-of-tick bookkeeping for a session branch and returns how many
    /// events were appended.
    ///
    /// Steps, each awaited before the next:
    /// 1. enter the `Reflect` phase;
    /// 2. append `BudgetUpdated` with the current budget;
    /// 3. record budget metrics and, when `mode` differs from the mode currently
    ///    stored on the session, a mode-transition metric;
    /// 4. append `StateEstimated`;
    /// 5. when `should_checkpoint` says so, write a checkpoint manifest and
    ///    append `CheckpointCreated`;
    /// 6. write the heartbeat file and append `Heartbeat`;
    /// 7. enter the `Sleep` phase;
    /// 8. store `state` and `mode` on the session via `persist_runtime_state`.
    ///
    /// `state` is taken mutably but only read here.
    ///
    /// # Errors
    /// Propagates the first failure from any append, checkpoint, heartbeat, or
    /// persistence step. Events appended before the failure remain appended.
    async fn finalize_tick(
        &self,
        session_id: &SessionId,
        branch_id: &BranchId,
        manifest: &SessionManifest,
        state: &mut AgentStateVector,
        mode: &OperatingMode,
    ) -> Result<u64> {
        let mut emitted = 0_u64;

        emitted += self
            .emit_phase(session_id, branch_id, LoopPhase::Reflect)
            .await?;

        self.append_event(
            session_id,
            branch_id,
            EventKind::BudgetUpdated {
                budget: state.budget.clone(),
                reason: "tick accounting".to_owned(),
            },
        )
        .await?;
        emitted += 1;

        {
            let metrics = life_vigil::metrics::GenAiMetrics::new("arcan");
            metrics.record_budget(
                state.budget.tokens_remaining,
                state.budget.cost_remaining_usd,
            );

            // The session still holds the previous tick's mode here, because
            // `persist_runtime_state` runs at the very end of this function.
            // A missing session is treated as coming from `Explore`.
            let previous_mode = {
                let sessions = self.sessions.lock();
                sessions
                    .get(session_id.as_str())
                    .map(|s| s.mode)
                    .unwrap_or(OperatingMode::Explore)
            };
            if previous_mode != *mode {
                let from_str = operating_mode_str(&previous_mode);
                let to_str = operating_mode_str(mode);
                metrics.record_mode_transition(from_str, to_str);
                debug!(from = from_str, to = to_str, "operating mode transition");
            }
        }

        self.append_event(
            session_id,
            branch_id,
            EventKind::StateEstimated {
                state: state.clone(),
                mode: *mode,
            },
        )
        .await?;
        emitted += 1;

        // `should_checkpoint` also advances the session's tick counter.
        let checkpoint_id = if self.should_checkpoint(session_id)? {
            let checkpoint = self
                .create_checkpoint(session_id, branch_id, manifest, state)
                .await?;
            self.append_event(
                session_id,
                branch_id,
                EventKind::CheckpointCreated {
                    checkpoint_id: checkpoint.checkpoint_id.clone(),
                    event_sequence: checkpoint.event_sequence,
                    state_hash: checkpoint.state_hash.clone(),
                },
            )
            .await?;
            emitted += 1;
            Some(checkpoint.checkpoint_id)
        } else {
            None
        };

        self.write_heartbeat(session_id, state, mode).await?;
        self.append_event(
            session_id,
            branch_id,
            EventKind::Heartbeat {
                summary: "tick complete".to_owned(),
                checkpoint_id,
            },
        )
        .await?;
        emitted += 1;

        emitted += self
            .emit_phase(session_id, branch_id, LoopPhase::Sleep)
            .await?;

        self.persist_runtime_state(session_id, state.clone(), *mode)?;

        Ok(emitted)
    }
1479
1480 async fn record_tool_report(
1481 &self,
1482 session_id: &SessionId,
1483 branch_id: &BranchId,
1484 manifest: &SessionManifest,
1485 report: &ToolExecutionReport,
1486 call_id: Option<String>,
1487 ) -> Result<u64> {
1488 let mut emitted = 0;
1489
1490 self.append_event(
1491 session_id,
1492 branch_id,
1493 EventKind::ToolCallStarted {
1494 tool_run_id: report.tool_run_id.clone(),
1495 tool_name: report.tool_name.clone(),
1496 },
1497 )
1498 .await?;
1499 emitted += 1;
1500
1501 let status = if report.exit_status == 0 {
1502 SpanStatus::Ok
1503 } else {
1504 SpanStatus::Error
1505 };
1506 let result_value = serde_json::to_value(&report.outcome).unwrap_or_default();
1507
1508 self.append_event(
1509 session_id,
1510 branch_id,
1511 EventKind::ToolCallCompleted {
1512 tool_run_id: report.tool_run_id.clone(),
1513 call_id,
1514 tool_name: report.tool_name.clone(),
1515 result: result_value,
1516 duration_ms: 0,
1517 status,
1518 },
1519 )
1520 .await?;
1521 emitted += 1;
1522
1523 if let ToolOutcome::Success { output } = &report.outcome
1524 && let Some(path) = output.get("path").and_then(|v| v.as_str())
1525 {
1526 let full_path =
1527 PathBuf::from(&manifest.workspace_root).join(path.trim_start_matches('/'));
1528 let content_hash = if fs::try_exists(&full_path).await.unwrap_or(false) {
1529 let data = fs::read(&full_path).await?;
1530 sha256_bytes(&data)
1531 } else {
1532 "deleted".to_owned()
1533 };
1534
1535 self.append_event(
1536 session_id,
1537 branch_id,
1538 EventKind::FileMutated {
1539 path: path.to_owned(),
1540 content_hash,
1541 },
1542 )
1543 .await?;
1544 emitted += 1;
1545 }
1546
1547 let run_dir = PathBuf::from(&manifest.workspace_root)
1548 .join("tools")
1549 .join("runs")
1550 .join(report.tool_run_id.as_str());
1551
1552 fs::create_dir_all(&run_dir).await?;
1553 self.write_pretty_json(run_dir.join("report.json"), report)
1554 .await?;
1555
1556 let observation = extract_observation(&EventRecord::new(
1557 session_id.clone(),
1558 branch_id.clone(),
1559 self.peek_last_sequence(session_id, branch_id)?,
1560 EventKind::ToolCallCompleted {
1561 tool_run_id: report.tool_run_id.clone(),
1562 call_id: None,
1563 tool_name: report.tool_name.clone(),
1564 result: serde_json::to_value(&report.outcome).unwrap_or_default(),
1565 duration_ms: 0,
1566 status,
1567 },
1568 ));
1569
1570 if let Some(observation) = observation {
1571 self.append_event(
1572 session_id,
1573 branch_id,
1574 EventKind::Custom {
1575 event_type: "ObservationExtracted".to_owned(),
1576 data: serde_json::json!({
1577 "observation_id": observation.observation_id.to_string(),
1578 }),
1579 },
1580 )
1581 .await?;
1582 emitted += 1;
1583 }
1584
1585 Ok(emitted)
1586 }
1587
1588 async fn evaluate_tool_call_guards(
1589 &self,
1590 ctx: &TurnContext,
1591 call: &ToolCall,
1592 ) -> Result<Option<ToolCallGuardDecision>> {
1593 let mut warning: Option<ToolCallGuardDecision> = None;
1594
1595 for guard in &ctx.tool_call_guards {
1596 match guard.on_tool_call(ctx, call).await? {
1597 ToolCallGuardDecision::Allow => {}
1598 decision @ ToolCallGuardDecision::Warn { .. } => {
1599 warning = Some(decision);
1600 }
1601 decision @ ToolCallGuardDecision::Block { .. } => {
1602 return Ok(Some(decision));
1603 }
1604 }
1605 }
1606
1607 Ok(warning)
1608 }
1609
1610 async fn persist_loop_guard_event(
1611 &self,
1612 session_id: &SessionId,
1613 branch_id: &BranchId,
1614 call: &ToolCall,
1615 decision: &ToolCallGuardDecision,
1616 ) -> Result<u64> {
1617 let (event_type, message, repetitions, signature) = match decision {
1618 ToolCallGuardDecision::Warn {
1619 message,
1620 repetitions,
1621 signature,
1622 } => (
1623 "loop_detection.warning",
1624 message.as_str(),
1625 *repetitions as u64,
1626 signature.as_str(),
1627 ),
1628 ToolCallGuardDecision::Block {
1629 message,
1630 repetitions,
1631 signature,
1632 } => (
1633 "loop_detection.hard_stop",
1634 message.as_str(),
1635 *repetitions as u64,
1636 signature.as_str(),
1637 ),
1638 ToolCallGuardDecision::Allow => return Ok(0),
1639 };
1640
1641 self.append_event(
1642 session_id,
1643 branch_id,
1644 EventKind::Custom {
1645 event_type: event_type.to_owned(),
1646 data: serde_json::json!({
1647 "tool_name": call.tool_name,
1648 "call_id": call.call_id,
1649 "signature": signature,
1650 "message": message,
1651 "repetitions": repetitions,
1652 }),
1653 },
1654 )
1655 .await?;
1656
1657 Ok(1)
1658 }
1659
1660 async fn emit_guard_message(
1661 &self,
1662 session_id: &SessionId,
1663 branch_id: &BranchId,
1664 decision: &ToolCallGuardDecision,
1665 model: Option<String>,
1666 ) -> Result<u64> {
1667 let (role, content) = match decision {
1668 ToolCallGuardDecision::Warn { message, .. } => ("system".to_owned(), message.clone()),
1669 ToolCallGuardDecision::Block { message, .. } => {
1670 ("assistant".to_owned(), message.clone())
1671 }
1672 ToolCallGuardDecision::Allow => return Ok(0),
1673 };
1674
1675 self.append_event(
1676 session_id,
1677 branch_id,
1678 EventKind::Message {
1679 role,
1680 content,
1681 model,
1682 token_usage: None,
1683 },
1684 )
1685 .await?;
1686
1687 Ok(1)
1688 }
1689
1690 async fn emit_phase(
1691 &self,
1692 session_id: &SessionId,
1693 branch_id: &BranchId,
1694 phase: LoopPhase,
1695 ) -> Result<u64> {
1696 let phase_span = life_vigil::spans::phase_span(phase);
1697 async {
1698 self.append_event(session_id, branch_id, EventKind::PhaseEntered { phase })
1699 .await?;
1700 Ok(1)
1701 }
1702 .instrument(phase_span)
1703 .await
1704 }
1705
    /// Persists `kind` as the next event on `branch_id`, broadcasts the persisted
    /// record, and advances the branch head.
    ///
    /// The sequence number is reserved with `next_sequence` before the store
    /// call. When the current tracing span carries a valid OpenTelemetry
    /// context, its trace and span ids are attached to the record first.
    ///
    /// # Errors
    /// - the session or branch is unknown, or the branch is merged and
    ///   read-only (reported by `next_sequence`);
    /// - the event store rejects the append. In that case the in-memory
    ///   counter is reset to the store's head + 1 via `resync_next_sequence`,
    ///   and the error says whether that resync also failed;
    /// - the branch head cannot be updated after a successful append.
    async fn append_event(
        &self,
        session_id: &SessionId,
        branch_id: &BranchId,
        kind: EventKind,
    ) -> Result<()> {
        let event_kind = event_kind_name(&kind);
        let sequence = self.next_sequence(session_id, branch_id)?;
        debug!(
            session_id = %session_id,
            branch = %branch_id.as_str(),
            sequence,
            event_kind,
            "appending event"
        );
        let mut event = EventRecord::new(session_id.clone(), branch_id.clone(), sequence, kind);

        write_trace_context_on_record(&mut event);

        let persisted = match self.event_store.append(event).await {
            Ok(persisted) => persisted,
            Err(append_error) => {
                // The reserved sequence was not persisted; realign the counter with
                // the store so the next append does not skip or collide.
                if let Err(resync_error) = self.resync_next_sequence(session_id, branch_id).await {
                    warn!(
                        session_id = %session_id,
                        branch = %branch_id.as_str(),
                        error = %append_error,
                        resync_error = %resync_error,
                        "event append failed and sequence resync failed"
                    );
                    return Err(anyhow::anyhow!(append_error.to_string())).context(format!(
                        "failed appending event and failed sequence resync: {resync_error}"
                    ));
                }
                warn!(
                    session_id = %session_id,
                    branch = %branch_id.as_str(),
                    error = %append_error,
                    "event append failed; sequence resynced"
                );
                return Err(anyhow::anyhow!(append_error.to_string()))
                    .context("failed appending event; sequence was resynced");
            }
        };
        // Broadcast send only fails when there are no subscribers, which is not an error here.
        let _ = self.stream.send(persisted.clone());
        self.mark_branch_head(session_id, branch_id, persisted.sequence)?;
        Ok(())
    }
1755
1756 fn next_sequence(&self, session_id: &SessionId, branch_id: &BranchId) -> Result<u64> {
1757 let mut sessions = self.sessions.lock();
1758 let session = sessions
1759 .get_mut(session_id.as_str())
1760 .with_context(|| format!("session not found: {session_id}"))?;
1761 if !session.branches.contains_key(branch_id) {
1762 bail!("branch not found: {}", branch_id.as_str());
1763 }
1764 if let Some(merged_into) = session
1765 .branches
1766 .get(branch_id)
1767 .and_then(|branch| branch.merged_into.as_ref())
1768 {
1769 bail!(
1770 "branch {} is merged into {} and is read-only",
1771 branch_id.as_str(),
1772 merged_into.as_str()
1773 );
1774 }
1775 let sequence = *session
1776 .next_sequence_by_branch
1777 .entry(branch_id.clone())
1778 .or_insert(1);
1779 session
1780 .next_sequence_by_branch
1781 .insert(branch_id.clone(), sequence.saturating_add(1));
1782 Ok(sequence)
1783 }
1784
1785 fn peek_last_sequence(&self, session_id: &SessionId, branch_id: &BranchId) -> Result<u64> {
1786 let sessions = self.sessions.lock();
1787 let session = sessions
1788 .get(session_id.as_str())
1789 .with_context(|| format!("session not found: {session_id}"))?;
1790 if !session.branches.contains_key(branch_id) {
1791 bail!("branch not found: {}", branch_id.as_str());
1792 }
1793 Ok(session
1794 .next_sequence_by_branch
1795 .get(branch_id)
1796 .copied()
1797 .unwrap_or(1)
1798 .saturating_sub(1))
1799 }
1800
1801 async fn resync_next_sequence(
1802 &self,
1803 session_id: &SessionId,
1804 branch_id: &BranchId,
1805 ) -> Result<()> {
1806 let latest = self
1807 .event_store
1808 .head(session_id.clone(), branch_id.clone())
1809 .await
1810 .map_err(|error| anyhow::anyhow!(error.to_string()))
1811 .context("failed loading latest sequence for resync")?;
1812 let mut sessions = self.sessions.lock();
1813 let session = sessions
1814 .get_mut(session_id.as_str())
1815 .with_context(|| format!("session not found: {session_id}"))?;
1816 if !session.branches.contains_key(branch_id) {
1817 bail!("branch not found: {}", branch_id.as_str());
1818 }
1819 session
1820 .next_sequence_by_branch
1821 .insert(branch_id.clone(), latest.saturating_add(1));
1822 Ok(())
1823 }
1824
1825 fn mark_branch_head(
1826 &self,
1827 session_id: &SessionId,
1828 branch_id: &BranchId,
1829 sequence: u64,
1830 ) -> Result<()> {
1831 let mut sessions = self.sessions.lock();
1832 let session = sessions
1833 .get_mut(session_id.as_str())
1834 .with_context(|| format!("session not found: {session_id}"))?;
1835 let branch = session
1836 .branches
1837 .get_mut(branch_id)
1838 .with_context(|| format!("branch not found: {}", branch_id.as_str()))?;
1839 branch.head_sequence = branch.head_sequence.max(sequence);
1840 Ok(())
1841 }
1842
1843 fn branch_info(&self, session_id: &SessionId, branch_id: &BranchId) -> Result<BranchInfo> {
1844 let sessions = self.sessions.lock();
1845 let session = sessions
1846 .get(session_id.as_str())
1847 .with_context(|| format!("session not found: {session_id}"))?;
1848 let state = session
1849 .branches
1850 .get(branch_id)
1851 .with_context(|| format!("branch not found: {}", branch_id.as_str()))?;
1852 Ok(BranchInfo {
1853 branch_id: branch_id.clone(),
1854 parent_branch: state.parent_branch.clone(),
1855 fork_sequence: state.fork_sequence,
1856 head_sequence: state.head_sequence,
1857 merged_into: state.merged_into.clone(),
1858 })
1859 }
1860
1861 fn should_checkpoint(&self, session_id: &SessionId) -> Result<bool> {
1862 let mut sessions = self.sessions.lock();
1863 let session = sessions
1864 .get_mut(session_id.as_str())
1865 .with_context(|| format!("session not found: {session_id}"))?;
1866 session.tick_count += 1;
1867 Ok(session.tick_count % self.config.checkpoint_every_ticks == 0)
1868 }
1869
1870 async fn create_checkpoint(
1871 &self,
1872 session_id: &SessionId,
1873 branch_id: &BranchId,
1874 manifest: &SessionManifest,
1875 state: &AgentStateVector,
1876 ) -> Result<CheckpointManifest> {
1877 let checkpoint_id = CheckpointId::default();
1878 let state_hash = sha256_json(state)?;
1879 let checkpoint = CheckpointManifest {
1880 checkpoint_id: checkpoint_id.clone(),
1881 session_id: session_id.clone(),
1882 branch_id: branch_id.clone(),
1883 created_at: Utc::now(),
1884 event_sequence: self.peek_last_sequence(session_id, branch_id)?,
1885 state_hash,
1886 note: "automatic heartbeat checkpoint".to_owned(),
1887 };
1888
1889 let checkpoint_dir = PathBuf::from(&manifest.workspace_root)
1890 .join("checkpoints")
1891 .join(checkpoint_id.as_str());
1892 fs::create_dir_all(&checkpoint_dir).await?;
1893 self.write_pretty_json(checkpoint_dir.join("manifest.json"), &checkpoint)
1894 .await?;
1895 Ok(checkpoint)
1896 }
1897
1898 async fn write_heartbeat(
1899 &self,
1900 session_id: &SessionId,
1901 state: &AgentStateVector,
1902 mode: &OperatingMode,
1903 ) -> Result<()> {
1904 let workspace_root = {
1905 let sessions = self.sessions.lock();
1906 let session = sessions
1907 .get(session_id.as_str())
1908 .with_context(|| format!("session not found: {session_id}"))?;
1909 session.manifest.workspace_root.clone()
1910 };
1911
1912 let payload = serde_json::json!({
1913 "at": Utc::now(),
1914 "mode": mode,
1915 "state": state,
1916 });
1917 self.write_pretty_json(
1918 PathBuf::from(workspace_root).join("state/heartbeat.json"),
1919 &payload,
1920 )
1921 .await
1922 }
1923
1924 fn persist_runtime_state(
1925 &self,
1926 session_id: &SessionId,
1927 state: AgentStateVector,
1928 mode: OperatingMode,
1929 ) -> Result<()> {
1930 let mut sessions = self.sessions.lock();
1931 let session = sessions
1932 .get_mut(session_id.as_str())
1933 .with_context(|| format!("session not found: {session_id}"))?;
1934 session.state_vector = state.clone();
1935 session.mode = mode;
1936
1937 if let Some(parent) = self.config.root.parent() {
1939 let lake_dir = parent.join(".lake");
1940 let state_json = serde_json::to_string_pretty(&state).unwrap_or_default();
1941 let mode_str = match mode {
1942 OperatingMode::Explore => "explore",
1943 OperatingMode::Execute => "execute",
1944 OperatingMode::Verify => "verify",
1945 OperatingMode::AskHuman => "ask_human",
1946 OperatingMode::Recover => "recover",
1947 OperatingMode::Sleep => "sleep",
1948 };
1949
1950 let lake_dir_clone = lake_dir.clone();
1953 tokio::spawn(async move {
1954 let _ = fs::create_dir_all(&lake_dir_clone).await;
1955 let _ = fs::write(lake_dir_clone.join("state.json"), state_json).await;
1956 let _ = fs::write(lake_dir_clone.join("mode.txt"), mode_str).await;
1957 });
1958 }
1959
1960 Ok(())
1961 }
1962
1963 async fn current_tick_output(
1964 &self,
1965 session_id: &SessionId,
1966 branch_id: &BranchId,
1967 mode: OperatingMode,
1968 state: AgentStateVector,
1969 events_emitted: u64,
1970 ) -> Result<TickOutput> {
1971 Ok(TickOutput {
1972 session_id: session_id.clone(),
1973 mode,
1974 state,
1975 events_emitted,
1976 last_sequence: self.peek_last_sequence(session_id, branch_id)?,
1977 })
1978 }
1979
1980 async fn initialize_workspace(&self, root: &Path) -> Result<()> {
1981 let directories = [
1982 "events",
1983 "checkpoints",
1984 "state",
1985 "tools/runs",
1986 "artifacts/build",
1987 "artifacts/reports",
1988 "memory",
1989 "inbox/human_requests",
1990 "outbox/ui_stream",
1991 ];
1992
1993 for directory in directories {
1994 fs::create_dir_all(root.join(directory)).await?;
1995 }
1996
1997 let thread_path = root.join("state/thread.md");
1998 if !fs::try_exists(&thread_path).await.unwrap_or(false) {
1999 fs::write(&thread_path, "# Session Thread\n\n- Session created\n").await?;
2000 }
2001
2002 let plan_path = root.join("state/plan.yaml");
2003 if !fs::try_exists(&plan_path).await.unwrap_or(false) {
2004 fs::write(
2005 &plan_path,
2006 "version: 1\nmode: explore\nsteps:\n - id: bootstrap\n status: pending\n",
2007 )
2008 .await?;
2009 }
2010
2011 let task_graph_path = root.join("state/task_graph.json");
2012 if !fs::try_exists(&task_graph_path).await.unwrap_or(false) {
2013 fs::write(
2014 &task_graph_path,
2015 serde_json::to_string_pretty(&serde_json::json!({
2016 "nodes": [{"id": "bootstrap", "type": "task"}],
2017 "edges": [],
2018 }))?,
2019 )
2020 .await?;
2021 }
2022
2023 Ok(())
2024 }
2025
2026 fn session_root(&self, session_id: &SessionId) -> PathBuf {
2027 self.config.root.join("sessions").join(session_id.as_str())
2028 }
2029
2030 async fn write_pretty_json<T: Serialize>(&self, path: PathBuf, value: &T) -> Result<()> {
2031 if let Some(parent) = path.parent() {
2032 fs::create_dir_all(parent).await?;
2033 }
2034 let payload = serde_json::to_string_pretty(value)?;
2035 fs::write(path, payload).await?;
2036 Ok(())
2037 }
2038}
2039
2040fn sha256_json<T: Serialize>(value: &T) -> Result<String> {
2041 let payload = serde_json::to_vec(value)?;
2042 Ok(sha256_bytes(&payload))
2043}
2044
2045fn write_trace_context_on_record(event: &mut EventRecord) {
2050 use opentelemetry::trace::TraceContextExt;
2051 use tracing_opentelemetry::OpenTelemetrySpanExt;
2052
2053 let current_span = tracing::Span::current();
2054 let otel_context = current_span.context();
2055 let span_ref = otel_context.span();
2056 let span_context = span_ref.span_context();
2057
2058 if span_context.is_valid() {
2059 event.trace_id = Some(span_context.trace_id().to_string());
2060 event.span_id = Some(span_context.span_id().to_string());
2061 }
2062}
2063
2064fn operating_mode_str(mode: &OperatingMode) -> &'static str {
2065 match mode {
2066 OperatingMode::Explore => "explore",
2067 OperatingMode::Execute => "execute",
2068 OperatingMode::Verify => "verify",
2069 OperatingMode::AskHuman => "ask_human",
2070 OperatingMode::Recover => "recover",
2071 OperatingMode::Sleep => "sleep",
2072 }
2073}
2074
2075fn model_stop_reason_string(stop_reason: &aios_protocol::ModelStopReason) -> String {
2076 match stop_reason {
2077 aios_protocol::ModelStopReason::Completed => "completed".to_owned(),
2078 aios_protocol::ModelStopReason::ToolCall => "tool_call".to_owned(),
2079 aios_protocol::ModelStopReason::MaxIterations => "max_iterations".to_owned(),
2080 aios_protocol::ModelStopReason::Cancelled => "cancelled".to_owned(),
2081 aios_protocol::ModelStopReason::Error => "error".to_owned(),
2082 aios_protocol::ModelStopReason::Other(reason) => reason.clone(),
2083 }
2084}
2085
2086fn extract_observation(event: &EventRecord) -> Option<aios_protocol::Observation> {
2087 let text = match &event.kind {
2088 EventKind::ToolCallCompleted {
2089 tool_name,
2090 result,
2091 status,
2092 ..
2093 } => format!("tool call completed ({tool_name}): {result} [status={status:?}]"),
2094 EventKind::ErrorRaised { message } => format!("error observed: {message}"),
2095 EventKind::CheckpointCreated { checkpoint_id, .. } => {
2096 format!("checkpoint created: {checkpoint_id}")
2097 }
2098 _ => return None,
2099 };
2100
2101 Some(aios_protocol::Observation {
2102 observation_id: uuid::Uuid::new_v4(),
2103 created_at: event.timestamp,
2104 text,
2105 tags: vec!["auto".to_owned()],
2106 provenance: aios_protocol::Provenance {
2107 event_start: event.sequence,
2108 event_end: event.sequence,
2109 files: vec![FileProvenance {
2110 path: format!(
2111 "events/{}.jsonl#branch={}",
2112 event.session_id.as_str(),
2113 event.branch_id.as_str()
2114 ),
2115 sha256: "pending".to_owned(),
2116 }],
2117 },
2118 })
2119}
2120
2121fn sha256_bytes(bytes: &[u8]) -> String {
2122 let digest = Sha256::digest(bytes);
2123 hex::encode(digest)
2124}
2125
2126fn hash_tool_call_signature(call: &ToolCall) -> String {
2127 let payload = serde_json::json!({
2128 "tool_name": call.tool_name,
2129 "input": normalize_json_value(&call.input),
2130 });
2131 let serialized = serde_json::to_vec(&payload).unwrap_or_default();
2132 let mut hasher = Hasher::new();
2133 hasher.update(&serialized);
2134 hasher.finalize().to_hex().to_string()
2135}
2136
2137fn normalize_json_value(value: &Value) -> Value {
2138 match value {
2139 Value::Object(map) => {
2140 let mut keys = map.keys().cloned().collect::<Vec<_>>();
2141 keys.sort();
2142 let normalized = keys
2143 .into_iter()
2144 .map(|key| {
2145 let normalized_value = map
2146 .get(&key)
2147 .map(normalize_json_value)
2148 .unwrap_or(Value::Null);
2149 (key, normalized_value)
2150 })
2151 .collect::<Map<String, Value>>();
2152 Value::Object(normalized)
2153 }
2154 Value::Array(items) => Value::Array(items.iter().map(normalize_json_value).collect()),
2155 _ => value.clone(),
2156 }
2157}
2158
2159fn event_kind_name(kind: &EventKind) -> &'static str {
2160 match kind {
2161 EventKind::SessionCreated { .. } => "session_created",
2162 EventKind::BranchCreated { .. } => "branch_created",
2163 EventKind::BranchMerged { .. } => "branch_merged",
2164 EventKind::PhaseEntered { .. } => "phase_entered",
2165 EventKind::DeliberationProposed { .. } => "deliberation_proposed",
2166 EventKind::ApprovalRequested { .. } => "approval_requested",
2167 EventKind::ApprovalResolved { .. } => "approval_resolved",
2168 EventKind::ToolCallRequested { .. } => "tool_call_requested",
2169 EventKind::ToolCallStarted { .. } => "tool_call_started",
2170 EventKind::ToolCallCompleted { .. } => "tool_call_completed",
2171 EventKind::VoiceSessionStarted { .. } => "voice_session_started",
2172 EventKind::VoiceInputChunk { .. } => "voice_input_chunk",
2173 EventKind::VoiceOutputChunk { .. } => "voice_output_chunk",
2174 EventKind::VoiceSessionStopped { .. } => "voice_session_stopped",
2175 EventKind::VoiceAdapterError { .. } => "voice_adapter_error",
2176 EventKind::FileMutated { .. } => "file_mutated",
2177 EventKind::Heartbeat { .. } => "heartbeat",
2178 EventKind::CheckpointCreated { .. } => "checkpoint_created",
2179 EventKind::StateEstimated { .. } => "state_estimated",
2180 EventKind::BudgetUpdated { .. } => "budget_updated",
2181 EventKind::CircuitBreakerTripped { .. } => "circuit_breaker_tripped",
2182 EventKind::ErrorRaised { .. } => "error_raised",
2183 _ => "custom",
2184 }
2185}
2186
2187#[allow(dead_code)]
2188fn _budget_sanity(budget: &BudgetState) -> Result<()> {
2189 if budget.cost_remaining_usd < 0.0 {
2190 bail!("budget cannot be negative");
2191 }
2192 Ok(())
2193}