1use std::path::PathBuf;
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant};
8
9use nexo_config::types::llm::AutoCompactionConfig;
10use nexo_driver_claude::SessionBindingStore;
11use nexo_driver_permission::PermissionDecider;
12use nexo_driver_types::{
13 AcceptanceVerdict, AttemptOutcome, AttemptParams, AutoCompactBreaker, AutoDreamHook,
14 AutoDreamOutcomeKind, BudgetGuards, BudgetUsage, CancellationToken, CompactContext,
15 CompactPolicy, CompactSummary, CompactSummaryStore, DefaultCompactPolicy, DreamContextLite,
16 Goal, GoalId,
17};
18use serde::{Deserialize, Serialize};
19use tokio_util::sync::CancellationToken as TokioCancel;
20
21use crate::acceptance::{AcceptanceEvaluator, NoopAcceptanceEvaluator};
22use crate::attempt::{run_attempt, AttemptContext};
23use crate::compact_store::NoopCompactSummaryStore;
24use crate::error::DriverError;
25use crate::events::{DriverEvent, DriverEventSink, NoopEventSink};
26use crate::extract_memories::ExtractMemories;
27use crate::mcp_config::write_mcp_config;
28use crate::post_compact_cleanup::PostCompactCleanup;
29use crate::proactive::{build_tick_prompt, wait_for_wake, ScheduledWake, WakeResult};
30use crate::replay::{
31 DefaultReplayPolicy, ReplayContext, ReplayDecision, ReplayOutcomeHint, ReplayPolicy,
32};
33#[cfg(unix)]
37use crate::socket::DriverSocketServer;
38use crate::workspace::WorkspaceManager;
39use dashmap::DashMap;
40use tokio::sync::watch;
41
/// Final report for a single goal run.
///
/// Returned by `DriverOrchestrator::run_goal` and also carried by the
/// `DriverEvent::GoalCompleted` event published just before it returns.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GoalOutcome {
    /// Id of the goal this report belongs to (copied from `Goal::id`).
    pub goal_id: GoalId,
    /// Terminal outcome that ended the driver loop.
    pub outcome: AttemptOutcome,
    /// Number of counted turns. Compaction turns are not counted, and a
    /// `FreshSessionRetry` / `CompactAndRetry` replay decision un-counts the
    /// turn that triggered it.
    pub total_turns: u32,
    /// Budget usage as of the end of the run.
    pub usage: BudgetUsage,
    /// `final_text` of the most recent counted attempt, if it produced any.
    pub final_text: Option<String>,
    /// Acceptance verdict of the most recent counted attempt, if any.
    pub acceptance: Option<AcceptanceVerdict>,
    /// Wall-clock time measured from the start of `run_goal`.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub elapsed: Duration,
}
53
/// Runs goals through the attempt loop and owns the per-goal control state
/// (pause, cancel, budget overrides, operator interrupts).
///
/// Construct one with `DriverOrchestrator::builder()`.
pub struct DriverOrchestrator {
    /// Claude driver configuration handed to every attempt.
    claude_cfg: nexo_driver_claude::ClaudeConfig,
    /// Session binding store handed to every attempt; a goal's binding is
    /// marked invalid on a `FreshSessionRetry` replay decision.
    binding_store: Arc<dyn SessionBindingStore>,
    /// Acceptance evaluator handed to every attempt.
    acceptance: Arc<dyn AcceptanceEvaluator>,
    /// Prepares the goal workspace and provides checkpoint / diff-stat /
    /// rollback operations around each attempt.
    workspace_manager: Arc<WorkspaceManager>,
    /// Sink for every `DriverEvent`; publish results are ignored.
    event_sink: Arc<dyn DriverEventSink>,
    /// Decides what to do after a `Continue` or `Escalate` attempt outcome.
    replay_policy: Arc<dyn ReplayPolicy>,
    /// Decides after each counted turn whether the next turn should be a
    /// compaction turn.
    compact_policy: Arc<dyn CompactPolicy>,
    /// Context window size passed to the compact policy and used as the
    /// denominator of the reported token pressure (`0` reports pressure `0.0`).
    compact_context_window: u64,
    /// Optional auto-compaction settings; supplies `max_consecutive_failures`
    /// for the breaker (falls back to `3` when unset).
    auto_config: Option<AutoCompactionConfig>,
    /// Records compaction successes/failures; while tripped, the compact
    /// policy is not consulted. One breaker is shared by all goals.
    compact_breaker: Mutex<AutoCompactBreaker>,
    /// Stores compaction summaries and is queried once when a goal starts.
    compact_store: Arc<dyn CompactSummaryStore>,
    /// Publish a `Progress` event every this many counted turns; `0` disables.
    progress_every_turns: u32,
    /// Per-goal pause flag (`true` = paused), observed by the driver loop.
    pause_signals: Arc<DashMap<GoalId, watch::Sender<bool>>>,
    /// Per-goal cancellation tokens, each a child of `cancel_root`.
    cancel_tokens: Arc<DashMap<GoalId, CancellationToken>>,
    /// Per-goal budget overrides; the driver loop only reads `max_turns`.
    budget_overrides: Arc<DashMap<GoalId, BudgetGuards>>,
    /// Per-goal queue of operator messages, drained into the next attempt's
    /// `operator_messages` extras.
    pending_interrupts: Arc<DashMap<GoalId, std::collections::VecDeque<String>>>,
    /// Optional memory extractor ticked after every counted turn.
    extract_memories: Option<Arc<ExtractMemories>>,
    /// Directory handed to the memory extractor; extraction only runs when
    /// both this and `extract_memories` are set.
    memory_dir: Option<PathBuf>,
    /// Auto-dream hooks keyed by agent id. The builder / deprecated setter
    /// store their single hook under the key `"_default"`.
    auto_dream: std::sync::Mutex<std::collections::HashMap<String, Arc<dyn AutoDreamHook>>>,
    /// Binary path written into the MCP config and passed to attempts.
    bin_path: PathBuf,
    /// Socket path written into the MCP config; on unix the permission
    /// socket server is bound to it at build time.
    socket_path: PathBuf,
    /// Handle of the socket server task; awaited by `shutdown`.
    _socket_handle: tokio::task::JoinHandle<Result<(), DriverError>>,
    /// Cancels the socket server task on `shutdown`.
    socket_cancel: TokioCancel,
    /// Parent of every per-goal cancellation token; cancelled on `shutdown`.
    cancel_root: CancellationToken,
}
119
/// Builder for `DriverOrchestrator`.
///
/// `build` fails with `DriverError::Config` unless `claude_cfg`,
/// `binding_store`, `decider`, `workspace_manager`, `bin_path` and
/// `socket_path` have been provided; everything else has a default.
#[derive(Default)]
pub struct DriverOrchestratorBuilder {
    /// Required: Claude driver configuration.
    claude_cfg: Option<nexo_driver_claude::ClaudeConfig>,
    /// Required: session binding store.
    binding_store: Option<Arc<dyn SessionBindingStore>>,
    /// Optional: defaults to `NoopAcceptanceEvaluator`.
    acceptance: Option<Arc<dyn AcceptanceEvaluator>>,
    /// Required: permission decider handed to the socket server (unix only;
    /// discarded on other platforms).
    decider: Option<Arc<dyn PermissionDecider>>,
    /// Required: workspace manager.
    workspace_manager: Option<Arc<WorkspaceManager>>,
    /// Optional: defaults to `NoopEventSink`.
    event_sink: Option<Arc<dyn DriverEventSink>>,
    /// Optional: defaults to `DefaultReplayPolicy::default()`.
    replay_policy: Option<Arc<dyn ReplayPolicy>>,
    /// Optional: defaults to `DefaultCompactPolicy::default()`.
    compact_policy: Option<Arc<dyn CompactPolicy>>,
    /// Context window size for compaction decisions; `0` unless set.
    compact_context_window: u64,
    /// Optional auto-compaction settings.
    auto_config: Option<AutoCompactionConfig>,
    /// Optional: defaults to `NoopCompactSummaryStore`.
    compact_store: Option<Arc<dyn CompactSummaryStore>>,
    /// Optional memory extractor.
    extract_memories: Option<Arc<ExtractMemories>>,
    /// Optional directory for extracted memories.
    memory_dir: Option<PathBuf>,
    /// Optional auto-dream hook; registered under the key `"_default"`.
    auto_dream: Option<Arc<dyn AutoDreamHook>>,
    /// Progress event cadence in turns; `0` (the default) disables it.
    progress_every_turns: u32,
    /// Required: binary path.
    bin_path: Option<PathBuf>,
    /// Required: socket path.
    socket_path: Option<PathBuf>,
    /// Optional root cancellation token; a default token is used when unset.
    cancel_root: Option<CancellationToken>,
}
142
143impl DriverOrchestratorBuilder {
144 pub fn claude_config(mut self, cfg: nexo_driver_claude::ClaudeConfig) -> Self {
145 self.claude_cfg = Some(cfg);
146 self
147 }
148 pub fn binding_store(mut self, s: Arc<dyn SessionBindingStore>) -> Self {
149 self.binding_store = Some(s);
150 self
151 }
152 pub fn acceptance(mut self, a: Arc<dyn AcceptanceEvaluator>) -> Self {
153 self.acceptance = Some(a);
154 self
155 }
156 pub fn decider(mut self, d: Arc<dyn PermissionDecider>) -> Self {
157 self.decider = Some(d);
158 self
159 }
160 pub fn workspace_manager(mut self, w: Arc<WorkspaceManager>) -> Self {
161 self.workspace_manager = Some(w);
162 self
163 }
164 pub fn event_sink(mut self, e: Arc<dyn DriverEventSink>) -> Self {
165 self.event_sink = Some(e);
166 self
167 }
168 pub fn bin_path(mut self, p: impl Into<PathBuf>) -> Self {
169 self.bin_path = Some(p.into());
170 self
171 }
172 pub fn socket_path(mut self, p: impl Into<PathBuf>) -> Self {
173 self.socket_path = Some(p.into());
174 self
175 }
176 pub fn cancel_root(mut self, c: CancellationToken) -> Self {
177 self.cancel_root = Some(c);
178 self
179 }
180 pub fn replay_policy(mut self, p: Arc<dyn ReplayPolicy>) -> Self {
181 self.replay_policy = Some(p);
182 self
183 }
184 pub fn compact_policy(mut self, p: Arc<dyn CompactPolicy>) -> Self {
185 self.compact_policy = Some(p);
186 self
187 }
188 pub fn compact_context_window(mut self, n: u64) -> Self {
189 self.compact_context_window = n;
190 self
191 }
192 pub fn auto_config(mut self, a: AutoCompactionConfig) -> Self {
193 self.auto_config = Some(a);
194 self
195 }
196 pub fn compact_store(mut self, s: Arc<dyn CompactSummaryStore>) -> Self {
197 self.compact_store = Some(s);
198 self
199 }
200 pub fn extract_memories(mut self, e: Arc<ExtractMemories>) -> Self {
201 self.extract_memories = Some(e);
202 self
203 }
204 pub fn memory_dir(mut self, p: impl Into<PathBuf>) -> Self {
205 self.memory_dir = Some(p.into());
206 self
207 }
208 pub fn auto_dream(mut self, hook: Arc<dyn AutoDreamHook>) -> Self {
211 self.auto_dream = Some(hook);
212 self
213 }
214 pub fn progress_every_turns(mut self, n: u32) -> Self {
215 self.progress_every_turns = n;
216 self
217 }
218
219 pub async fn build(self) -> Result<DriverOrchestrator, DriverError> {
220 let claude_cfg = self
221 .claude_cfg
222 .ok_or_else(|| DriverError::Config("claude config required".into()))?;
223 let binding_store = self
224 .binding_store
225 .ok_or_else(|| DriverError::Config("binding_store required".into()))?;
226 let decider = self
227 .decider
228 .ok_or_else(|| DriverError::Config("decider required".into()))?;
229 let workspace_manager = self
230 .workspace_manager
231 .ok_or_else(|| DriverError::Config("workspace_manager required".into()))?;
232 let bin_path = self
233 .bin_path
234 .ok_or_else(|| DriverError::Config("bin_path required".into()))?;
235 let socket_path = self
236 .socket_path
237 .ok_or_else(|| DriverError::Config("socket_path required".into()))?;
238 let acceptance: Arc<dyn AcceptanceEvaluator> = self
239 .acceptance
240 .unwrap_or_else(|| Arc::new(NoopAcceptanceEvaluator));
241 let event_sink: Arc<dyn DriverEventSink> =
242 self.event_sink.unwrap_or_else(|| Arc::new(NoopEventSink));
243 let replay_policy: Arc<dyn ReplayPolicy> = self
244 .replay_policy
245 .unwrap_or_else(|| Arc::new(DefaultReplayPolicy::default()));
246 let compact_policy: Arc<dyn CompactPolicy> = self
247 .compact_policy
248 .unwrap_or_else(|| Arc::new(DefaultCompactPolicy::default()));
249 let compact_context_window = self.compact_context_window;
250 let auto_config = self.auto_config;
251 let compact_store: Arc<dyn CompactSummaryStore> = self
252 .compact_store
253 .unwrap_or_else(|| Arc::new(NoopCompactSummaryStore));
254 let progress_every_turns = self.progress_every_turns;
255 let cancel_root = self.cancel_root.unwrap_or_default();
256
257 let socket_cancel = TokioCancel::new();
264 #[cfg(unix)]
265 let socket_handle = {
266 let _ = &socket_path; let server =
268 DriverSocketServer::bind(&socket_path, decider, socket_cancel.clone()).await?;
269 tokio::spawn(server.run())
270 };
271 #[cfg(not(unix))]
272 let socket_handle = {
273 let _ = (&socket_path, decider);
274 tokio::spawn(async { Ok::<(), DriverError>(()) })
275 };
276
277 Ok(DriverOrchestrator {
278 claude_cfg,
279 binding_store,
280 acceptance,
281 workspace_manager,
282 event_sink,
283 replay_policy,
284 compact_policy,
285 compact_context_window,
286 auto_config,
287 compact_breaker: Mutex::new(AutoCompactBreaker::default()),
288 compact_store,
289 extract_memories: self.extract_memories,
290 memory_dir: self.memory_dir,
291 auto_dream: {
292 let mut m: std::collections::HashMap<String, Arc<dyn AutoDreamHook>> =
297 std::collections::HashMap::new();
298 if let Some(hook) = self.auto_dream {
299 m.insert("_default".to_string(), hook);
300 }
301 std::sync::Mutex::new(m)
302 },
303 progress_every_turns,
304 pause_signals: Arc::new(DashMap::new()),
305 cancel_tokens: Arc::new(DashMap::new()),
306 budget_overrides: Arc::new(DashMap::new()),
307 pending_interrupts: Arc::new(DashMap::new()),
308 bin_path,
309 socket_path,
310 _socket_handle: socket_handle,
311 socket_cancel,
312 cancel_root,
313 })
314 }
315}
316
317impl DriverOrchestrator {
318 pub fn builder() -> DriverOrchestratorBuilder {
319 DriverOrchestratorBuilder::default()
320 }
321
322 pub fn register_auto_dream(
329 &self,
330 agent_id: String,
331 hook: Arc<dyn AutoDreamHook>,
332 ) -> Option<Arc<dyn AutoDreamHook>> {
333 let mut guard = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
334 guard.insert(agent_id, hook)
335 }
336
337 pub fn unregister_auto_dream(&self, agent_id: &str) -> Option<Arc<dyn AutoDreamHook>> {
340 let mut guard = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
341 guard.remove(agent_id)
342 }
343
344 pub fn auto_dream_agents(&self) -> Vec<String> {
348 let guard = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
349 let mut ids: Vec<String> = guard.keys().cloned().collect();
350 ids.sort();
351 ids
352 }
353
354 pub fn has_auto_dream(&self) -> bool {
356 !self
357 .auto_dream
358 .lock()
359 .unwrap_or_else(|p| p.into_inner())
360 .is_empty()
361 }
362
363 #[deprecated(
369 since = "0.1.2",
370 note = "use register_auto_dream(agent_id, hook) for multi-runner routing"
371 )]
372 pub fn set_auto_dream(&self, hook: Option<Arc<dyn AutoDreamHook>>) {
373 static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
374 WARNED.get_or_init(|| {
375 tracing::warn!(
376 target: "auto_dream.deprecation",
377 "DriverOrchestrator::set_auto_dream is deprecated; use register_auto_dream(agent_id, hook) for per-agent routing"
378 );
379 });
380 let mut guard = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
381 match hook {
382 Some(h) => {
383 guard.insert("_default".to_string(), h);
384 }
385 None => guard.clear(),
386 }
387 }
388
389 pub fn pause_goal(&self, goal_id: GoalId) -> bool {
392 if let Some(tx) = self.pause_signals.get(&goal_id) {
393 tx.send_replace(true);
398 return true;
399 }
400 false
401 }
402
403 pub fn resume_goal(&self, goal_id: GoalId) -> bool {
405 if let Some(tx) = self.pause_signals.get(&goal_id) {
406 tx.send_replace(false);
407 return true;
408 }
409 false
410 }
411
412 pub fn is_paused(&self, goal_id: GoalId) -> bool {
415 self.pause_signals
416 .get(&goal_id)
417 .map(|tx| *tx.borrow())
418 .unwrap_or(false)
419 }
420
421 pub fn set_goal_max_turns(&self, goal_id: GoalId, new_max: u32) -> Option<u32> {
426 if !self.cancel_tokens.contains_key(&goal_id) {
427 return None;
428 }
429 let mut entry = self
430 .budget_overrides
431 .entry(goal_id)
432 .or_insert_with(|| BudgetGuards {
433 max_turns: new_max,
434 max_wall_time: Duration::from_secs(60 * 60 * 24 * 365),
435 max_tokens: u64::MAX,
436 max_consecutive_denies: u32::MAX,
437 max_consecutive_errors: u32::MAX,
438 max_consecutive_413: 2,
439 });
440 if new_max > entry.value().max_turns {
441 entry.value_mut().max_turns = new_max;
442 }
443 Some(entry.value().max_turns)
444 }
445
446 pub fn interrupt_goal(&self, goal_id: GoalId, message: impl Into<String>) -> usize {
453 let mut entry = self.pending_interrupts.entry(goal_id).or_default();
454 entry.value_mut().push_back(message.into());
455 entry.value().len()
456 }
457
458 pub(crate) fn drain_interrupts(&self, goal_id: GoalId) -> Vec<String> {
463 self.pending_interrupts
464 .get_mut(&goal_id)
465 .map(|mut e| e.value_mut().drain(..).collect::<Vec<_>>())
466 .unwrap_or_default()
467 }
468
469 pub fn pre_register_goal(&self, goal_id: GoalId) {
477 self.pause_signals.entry(goal_id).or_insert_with(|| {
478 let (tx, _rx) = watch::channel(false);
479 tx
480 });
481 self.cancel_tokens
482 .entry(goal_id)
483 .or_insert_with(|| self.cancel_root.child_token());
484 }
485
486 pub fn cancel_goal(&self, goal_id: GoalId) -> bool {
492 if let Some(tok) = self.cancel_tokens.get(&goal_id) {
493 tok.cancel();
494 return true;
495 }
496 false
497 }
498
499 pub fn is_cancelled(&self, goal_id: GoalId) -> bool {
501 self.cancel_tokens
502 .get(&goal_id)
503 .map(|t| t.is_cancelled())
504 .unwrap_or(false)
505 }
506
507 pub fn spawn_goal(
514 self: Arc<Self>,
515 goal: Goal,
516 ) -> tokio::task::JoinHandle<Result<GoalOutcome, DriverError>> {
517 tokio::spawn(async move { self.run_goal(goal).await })
518 }
519
520 pub async fn run_goal(&self, goal: Goal) -> Result<GoalOutcome, DriverError> {
522 let started = Instant::now();
523 let goal_id = goal.id;
524
525 let mut pause_rx = match self.pause_signals.get(&goal_id) {
530 Some(existing) => existing.value().subscribe(),
531 None => {
532 let (tx, rx) = watch::channel(false);
533 self.pause_signals.insert(goal_id, tx);
534 rx
535 }
536 };
537 let goal_cancel = match self.cancel_tokens.get(&goal_id) {
542 Some(t) => t.value().clone(),
543 None => {
544 let t = self.cancel_root.child_token();
545 self.cancel_tokens.insert(goal_id, t.clone());
546 t
547 }
548 };
549
550 let _ = self
551 .event_sink
552 .publish(DriverEvent::GoalStarted { goal: goal.clone() })
553 .await;
554
555 let workspace = self.workspace_manager.ensure(&goal).await?;
557 let mcp_config_path = write_mcp_config(&workspace, &self.bin_path, &self.socket_path)?;
558
559 let mut usage = BudgetUsage::default();
561 let mut prior_failures: Vec<nexo_driver_types::AcceptanceFailure> = Vec::new();
562 let mut last_acceptance: Option<AcceptanceVerdict> = None;
563 let mut final_text: Option<String> = None;
564 let mut total_turns: u32 = 0;
565 let mut next_extras: Option<serde_json::Map<String, serde_json::Value>> = match self
568 .compact_store
569 .load(&goal.id.0.to_string(), &goal_id)
570 .await
571 {
572 Ok(Some(prior)) => {
573 let mut e = serde_json::Map::new();
574 e.insert(
575 "compact_summary".into(),
576 serde_json::Value::String(prior.summary),
577 );
578 Some(e)
579 }
580 _ => None,
581 };
582 let mut last_was_compact = false;
583 let mut compact_before_tokens: u64 = 0;
587 let final_outcome: AttemptOutcome;
588
589 loop {
590 while *pause_rx.borrow() {
594 if goal_cancel.is_cancelled() {
595 break;
596 }
597 tokio::select! {
598 _ = pause_rx.changed() => {}
599 _ = goal_cancel.cancelled() => break,
600 }
601 }
602
603 let effective_budget = match self.budget_overrides.get(&goal_id) {
607 Some(o) => BudgetGuards {
608 max_turns: o.value().max_turns.max(goal.budget.max_turns),
609 ..goal.budget.clone()
610 },
611 None => goal.budget.clone(),
612 };
613 if let Some(axis) = effective_budget.is_exhausted(&usage) {
614 let _ = self
615 .event_sink
616 .publish(DriverEvent::BudgetExhausted {
617 goal_id,
618 axis,
619 usage: usage.clone(),
620 })
621 .await;
622 final_outcome = AttemptOutcome::BudgetExhausted { axis };
623 break;
624 }
625 if goal_cancel.is_cancelled() {
626 final_outcome = AttemptOutcome::Cancelled;
627 break;
628 }
629
630 let _ = self
631 .event_sink
632 .publish(DriverEvent::AttemptStarted {
633 goal_id,
634 turn_index: total_turns,
635 usage: usage.clone(),
636 })
637 .await;
638
639 let cancel = goal_cancel.clone();
640 let mut extras = next_extras.take().unwrap_or_else(|| {
641 build_attempt_extras(&prior_failures, &goal.budget, total_turns)
642 });
643 let pending = self.drain_interrupts(goal_id);
650 if !pending.is_empty() {
651 extras.insert(
652 "operator_messages".into(),
653 serde_json::Value::Array(
654 pending.into_iter().map(serde_json::Value::String).collect(),
655 ),
656 );
657 }
658 let params = AttemptParams {
659 goal: goal.clone(),
660 turn_index: total_turns,
661 usage: usage.clone(),
662 prior_decisions: Vec::new(),
663 cancel,
664 extras,
665 };
666
667 let cp_label = format!("turn-{total_turns}-pre");
670 let cp_sha = self
671 .workspace_manager
672 .checkpoint(&workspace, &cp_label)
673 .await
674 .unwrap_or_else(|e| {
675 tracing::warn!(target: "driver-loop", "checkpoint failed: {e}");
676 crate::workspace::WorkspaceManager::NO_GIT_SENTINEL.to_string()
677 });
678
679 let ctx = AttemptContext {
680 claude_cfg: &self.claude_cfg,
681 binding_store: &self.binding_store,
682 acceptance: &self.acceptance,
683 workspace: &workspace,
684 mcp_config_path: &mcp_config_path,
685 bin_path: &self.bin_path,
686 cancel: goal_cancel.clone(),
687 };
688 tracing::info!(
689 target: "driver-loop",
690 goal_id = ?goal_id,
691 turn_index = total_turns,
692 "phase78: spawning attempt",
693 );
694 let mut result = run_attempt(ctx, params).await?;
695 tracing::info!(
696 target: "driver-loop",
697 goal_id = ?goal_id,
698 turn_index = total_turns,
699 outcome = ?result.outcome,
700 "phase78: attempt returned",
701 );
702
703 if cp_sha != crate::workspace::WorkspaceManager::NO_GIT_SENTINEL {
705 if let Ok(diff) = self.workspace_manager.diff_stat(&workspace, &cp_sha).await {
706 if !diff.trim().is_empty() {
707 result
708 .harness_extras
709 .insert("worktree.diff_stat".into(), serde_json::Value::String(diff));
710 result.harness_extras.insert(
711 "worktree.checkpoint_sha".into(),
712 serde_json::Value::String(cp_sha.clone()),
713 );
714 }
715 }
716 }
717
718 usage = result.usage_after.clone();
719
720 if last_was_compact {
723 last_was_compact = false;
724 let compact_ok = matches!(
726 result.outcome,
727 AttemptOutcome::Done | AttemptOutcome::NeedsRetry { .. }
728 );
729 if compact_ok {
730 self.compact_breaker
731 .lock()
732 .unwrap()
733 .record_success(total_turns);
734 } else {
735 self.compact_breaker.lock().unwrap().record_failure();
736 }
737 let _ = self
738 .event_sink
739 .publish(DriverEvent::AttemptCompleted {
740 result: result.clone(),
741 })
742 .await;
743 let _ = self
744 .event_sink
745 .publish(DriverEvent::CompactCompleted {
746 goal_id,
747 turn_index: total_turns,
748 after_tokens: usage.tokens,
749 })
750 .await;
751 if compact_ok {
753 if let Some(ref summary_text) = result.final_text {
754 let summary = CompactSummary {
755 agent_id: goal.id.0.to_string(),
756 summary: summary_text.clone(),
757 turn_index: total_turns,
758 before_tokens: compact_before_tokens,
759 after_tokens: usage.tokens,
760 stored_at: chrono::Utc::now(),
761 cache_pin_keys: Vec::new(),
762 truncated_tool_results: Vec::new(),
763 };
764 let _ = self.compact_store.store(summary).await;
765 }
766 let _ = self
767 .event_sink
768 .publish(DriverEvent::CompactSummaryStored {
769 goal_id,
770 turn_index: total_turns,
771 before_tokens: compact_before_tokens,
772 after_tokens: usage.tokens,
773 })
774 .await;
775 let mut cleanup = PostCompactCleanup::new();
777 if let (Some(ref extract), Some(ref memory_dir)) =
778 (&self.extract_memories, &self.memory_dir)
779 {
780 cleanup =
781 cleanup.with_extract_memories(Arc::clone(extract), memory_dir.clone());
782 }
783 cleanup.run().await;
784 }
785 continue;
786 }
787
788 final_text = result.final_text.clone();
789 last_acceptance = result.acceptance.clone();
790 total_turns += 1;
791 usage.turns = total_turns;
792
793 let _ = self
794 .event_sink
795 .publish(DriverEvent::AttemptCompleted {
796 result: result.clone(),
797 })
798 .await;
799
800 if self.progress_every_turns > 0
802 && total_turns > 0
803 && total_turns % self.progress_every_turns == 0
804 {
805 let _ = self
806 .event_sink
807 .publish(DriverEvent::Progress {
808 goal_id,
809 turn_index: total_turns,
810 usage: usage.clone(),
811 last_text: result.final_text.clone(),
812 })
813 .await;
814 }
815
816 if let Some(ref extract) = self.extract_memories {
818 extract.tick();
819 match extract.check_gates() {
820 Ok(()) => {
821 if let (Some(ref memory_dir), Some(ref final_text)) =
823 (&self.memory_dir, &result.final_text)
824 {
825 let messages_text = final_text.clone();
826 let dir = memory_dir.clone();
827 extract.extract(goal_id, total_turns, messages_text, dir);
828 }
829 }
830 Err(skip_reason) => {
831 let _ = self
832 .event_sink
833 .publish(DriverEvent::ExtractMemoriesSkipped {
834 goal_id,
835 reason: skip_reason,
836 })
837 .await;
838 }
839 }
840 }
841
842 let owning_agent_id = goal.agent_id().unwrap_or("").to_string();
857 let ad_opt: Option<Arc<dyn AutoDreamHook>> = {
858 let map = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
859 if owning_agent_id.is_empty() {
860 if !map.is_empty() {
861 tracing::warn!(
862 target: "auto_dream.dispatch",
863 goal_id = %goal_id.0,
864 "auto_dream skipped: goal.metadata.agent_id is empty (use Goal::with_agent_id)"
865 );
866 }
867 None
868 } else {
869 let hook = map.get(&owning_agent_id).cloned();
870 if hook.is_none() && !map.is_empty() {
871 tracing::debug!(
872 target: "auto_dream.dispatch",
873 goal_id = %goal_id.0,
874 agent_id = %owning_agent_id,
875 "auto_dream skipped: no runner registered for this agent"
876 );
877 }
878 hook
879 }
880 };
881 if let Some(ad) = ad_opt {
882 let transcript_dir = self
883 .workspace_manager
884 .root()
885 .join(".transcripts")
886 .join(goal_id.0.to_string());
887 let dream_ctx = DreamContextLite {
888 agent_id: owning_agent_id.clone(),
889 goal_id,
890 session_id: goal_id.0.to_string(),
891 transcript_dir,
892 kairos_active: false,
893 remote_mode: false,
894 };
895 let outcome_kind: AutoDreamOutcomeKind = ad.check_and_run(&dream_ctx).await;
896 let _ = self
897 .event_sink
898 .publish(DriverEvent::AutoDreamOutcome {
899 goal_id,
900 outcome_kind,
901 })
902 .await;
903 }
904
905 let session_age_minutes = started.elapsed().as_secs() / 60;
909 let max_failures = self
910 .auto_config
911 .as_ref()
912 .map(|a| a.max_consecutive_failures)
913 .unwrap_or(3);
914 let should_check = {
915 let breaker = self.compact_breaker.lock().unwrap();
916 !breaker.is_tripped(max_failures)
917 };
918 if should_check {
919 let last_compact = self.compact_breaker.lock().unwrap().last_compact_turn;
920 let compact_ctx = CompactContext {
921 goal_id,
922 turn_index: total_turns,
923 usage: &usage,
924 context_window: self.compact_context_window,
925 last_compact_turn: last_compact,
926 goal_description: &goal.description,
927 session_age_minutes,
928 auto_config: self.auto_config.as_ref(),
929 };
930 if let Some((focus, trigger)) = self.compact_policy.classify(&compact_ctx).await {
931 let before_tokens = usage.tokens;
932 compact_before_tokens = before_tokens;
933 let age_minutes = session_age_minutes;
934 let pressure = if self.compact_context_window > 0 {
935 usage.tokens as f64 / self.compact_context_window as f64
936 } else {
937 0.0
938 };
939 let _ = self
940 .event_sink
941 .publish(DriverEvent::CompactRequested {
942 goal_id,
943 turn_index: total_turns,
944 focus: focus.clone(),
945 token_pressure: pressure,
946 before_tokens,
947 age_minutes,
948 trigger,
949 })
950 .await;
951 let mut e = serde_json::Map::new();
952 e.insert("compact_turn".into(), serde_json::Value::Bool(true));
953 e.insert("compact_focus".into(), serde_json::Value::String(focus));
954 next_extras = Some(e);
955 last_was_compact = true;
956 }
957 }
958 if let Some(v) = &result.acceptance {
959 let _ = self
960 .event_sink
961 .publish(DriverEvent::Acceptance {
962 goal_id,
963 verdict: v.clone(),
964 })
965 .await;
966 }
967
968 match &result.outcome {
969 AttemptOutcome::Done => {
970 usage.consecutive_errors = 0;
971 final_outcome = AttemptOutcome::Done;
972 break;
973 }
974 AttemptOutcome::NeedsRetry { failures } => {
975 usage.consecutive_errors = 0;
976 prior_failures = failures.clone();
977 continue;
978 }
979 AttemptOutcome::Sleep {
980 duration_ms,
981 reason,
982 } => {
983 usage.consecutive_errors = 0;
984 prior_failures.clear();
985 let wake = ScheduledWake {
986 duration_ms: *duration_ms,
987 reason: reason.clone(),
988 sleep_started_at: Instant::now(),
989 };
990 match wait_for_wake(&wake, &goal_cancel).await {
991 WakeResult::Cancelled => {
992 final_outcome = AttemptOutcome::Cancelled;
993 break;
994 }
995 WakeResult::Fired { elapsed_ms } => {
996 let mut extras =
997 build_attempt_extras(&prior_failures, &goal.budget, total_turns);
998 extras.insert(
999 "synthetic_tick_prompt".into(),
1000 serde_json::Value::String(build_tick_prompt(&wake, elapsed_ms)),
1001 );
1002 next_extras = Some(extras);
1003 continue;
1004 }
1005 }
1006 }
1007 AttemptOutcome::Continue { reason } | AttemptOutcome::Escalate { reason } => {
1008 let hint = match &result.outcome {
1013 AttemptOutcome::Continue { .. } => ReplayOutcomeHint::Continue,
1014 _ => ReplayOutcomeHint::Escalate,
1015 };
1016 let cp_for_replay =
1017 if cp_sha == crate::workspace::WorkspaceManager::NO_GIT_SENTINEL {
1018 None
1019 } else {
1020 Some(cp_sha.as_str())
1021 };
1022 let ctx = ReplayContext {
1023 goal_id,
1024 turn_index: total_turns,
1025 pre_turn_checkpoint: cp_for_replay,
1026 usage: &usage,
1027 error_message: reason,
1028 last_outcome_hint: hint,
1029 };
1030 let decision = self.replay_policy.classify(&ctx).await;
1031 tracing::info!(
1032 target: "driver-loop",
1033 goal_id = ?goal_id,
1034 turn_index = total_turns,
1035 reason = %reason,
1036 decision = ?decision,
1037 "phase78: replay decision",
1038 );
1039 let _ = self
1040 .event_sink
1041 .publish(DriverEvent::ReplayDecision {
1042 goal_id,
1043 turn_index: total_turns,
1044 decision: decision.clone(),
1045 error_message: reason.clone(),
1046 })
1047 .await;
1048 match decision {
1049 ReplayDecision::FreshSessionRetry { rollback_to } => {
1050 let _ = self.binding_store.mark_invalid(goal_id).await;
1051 if let Some(sha) = rollback_to {
1052 let _ = self.workspace_manager.rollback(&workspace, &sha).await;
1053 }
1054 usage.consecutive_errors = usage.consecutive_errors.saturating_add(1);
1055 total_turns = total_turns.saturating_sub(1);
1058 usage.turns = total_turns;
1059 prior_failures.clear();
1060 tracing::info!(
1061 target: "driver-loop",
1062 goal_id = ?goal_id,
1063 "phase78: FreshSessionRetry — looping",
1064 );
1065 continue;
1066 }
1067 ReplayDecision::NextTurn { rollback_to } => {
1068 if let Some(sha) = rollback_to {
1069 let _ = self.workspace_manager.rollback(&workspace, &sha).await;
1070 }
1071 prior_failures.clear();
1072 tracing::info!(
1073 target: "driver-loop",
1074 goal_id = ?goal_id,
1075 next_turn = total_turns,
1076 "phase78: NextTurn — looping",
1077 );
1078 continue;
1079 }
1080 ReplayDecision::CompactAndRetry => {
1081 usage.consecutive_413 = usage.consecutive_413.saturating_add(1);
1086 total_turns = total_turns.saturating_sub(1);
1087 usage.turns = total_turns;
1088 prior_failures.clear();
1089 tracing::info!(
1090 target: "driver-loop",
1091 goal_id = ?goal_id,
1092 consecutive_413 = usage.consecutive_413,
1093 "phase85.1: CompactAndRetry — looping",
1094 );
1095 continue;
1096 }
1097 ReplayDecision::Escalate { reason } => {
1098 let _ = self
1099 .event_sink
1100 .publish(DriverEvent::Escalate {
1101 goal_id,
1102 reason: reason.clone(),
1103 })
1104 .await;
1105 final_outcome = AttemptOutcome::Escalate { reason };
1106 break;
1107 }
1108 }
1109 }
1110 AttemptOutcome::Cancelled => {
1111 final_outcome = AttemptOutcome::Cancelled;
1112 break;
1113 }
1114 AttemptOutcome::BudgetExhausted { axis } => {
1115 let _ = self
1116 .event_sink
1117 .publish(DriverEvent::BudgetExhausted {
1118 goal_id,
1119 axis: *axis,
1120 usage: usage.clone(),
1121 })
1122 .await;
1123 final_outcome = AttemptOutcome::BudgetExhausted { axis: *axis };
1124 break;
1125 }
1126 }
1127 }
1128
1129 let outcome = GoalOutcome {
1130 goal_id,
1131 outcome: final_outcome,
1132 total_turns,
1133 usage,
1134 final_text,
1135 acceptance: last_acceptance,
1136 elapsed: started.elapsed(),
1137 };
1138 let _ = self
1139 .event_sink
1140 .publish(DriverEvent::GoalCompleted {
1141 outcome: outcome.clone(),
1142 })
1143 .await;
1144 self.pause_signals.remove(&goal_id);
1146 self.cancel_tokens.remove(&goal_id);
1148 self.budget_overrides.remove(&goal_id);
1151 self.pending_interrupts.remove(&goal_id);
1155 Ok(outcome)
1156 }
1157
1158 pub async fn shutdown(self) -> Result<(), DriverError> {
1160 self.cancel_root.cancel();
1161 self.socket_cancel.cancel();
1162 let _ = self._socket_handle.await;
1163 Ok(())
1164 }
1165}
1166
1167fn build_attempt_extras(
1168 prior_failures: &[nexo_driver_types::AcceptanceFailure],
1169 budget: &BudgetGuards,
1170 turn_index: u32,
1171) -> serde_json::Map<String, serde_json::Value> {
1172 let mut m = serde_json::Map::new();
1173 m.insert(
1174 "turn_index".into(),
1175 serde_json::Value::Number(turn_index.into()),
1176 );
1177 m.insert(
1178 "max_turns".into(),
1179 serde_json::Value::Number(budget.max_turns.into()),
1180 );
1181 if !prior_failures.is_empty() {
1182 m.insert(
1183 "prior_failures".into(),
1184 serde_json::to_value(prior_failures).unwrap_or(serde_json::Value::Null),
1185 );
1186 }
1187 m
1188}