Skip to main content

nexo_driver_loop/
orchestrator.rs

1//! Goal-level loop. Takes a `Goal`, drives it to completion through
2//! the per-turn attempt loop. Emits NATS events at every major
3//! transition.
4
5use std::path::PathBuf;
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant};
8
9use nexo_config::types::llm::AutoCompactionConfig;
10use nexo_driver_claude::SessionBindingStore;
11use nexo_driver_permission::PermissionDecider;
12use nexo_driver_types::{
13    AcceptanceVerdict, AttemptOutcome, AttemptParams, AutoCompactBreaker, AutoDreamHook,
14    AutoDreamOutcomeKind, BudgetGuards, BudgetUsage, CancellationToken, CompactContext,
15    CompactPolicy, CompactSummary, CompactSummaryStore, DefaultCompactPolicy, DreamContextLite,
16    Goal, GoalId,
17};
18use serde::{Deserialize, Serialize};
19use tokio_util::sync::CancellationToken as TokioCancel;
20
21use crate::acceptance::{AcceptanceEvaluator, NoopAcceptanceEvaluator};
22use crate::attempt::{run_attempt, AttemptContext};
23use crate::compact_store::NoopCompactSummaryStore;
24use crate::error::DriverError;
25use crate::events::{DriverEvent, DriverEventSink, NoopEventSink};
26use crate::extract_memories::ExtractMemories;
27use crate::mcp_config::write_mcp_config;
28use crate::post_compact_cleanup::PostCompactCleanup;
29use crate::proactive::{build_tick_prompt, wait_for_wake, ScheduledWake, WakeResult};
30use crate::replay::{
31    DefaultReplayPolicy, ReplayContext, ReplayDecision, ReplayOutcomeHint, ReplayPolicy,
32};
33// Unix-only — orchestrator's socket-server bind path is also
34// gated below. Windows builds skip the permission-prompt
35// forwarder entirely (see lib.rs comment on socket module).
36#[cfg(unix)]
37use crate::socket::DriverSocketServer;
38use crate::workspace::WorkspaceManager;
39use dashmap::DashMap;
40use tokio::sync::watch;
41
/// Summary of one goal driven by `DriverOrchestrator::run_goal`.
///
/// Derives `Serialize`/`Deserialize`; `elapsed` goes through
/// `humantime_serde` so it is encoded in humantime duration format.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GoalOutcome {
    /// Goal this outcome refers to.
    pub goal_id: GoalId,
    /// Terminal outcome of the goal loop. Presumably the loop's
    /// `final_outcome` (e.g. `Cancelled`, `BudgetExhausted { .. }`);
    /// NOTE(review): confirm against the tail of `run_goal`.
    pub outcome: AttemptOutcome,
    /// Number of turns counted by the loop. The loop does not bump its
    /// turn counter for compact turns, so presumably those are excluded.
    pub total_turns: u32,
    /// Budget usage (e.g. `tokens`) as last reported by an attempt.
    pub usage: BudgetUsage,
    /// Final text produced by the goal, if any (assumed to come from the
    /// last work attempt; TODO confirm).
    pub final_text: Option<String>,
    /// Most recent acceptance verdict, if acceptance was evaluated.
    pub acceptance: Option<AcceptanceVerdict>,
    /// Wall-clock time spent driving the goal (presumably measured from
    /// the `Instant` captured at the start of `run_goal`).
    #[serde(with = "humantime_serde")]
    pub elapsed: Duration,
}
53
/// Goal-level driver: runs goals through the per-turn attempt loop.
///
/// Holds the shared collaborators (Claude config, stores, policies,
/// event sink) plus per-goal control maps keyed by `GoalId`
/// (pause flags, cancel tokens, budget overrides, interrupt queues).
/// Built via [`DriverOrchestrator::builder`].
pub struct DriverOrchestrator {
    /// Claude configuration handed to every attempt.
    claude_cfg: nexo_driver_claude::ClaudeConfig,
    /// Session-binding store handed to every attempt.
    binding_store: Arc<dyn SessionBindingStore>,
    /// Acceptance evaluator handed to every attempt.
    acceptance: Arc<dyn AcceptanceEvaluator>,
    /// Prepares per-goal workspaces, takes pre-turn checkpoints and
    /// computes diff stats against them.
    workspace_manager: Arc<WorkspaceManager>,
    /// Destination for every `DriverEvent` the loop publishes.
    event_sink: Arc<dyn DriverEventSink>,
    /// Replay-policy classifies mid-turn errors.
    replay_policy: Arc<dyn ReplayPolicy>,
    /// Opportunistic /compact policy.
    compact_policy: Arc<dyn CompactPolicy>,
    /// Context-window size used by the compaction logic (assumed to be
    /// in tokens; TODO confirm). Builder default is `0`.
    compact_context_window: u64,
    /// Token + age auto-compaction config.
    auto_config: Option<AutoCompactionConfig>,
    /// Circuit breaker for compaction failures.
    compact_breaker: Mutex<AutoCompactBreaker>,
    /// Persist compact summaries for session resume.
    compact_store: Arc<dyn CompactSummaryStore>,
    /// Emit `DriverEvent::Progress` after every Nth completed attempt
    /// so chat hooks can show 'still going'. `0` disables periodic
    /// progress beacons.
    progress_every_turns: u32,
    /// Per-goal pause flag. `pause_goal(id)` flips the watch to
    /// `true`; the loop blocks on the next iteration until
    /// `resume_goal(id)` flips it back. The current Claude turn is
    /// *not* killed — pause only takes effect at the natural boundary
    /// between turns.
    pause_signals: Arc<DashMap<GoalId, watch::Sender<bool>>>,
    /// Per-goal CancellationToken so `cancel_agent` can stop one goal
    /// without taking down the whole orchestrator via `cancel_root`.
    /// The token is a child of `cancel_root` so a global shutdown
    /// still cancels every running goal.
    cancel_tokens: Arc<DashMap<GoalId, CancellationToken>>,
    /// Operator overrides for in-flight goal budgets. The
    /// loop consults this map at the top of every iteration so
    /// `update_budget` actually changes the cap (not just the
    /// snapshot). Currently only `max_turns` is grow-only mutable.
    budget_overrides: Arc<DashMap<GoalId, BudgetGuards>>,
    /// Operator-side messages queued for injection into the next
    /// turn's prompt. `interrupt_goal` (driven by the agent-side
    /// `interrupt_agent` tool) fills this; the loop drains it at the
    /// top of every iteration so Claude sees the operator's note in
    /// its next turn input. FIFO so multiple rapid interrupts arrive
    /// in order.
    pending_interrupts: Arc<DashMap<GoalId, std::collections::VecDeque<String>>>,
    /// Post-turn LLM memory extraction.
    extract_memories: Option<Arc<ExtractMemories>>,
    /// Root directory for persistent memory files.
    memory_dir: Option<PathBuf>,
    /// Post-turn autoDream consolidation hooks — a multi-runner
    /// registry. Routing key is the owning agent id
    /// (`goal.metadata["agent_id"]`). The `"_default"` key holds the
    /// hook installed through the builder or the deprecated
    /// `set_auto_dream`.
    /// Empty map = no auto_dream wired. Dispatch reads the agent
    /// id from the active goal and looks up the hook here;
    /// missing key = silent skip (debug log). HashMap chosen
    /// over `arc_swap::ArcSwapOption` because the latter requires
    /// `T: Sized` and `dyn AutoDreamHook` is unsized; per-turn
    /// read cost = one uncontended lock acquire + an O(1) hash
    /// lookup, negligible vs the rest of a turn.
    auto_dream: std::sync::Mutex<std::collections::HashMap<String, Arc<dyn AutoDreamHook>>>,
    /// Binary path written into each goal's MCP config and handed to
    /// every attempt.
    bin_path: PathBuf,
    /// Path the permission socket server binds to (Unix only); also
    /// written into each goal's MCP config.
    socket_path: PathBuf,
    /// Owns the spawned socket server; cancelling kills it.
    _socket_handle: tokio::task::JoinHandle<Result<(), DriverError>>,
    /// Token given to the socket server at bind time; cancelling it
    /// shuts the server down.
    socket_cancel: TokioCancel,
    /// Root of the cancellation tree; every per-goal token in
    /// `cancel_tokens` is a child of this one.
    cancel_root: CancellationToken,
}
119
/// Builder for [`DriverOrchestrator`].
///
/// `build` requires `claude_cfg`, `binding_store`, `decider`,
/// `workspace_manager`, `bin_path` and `socket_path`; every other
/// field falls back to a no-op / default implementation or `None`.
/// Numeric knobs default to `0` via `#[derive(Default)]`.
#[derive(Default)]
pub struct DriverOrchestratorBuilder {
    claude_cfg: Option<nexo_driver_claude::ClaudeConfig>,
    binding_store: Option<Arc<dyn SessionBindingStore>>,
    acceptance: Option<Arc<dyn AcceptanceEvaluator>>,
    // Consumed by the socket server on Unix; unused elsewhere.
    decider: Option<Arc<dyn PermissionDecider>>,
    workspace_manager: Option<Arc<WorkspaceManager>>,
    event_sink: Option<Arc<dyn DriverEventSink>>,
    replay_policy: Option<Arc<dyn ReplayPolicy>>,
    compact_policy: Option<Arc<dyn CompactPolicy>>,
    compact_context_window: u64,
    auto_config: Option<AutoCompactionConfig>,
    compact_store: Option<Arc<dyn CompactSummaryStore>>,
    extract_memories: Option<Arc<ExtractMemories>>,
    memory_dir: Option<PathBuf>,
    /// Post-turn autoDream hook; `build` registers it under the
    /// `"_default"` key of the orchestrator's auto_dream registry.
    auto_dream: Option<Arc<dyn AutoDreamHook>>,
    progress_every_turns: u32,
    bin_path: Option<PathBuf>,
    socket_path: Option<PathBuf>,
    cancel_root: Option<CancellationToken>,
}
142
143impl DriverOrchestratorBuilder {
144    pub fn claude_config(mut self, cfg: nexo_driver_claude::ClaudeConfig) -> Self {
145        self.claude_cfg = Some(cfg);
146        self
147    }
148    pub fn binding_store(mut self, s: Arc<dyn SessionBindingStore>) -> Self {
149        self.binding_store = Some(s);
150        self
151    }
152    pub fn acceptance(mut self, a: Arc<dyn AcceptanceEvaluator>) -> Self {
153        self.acceptance = Some(a);
154        self
155    }
156    pub fn decider(mut self, d: Arc<dyn PermissionDecider>) -> Self {
157        self.decider = Some(d);
158        self
159    }
160    pub fn workspace_manager(mut self, w: Arc<WorkspaceManager>) -> Self {
161        self.workspace_manager = Some(w);
162        self
163    }
164    pub fn event_sink(mut self, e: Arc<dyn DriverEventSink>) -> Self {
165        self.event_sink = Some(e);
166        self
167    }
168    pub fn bin_path(mut self, p: impl Into<PathBuf>) -> Self {
169        self.bin_path = Some(p.into());
170        self
171    }
172    pub fn socket_path(mut self, p: impl Into<PathBuf>) -> Self {
173        self.socket_path = Some(p.into());
174        self
175    }
176    pub fn cancel_root(mut self, c: CancellationToken) -> Self {
177        self.cancel_root = Some(c);
178        self
179    }
180    pub fn replay_policy(mut self, p: Arc<dyn ReplayPolicy>) -> Self {
181        self.replay_policy = Some(p);
182        self
183    }
184    pub fn compact_policy(mut self, p: Arc<dyn CompactPolicy>) -> Self {
185        self.compact_policy = Some(p);
186        self
187    }
188    pub fn compact_context_window(mut self, n: u64) -> Self {
189        self.compact_context_window = n;
190        self
191    }
192    pub fn auto_config(mut self, a: AutoCompactionConfig) -> Self {
193        self.auto_config = Some(a);
194        self
195    }
196    pub fn compact_store(mut self, s: Arc<dyn CompactSummaryStore>) -> Self {
197        self.compact_store = Some(s);
198        self
199    }
200    pub fn extract_memories(mut self, e: Arc<ExtractMemories>) -> Self {
201        self.extract_memories = Some(e);
202        self
203    }
204    pub fn memory_dir(mut self, p: impl Into<PathBuf>) -> Self {
205        self.memory_dir = Some(p.into());
206        self
207    }
208    /// Wire the autoDream post-turn hook. `None` disables. Mirrors
209    /// the `extract_memories` builder.
210    pub fn auto_dream(mut self, hook: Arc<dyn AutoDreamHook>) -> Self {
211        self.auto_dream = Some(hook);
212        self
213    }
214    pub fn progress_every_turns(mut self, n: u32) -> Self {
215        self.progress_every_turns = n;
216        self
217    }
218
219    pub async fn build(self) -> Result<DriverOrchestrator, DriverError> {
220        let claude_cfg = self
221            .claude_cfg
222            .ok_or_else(|| DriverError::Config("claude config required".into()))?;
223        let binding_store = self
224            .binding_store
225            .ok_or_else(|| DriverError::Config("binding_store required".into()))?;
226        let decider = self
227            .decider
228            .ok_or_else(|| DriverError::Config("decider required".into()))?;
229        let workspace_manager = self
230            .workspace_manager
231            .ok_or_else(|| DriverError::Config("workspace_manager required".into()))?;
232        let bin_path = self
233            .bin_path
234            .ok_or_else(|| DriverError::Config("bin_path required".into()))?;
235        let socket_path = self
236            .socket_path
237            .ok_or_else(|| DriverError::Config("socket_path required".into()))?;
238        let acceptance: Arc<dyn AcceptanceEvaluator> = self
239            .acceptance
240            .unwrap_or_else(|| Arc::new(NoopAcceptanceEvaluator));
241        let event_sink: Arc<dyn DriverEventSink> =
242            self.event_sink.unwrap_or_else(|| Arc::new(NoopEventSink));
243        let replay_policy: Arc<dyn ReplayPolicy> = self
244            .replay_policy
245            .unwrap_or_else(|| Arc::new(DefaultReplayPolicy::default()));
246        let compact_policy: Arc<dyn CompactPolicy> = self
247            .compact_policy
248            .unwrap_or_else(|| Arc::new(DefaultCompactPolicy::default()));
249        let compact_context_window = self.compact_context_window;
250        let auto_config = self.auto_config;
251        let compact_store: Arc<dyn CompactSummaryStore> = self
252            .compact_store
253            .unwrap_or_else(|| Arc::new(NoopCompactSummaryStore));
254        let progress_every_turns = self.progress_every_turns;
255        let cancel_root = self.cancel_root.unwrap_or_default();
256
257        // Bind the socket server. Unix-only — Windows builds
258        // get a no-op handle so the orchestrator's shutdown
259        // path (`socket_cancel.cancel()` + `_socket_handle.await`)
260        // works without changes. The permission-prompt
261        // forwarder feature is unavailable on Windows until
262        // we add a named-pipe / TCP-loopback alternative.
263        let socket_cancel = TokioCancel::new();
264        #[cfg(unix)]
265        let socket_handle = {
266            let _ = &socket_path; // suppress unused warning on cfg branch
267            let server =
268                DriverSocketServer::bind(&socket_path, decider, socket_cancel.clone()).await?;
269            tokio::spawn(server.run())
270        };
271        #[cfg(not(unix))]
272        let socket_handle = {
273            let _ = (&socket_path, decider);
274            tokio::spawn(async { Ok::<(), DriverError>(()) })
275        };
276
277        Ok(DriverOrchestrator {
278            claude_cfg,
279            binding_store,
280            acceptance,
281            workspace_manager,
282            event_sink,
283            replay_policy,
284            compact_policy,
285            compact_context_window,
286            auto_config,
287            compact_breaker: Mutex::new(AutoCompactBreaker::default()),
288            compact_store,
289            extract_memories: self.extract_memories,
290            memory_dir: self.memory_dir,
291            auto_dream: {
292                // Preserve the `builder.auto_dream(hook).build()`
293                // shape by registering the single hook under the
294                // sentinel `"_default"` key. New code calls
295                // `register_auto_dream(agent_id, hook)` directly.
296                let mut m: std::collections::HashMap<String, Arc<dyn AutoDreamHook>> =
297                    std::collections::HashMap::new();
298                if let Some(hook) = self.auto_dream {
299                    m.insert("_default".to_string(), hook);
300                }
301                std::sync::Mutex::new(m)
302            },
303            progress_every_turns,
304            pause_signals: Arc::new(DashMap::new()),
305            cancel_tokens: Arc::new(DashMap::new()),
306            budget_overrides: Arc::new(DashMap::new()),
307            pending_interrupts: Arc::new(DashMap::new()),
308            bin_path,
309            socket_path,
310            _socket_handle: socket_handle,
311            socket_cancel,
312            cancel_root,
313        })
314    }
315}
316
317impl DriverOrchestrator {
318    pub fn builder() -> DriverOrchestratorBuilder {
319        DriverOrchestratorBuilder::default()
320    }
321
322    /// Register a runner for `agent_id`.
323    /// Returns the previous runner under that key (if any) so
324    /// callers can take ownership of the displaced hook for
325    /// cleanup. Atomic + thread-safe: subsequent `run_turn` calls
326    /// for goals with that agent_id observe the new value on
327    /// their next mutex acquire.
328    pub fn register_auto_dream(
329        &self,
330        agent_id: String,
331        hook: Arc<dyn AutoDreamHook>,
332    ) -> Option<Arc<dyn AutoDreamHook>> {
333        let mut guard = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
334        guard.insert(agent_id, hook)
335    }
336
337    /// Atomically remove the runner for `agent_id`. Returns the
338    /// removed hook if one was registered.
339    pub fn unregister_auto_dream(&self, agent_id: &str) -> Option<Arc<dyn AutoDreamHook>> {
340        let mut guard = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
341        guard.remove(agent_id)
342    }
343
344    /// Sorted list of agent ids that currently have a runner
345    /// registered. Used by tests +
346    /// admin-ui observability; the sort makes assertions stable.
347    pub fn auto_dream_agents(&self) -> Vec<String> {
348        let guard = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
349        let mut ids: Vec<String> = guard.keys().cloned().collect();
350        ids.sort();
351        ids
352    }
353
354    /// `true` when at least one auto_dream runner is registered.
355    pub fn has_auto_dream(&self) -> bool {
356        !self
357            .auto_dream
358            .lock()
359            .unwrap_or_else(|p| p.into_inner())
360            .is_empty()
361    }
362
363    /// Compat shim. New code should call
364    /// [`register_auto_dream(agent_id, hook)`](Self::register_auto_dream)
365    /// for multi-runner routing. `Some(hook)` registers under the
366    /// sentinel `"_default"` key; `None` clears every registered
367    /// runner. Emits a deprecation warn once per process.
368    #[deprecated(
369        since = "0.1.2",
370        note = "use register_auto_dream(agent_id, hook) for multi-runner routing"
371    )]
372    pub fn set_auto_dream(&self, hook: Option<Arc<dyn AutoDreamHook>>) {
373        static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
374        WARNED.get_or_init(|| {
375            tracing::warn!(
376                target: "auto_dream.deprecation",
377                "DriverOrchestrator::set_auto_dream is deprecated; use register_auto_dream(agent_id, hook) for per-agent routing"
378            );
379        });
380        let mut guard = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
381        match hook {
382            Some(h) => {
383                guard.insert("_default".to_string(), h);
384            }
385            None => guard.clear(),
386        }
387    }
388
389    /// Request the goal's loop to hold before its next turn.
390    /// Idempotent. No-op when the goal isn't running.
391    pub fn pause_goal(&self, goal_id: GoalId) -> bool {
392        if let Some(tx) = self.pause_signals.get(&goal_id) {
393            // `send` returns Err when no receivers exist
394            // (e.g. between pre_register_goal and run_goal start).
395            // `send_replace` updates the value unconditionally so
396            // the loop sees `true` whenever it does subscribe.
397            tx.send_replace(true);
398            return true;
399        }
400        false
401    }
402
403    /// Release a paused goal's loop. Idempotent.
404    pub fn resume_goal(&self, goal_id: GoalId) -> bool {
405        if let Some(tx) = self.pause_signals.get(&goal_id) {
406            tx.send_replace(false);
407            return true;
408        }
409        false
410    }
411
412    /// True if the goal is currently paused. False if it's running
413    /// or unknown.
414    pub fn is_paused(&self, goal_id: GoalId) -> bool {
415        self.pause_signals
416            .get(&goal_id)
417            .map(|tx| *tx.borrow())
418            .unwrap_or(false)
419    }
420
421    /// Install or grow the budget override for a running goal.
422    /// `max_turns` only goes up (caller is expected to
423    /// guard against shrink-below-used). Returns the effective
424    /// `max_turns` after merge with the prior override (if any).
425    pub fn set_goal_max_turns(&self, goal_id: GoalId, new_max: u32) -> Option<u32> {
426        if !self.cancel_tokens.contains_key(&goal_id) {
427            return None;
428        }
429        let mut entry = self
430            .budget_overrides
431            .entry(goal_id)
432            .or_insert_with(|| BudgetGuards {
433                max_turns: new_max,
434                max_wall_time: Duration::from_secs(60 * 60 * 24 * 365),
435                max_tokens: u64::MAX,
436                max_consecutive_denies: u32::MAX,
437                max_consecutive_errors: u32::MAX,
438                max_consecutive_413: 2,
439            });
440        if new_max > entry.value().max_turns {
441            entry.value_mut().max_turns = new_max;
442        }
443        Some(entry.value().max_turns)
444    }
445
446    /// Operator-interrupt — push a free-form message that the
447    /// next turn's prompt will see prepended as a "[OPERATOR
448    /// INTERRUPT]" block. Use this when you want to redirect
449    /// Claude mid-run without cancelling the goal. Multiple
450    /// queued interrupts are concatenated in FIFO order.
451    /// Returns the queue depth after the push.
452    pub fn interrupt_goal(&self, goal_id: GoalId, message: impl Into<String>) -> usize {
453        let mut entry = self.pending_interrupts.entry(goal_id).or_default();
454        entry.value_mut().push_back(message.into());
455        entry.value().len()
456    }
457
458    /// Drain queued operator interrupts. The run loop calls this
459    /// at the top of every iteration so the next `AttemptParams`
460    /// extras carry an `operator_messages` array Claude sees in
461    /// its prompt.
462    pub(crate) fn drain_interrupts(&self, goal_id: GoalId) -> Vec<String> {
463        self.pending_interrupts
464            .get_mut(&goal_id)
465            .map(|mut e| e.value_mut().drain(..).collect::<Vec<_>>())
466            .unwrap_or_default()
467    }
468
469    /// Pre-register the per-goal cancel + pause tokens for a goal
470    /// that's being reattached after daemon restart but whose
471    /// `run_goal` hasn't started yet. Lets `cancel_agent` /
472    /// `pause_agent` target the goal in the gap between reattach
473    /// and the actual respawn. The tokens are removed when the
474    /// real `run_goal` exits, same as for fresh dispatches.
475    /// Idempotent — re-registering returns the existing tokens.
476    pub fn pre_register_goal(&self, goal_id: GoalId) {
477        self.pause_signals.entry(goal_id).or_insert_with(|| {
478            let (tx, _rx) = watch::channel(false);
479            tx
480        });
481        self.cancel_tokens
482            .entry(goal_id)
483            .or_insert_with(|| self.cancel_root.child_token());
484    }
485
486    /// Cancel a single in-flight goal. Idempotent.
487    /// Returns `true` when a goal was found and signalled. The
488    /// underlying `run_goal` loop will exit at the next safe point
489    /// (between turns, or when the active turn's
490    /// `CancellationToken` propagates into the Claude subprocess).
491    pub fn cancel_goal(&self, goal_id: GoalId) -> bool {
492        if let Some(tok) = self.cancel_tokens.get(&goal_id) {
493            tok.cancel();
494            return true;
495        }
496        false
497    }
498
499    /// True iff the per-goal token has been cancelled.
500    pub fn is_cancelled(&self, goal_id: GoalId) -> bool {
501        self.cancel_tokens
502            .get(&goal_id)
503            .map(|t| t.is_cancelled())
504            .unwrap_or(false)
505    }
506
507    /// Fire-and-forget spawn. Returns the
508    /// [`tokio::task::JoinHandle`] so the caller (typically the
509    /// `program_phase` tool) can register the goal in the agent
510    /// registry without waiting for completion. The orchestrator is
511    /// reference-counted (`Arc<Self>`); long-running goals don't
512    /// pin a single owner.
513    pub fn spawn_goal(
514        self: Arc<Self>,
515        goal: Goal,
516    ) -> tokio::task::JoinHandle<Result<GoalOutcome, DriverError>> {
517        tokio::spawn(async move { self.run_goal(goal).await })
518    }
519
520    /// Drive a single goal to completion. Long-running.
521    pub async fn run_goal(&self, goal: Goal) -> Result<GoalOutcome, DriverError> {
522        let started = Instant::now();
523        let goal_id = goal.id;
524
525        // Register pause signal. If pre_register_goal already
526        // populated the entry (reattach
527        // path), reuse the existing sender so any in-flight
528        // pause request is honoured by the new loop.
529        let mut pause_rx = match self.pause_signals.get(&goal_id) {
530            Some(existing) => existing.value().subscribe(),
531            None => {
532                let (tx, rx) = watch::channel(false);
533                self.pause_signals.insert(goal_id, tx);
534                rx
535            }
536        };
537        // Same reuse rule for the cancel token: a pre-registered
538        // token has already been handed
539        // out to anyone holding `cancel_goal`; clobbering it here
540        // would silently drop the cancellation signal.
541        let goal_cancel = match self.cancel_tokens.get(&goal_id) {
542            Some(t) => t.value().clone(),
543            None => {
544                let t = self.cancel_root.child_token();
545                self.cancel_tokens.insert(goal_id, t.clone());
546                t
547            }
548        };
549
550        let _ = self
551            .event_sink
552            .publish(DriverEvent::GoalStarted { goal: goal.clone() })
553            .await;
554
555        // 1. Workspace + mcp config.
556        let workspace = self.workspace_manager.ensure(&goal).await?;
557        let mcp_config_path = write_mcp_config(&workspace, &self.bin_path, &self.socket_path)?;
558
559        // 2. Loop turns.
560        let mut usage = BudgetUsage::default();
561        let mut prior_failures: Vec<nexo_driver_types::AcceptanceFailure> = Vec::new();
562        let mut last_acceptance: Option<AcceptanceVerdict> = None;
563        let mut final_text: Option<String> = None;
564        let mut total_turns: u32 = 0;
565        // Compact-policy state.
566        // Load prior compact summary for session resume.
567        let mut next_extras: Option<serde_json::Map<String, serde_json::Value>> = match self
568            .compact_store
569            .load(&goal.id.0.to_string(), &goal_id)
570            .await
571        {
572            Ok(Some(prior)) => {
573                let mut e = serde_json::Map::new();
574                e.insert(
575                    "compact_summary".into(),
576                    serde_json::Value::String(prior.summary),
577                );
578                Some(e)
579            }
580            _ => None,
581        };
582        let mut last_was_compact = false;
583        // Token count before the compact turn (captured at
584        // schedule time, consumed in the compact-turn handler for
585        // CompactSummary persistence).
586        let mut compact_before_tokens: u64 = 0;
587        let final_outcome: AttemptOutcome;
588
589        loop {
590            // Honour pause requests before advancing
591            // to the next turn. We hold here in a cancellation-aware
592            // wait; cancel still wins over a stuck pause.
593            while *pause_rx.borrow() {
594                if goal_cancel.is_cancelled() {
595                    break;
596                }
597                tokio::select! {
598                    _ = pause_rx.changed() => {}
599                    _ = goal_cancel.cancelled() => break,
600                }
601            }
602
603            // Operator override (set_goal_max_turns) lifts the
604            // turn cap for in-flight goals. Other axes still come
605            // from the original goal.budget.
606            let effective_budget = match self.budget_overrides.get(&goal_id) {
607                Some(o) => BudgetGuards {
608                    max_turns: o.value().max_turns.max(goal.budget.max_turns),
609                    ..goal.budget.clone()
610                },
611                None => goal.budget.clone(),
612            };
613            if let Some(axis) = effective_budget.is_exhausted(&usage) {
614                let _ = self
615                    .event_sink
616                    .publish(DriverEvent::BudgetExhausted {
617                        goal_id,
618                        axis,
619                        usage: usage.clone(),
620                    })
621                    .await;
622                final_outcome = AttemptOutcome::BudgetExhausted { axis };
623                break;
624            }
625            if goal_cancel.is_cancelled() {
626                final_outcome = AttemptOutcome::Cancelled;
627                break;
628            }
629
630            let _ = self
631                .event_sink
632                .publish(DriverEvent::AttemptStarted {
633                    goal_id,
634                    turn_index: total_turns,
635                    usage: usage.clone(),
636                })
637                .await;
638
639            let cancel = goal_cancel.clone();
640            let mut extras = next_extras.take().unwrap_or_else(|| {
641                build_attempt_extras(&prior_failures, &goal.budget, total_turns)
642            });
643            // Operator-interrupt drain — the agent-side
644            // `interrupt_agent` tool queues messages here; we
645            // surface them to the turn under
646            // `operator_messages` so attempt.rs can prepend them
647            // to the Claude prompt as an `[OPERATOR INTERRUPT]`
648            // block.
649            let pending = self.drain_interrupts(goal_id);
650            if !pending.is_empty() {
651                extras.insert(
652                    "operator_messages".into(),
653                    serde_json::Value::Array(
654                        pending.into_iter().map(serde_json::Value::String).collect(),
655                    ),
656                );
657            }
658            let params = AttemptParams {
659                goal: goal.clone(),
660                turn_index: total_turns,
661                usage: usage.clone(),
662                prior_decisions: Vec::new(),
663                cancel,
664                extras,
665            };
666
667            // Checkpoint pre-attempt. Sentinel `<no-git>`
668            // short-circuits diff_stat below.
669            let cp_label = format!("turn-{total_turns}-pre");
670            let cp_sha = self
671                .workspace_manager
672                .checkpoint(&workspace, &cp_label)
673                .await
674                .unwrap_or_else(|e| {
675                    tracing::warn!(target: "driver-loop", "checkpoint failed: {e}");
676                    crate::workspace::WorkspaceManager::NO_GIT_SENTINEL.to_string()
677                });
678
679            let ctx = AttemptContext {
680                claude_cfg: &self.claude_cfg,
681                binding_store: &self.binding_store,
682                acceptance: &self.acceptance,
683                workspace: &workspace,
684                mcp_config_path: &mcp_config_path,
685                bin_path: &self.bin_path,
686                cancel: goal_cancel.clone(),
687            };
688            tracing::info!(
689                target: "driver-loop",
690                goal_id = ?goal_id,
691                turn_index = total_turns,
692                "phase78: spawning attempt",
693            );
694            let mut result = run_attempt(ctx, params).await?;
695            tracing::info!(
696                target: "driver-loop",
697                goal_id = ?goal_id,
698                turn_index = total_turns,
699                outcome = ?result.outcome,
700                "phase78: attempt returned",
701            );
702
703            // Best-effort diff_stat injection.
704            if cp_sha != crate::workspace::WorkspaceManager::NO_GIT_SENTINEL {
705                if let Ok(diff) = self.workspace_manager.diff_stat(&workspace, &cp_sha).await {
706                    if !diff.trim().is_empty() {
707                        result
708                            .harness_extras
709                            .insert("worktree.diff_stat".into(), serde_json::Value::String(diff));
710                        result.harness_extras.insert(
711                            "worktree.checkpoint_sha".into(),
712                            serde_json::Value::String(cp_sha.clone()),
713                        );
714                    }
715                }
716            }
717
718            usage = result.usage_after.clone();
719
720            // Compact turns are meta: absorb tokens,
721            // do not bump turn counter, do not process outcome as work.
722            if last_was_compact {
723                last_was_compact = false;
724                // Circuit breaker: record failure if compact turn errored.
725                let compact_ok = matches!(
726                    result.outcome,
727                    AttemptOutcome::Done | AttemptOutcome::NeedsRetry { .. }
728                );
729                if compact_ok {
730                    self.compact_breaker
731                        .lock()
732                        .unwrap()
733                        .record_success(total_turns);
734                } else {
735                    self.compact_breaker.lock().unwrap().record_failure();
736                }
737                let _ = self
738                    .event_sink
739                    .publish(DriverEvent::AttemptCompleted {
740                        result: result.clone(),
741                    })
742                    .await;
743                let _ = self
744                    .event_sink
745                    .publish(DriverEvent::CompactCompleted {
746                        goal_id,
747                        turn_index: total_turns,
748                        after_tokens: usage.tokens,
749                    })
750                    .await;
751                // Persist compact summary for session resume.
752                if compact_ok {
753                    if let Some(ref summary_text) = result.final_text {
754                        let summary = CompactSummary {
755                            agent_id: goal.id.0.to_string(),
756                            summary: summary_text.clone(),
757                            turn_index: total_turns,
758                            before_tokens: compact_before_tokens,
759                            after_tokens: usage.tokens,
760                            stored_at: chrono::Utc::now(),
761                            cache_pin_keys: Vec::new(),
762                            truncated_tool_results: Vec::new(),
763                        };
764                        let _ = self.compact_store.store(summary).await;
765                    }
766                    let _ = self
767                        .event_sink
768                        .publish(DriverEvent::CompactSummaryStored {
769                            goal_id,
770                            turn_index: total_turns,
771                            before_tokens: compact_before_tokens,
772                            after_tokens: usage.tokens,
773                        })
774                        .await;
775                    // Post-compact cleanup + extraction tick.
776                    let mut cleanup = PostCompactCleanup::new();
777                    if let (Some(ref extract), Some(ref memory_dir)) =
778                        (&self.extract_memories, &self.memory_dir)
779                    {
780                        cleanup =
781                            cleanup.with_extract_memories(Arc::clone(extract), memory_dir.clone());
782                    }
783                    cleanup.run().await;
784                }
785                continue;
786            }
787
788            final_text = result.final_text.clone();
789            last_acceptance = result.acceptance.clone();
790            total_turns += 1;
791            usage.turns = total_turns;
792
793            let _ = self
794                .event_sink
795                .publish(DriverEvent::AttemptCompleted {
796                    result: result.clone(),
797                })
798                .await;
799
800            // Periodic progress beacon. `0` disables.
801            if self.progress_every_turns > 0
802                && total_turns > 0
803                && total_turns % self.progress_every_turns == 0
804            {
805                let _ = self
806                    .event_sink
807                    .publish(DriverEvent::Progress {
808                        goal_id,
809                        turn_index: total_turns,
810                        usage: usage.clone(),
811                        last_text: result.final_text.clone(),
812                    })
813                    .await;
814            }
815
816            // Post-turn memory extraction.
817            if let Some(ref extract) = self.extract_memories {
818                extract.tick();
819                match extract.check_gates() {
820                    Ok(()) => {
821                        // Gates passed — spawn extraction.
822                        if let (Some(ref memory_dir), Some(ref final_text)) =
823                            (&self.memory_dir, &result.final_text)
824                        {
825                            let messages_text = final_text.clone();
826                            let dir = memory_dir.clone();
827                            extract.extract(goal_id, total_turns, messages_text, dir);
828                        }
829                    }
830                    Err(skip_reason) => {
831                        let _ = self
832                            .event_sink
833                            .publish(DriverEvent::ExtractMemoriesSkipped {
834                                goal_id,
835                                reason: skip_reason,
836                            })
837                            .await;
838                    }
839                }
840            }
841
842            // Post-turn autoDream consolidation, invoked as a stop hook.
843            // Per-turn cost when enabled: one stat (lock mtime) — gates
844            // fail cheap when cadence not yet due. Errors absorbed via
845            // `let _ = ...` so a runner failure NEVER breaks the
846            // driver-loop turn.
847            //
848            // Multi-runner dispatch: resolve agent_id from goal.metadata
849            // (canonical convention,
850            // see `Goal::with_agent_id`) and look up the runner
851            // for that agent in the orchestrator's HashMap. Empty
852            // agent_id with a non-empty registry → warn (operator
853            // forgot the convention); unknown agent_id → debug
854            // (multi-tenant deployments legitimately have stale
855            // metadata after agent removal).
856            let owning_agent_id = goal.agent_id().unwrap_or("").to_string();
857            let ad_opt: Option<Arc<dyn AutoDreamHook>> = {
858                let map = self.auto_dream.lock().unwrap_or_else(|p| p.into_inner());
859                if owning_agent_id.is_empty() {
860                    if !map.is_empty() {
861                        tracing::warn!(
862                            target: "auto_dream.dispatch",
863                            goal_id = %goal_id.0,
864                            "auto_dream skipped: goal.metadata.agent_id is empty (use Goal::with_agent_id)"
865                        );
866                    }
867                    None
868                } else {
869                    let hook = map.get(&owning_agent_id).cloned();
870                    if hook.is_none() && !map.is_empty() {
871                        tracing::debug!(
872                            target: "auto_dream.dispatch",
873                            goal_id = %goal_id.0,
874                            agent_id = %owning_agent_id,
875                            "auto_dream skipped: no runner registered for this agent"
876                        );
877                    }
878                    hook
879                }
880            };
881            if let Some(ad) = ad_opt {
882                let transcript_dir = self
883                    .workspace_manager
884                    .root()
885                    .join(".transcripts")
886                    .join(goal_id.0.to_string());
887                let dream_ctx = DreamContextLite {
888                    agent_id: owning_agent_id.clone(),
889                    goal_id,
890                    session_id: goal_id.0.to_string(),
891                    transcript_dir,
892                    kairos_active: false,
893                    remote_mode: false,
894                };
895                let outcome_kind: AutoDreamOutcomeKind = ad.check_and_run(&dream_ctx).await;
896                let _ = self
897                    .event_sink
898                    .publish(DriverEvent::AutoDreamOutcome {
899                        goal_id,
900                        outcome_kind,
901                    })
902                    .await;
903            }
904
905            // Opportunistic /compact: if pressure
906            // crosses threshold or session age expires, schedule next
907            // iteration as a compact turn.
908            let session_age_minutes = started.elapsed().as_secs() / 60;
909            let max_failures = self
910                .auto_config
911                .as_ref()
912                .map(|a| a.max_consecutive_failures)
913                .unwrap_or(3);
914            let should_check = {
915                let breaker = self.compact_breaker.lock().unwrap();
916                !breaker.is_tripped(max_failures)
917            };
918            if should_check {
919                let last_compact = self.compact_breaker.lock().unwrap().last_compact_turn;
920                let compact_ctx = CompactContext {
921                    goal_id,
922                    turn_index: total_turns,
923                    usage: &usage,
924                    context_window: self.compact_context_window,
925                    last_compact_turn: last_compact,
926                    goal_description: &goal.description,
927                    session_age_minutes,
928                    auto_config: self.auto_config.as_ref(),
929                };
930                if let Some((focus, trigger)) = self.compact_policy.classify(&compact_ctx).await {
931                    let before_tokens = usage.tokens;
932                    compact_before_tokens = before_tokens;
933                    let age_minutes = session_age_minutes;
934                    let pressure = if self.compact_context_window > 0 {
935                        usage.tokens as f64 / self.compact_context_window as f64
936                    } else {
937                        0.0
938                    };
939                    let _ = self
940                        .event_sink
941                        .publish(DriverEvent::CompactRequested {
942                            goal_id,
943                            turn_index: total_turns,
944                            focus: focus.clone(),
945                            token_pressure: pressure,
946                            before_tokens,
947                            age_minutes,
948                            trigger,
949                        })
950                        .await;
951                    let mut e = serde_json::Map::new();
952                    e.insert("compact_turn".into(), serde_json::Value::Bool(true));
953                    e.insert("compact_focus".into(), serde_json::Value::String(focus));
954                    next_extras = Some(e);
955                    last_was_compact = true;
956                }
957            }
958            if let Some(v) = &result.acceptance {
959                let _ = self
960                    .event_sink
961                    .publish(DriverEvent::Acceptance {
962                        goal_id,
963                        verdict: v.clone(),
964                    })
965                    .await;
966            }
967
968            match &result.outcome {
969                AttemptOutcome::Done => {
970                    usage.consecutive_errors = 0;
971                    final_outcome = AttemptOutcome::Done;
972                    break;
973                }
974                AttemptOutcome::NeedsRetry { failures } => {
975                    usage.consecutive_errors = 0;
976                    prior_failures = failures.clone();
977                    continue;
978                }
979                AttemptOutcome::Sleep {
980                    duration_ms,
981                    reason,
982                } => {
983                    usage.consecutive_errors = 0;
984                    prior_failures.clear();
985                    let wake = ScheduledWake {
986                        duration_ms: *duration_ms,
987                        reason: reason.clone(),
988                        sleep_started_at: Instant::now(),
989                    };
990                    match wait_for_wake(&wake, &goal_cancel).await {
991                        WakeResult::Cancelled => {
992                            final_outcome = AttemptOutcome::Cancelled;
993                            break;
994                        }
995                        WakeResult::Fired { elapsed_ms } => {
996                            let mut extras =
997                                build_attempt_extras(&prior_failures, &goal.budget, total_turns);
998                            extras.insert(
999                                "synthetic_tick_prompt".into(),
1000                                serde_json::Value::String(build_tick_prompt(&wake, elapsed_ms)),
1001                            );
1002                            next_extras = Some(extras);
1003                            continue;
1004                        }
1005                    }
1006                }
1007                AttemptOutcome::Continue { reason } | AttemptOutcome::Escalate { reason } => {
1008                    // Replay-policy classifies the error and decides
1009                    // whether to retry the same
1010                    // turn with a fresh session, advance to the
1011                    // next turn, or escalate.
1012                    let hint = match &result.outcome {
1013                        AttemptOutcome::Continue { .. } => ReplayOutcomeHint::Continue,
1014                        _ => ReplayOutcomeHint::Escalate,
1015                    };
1016                    let cp_for_replay =
1017                        if cp_sha == crate::workspace::WorkspaceManager::NO_GIT_SENTINEL {
1018                            None
1019                        } else {
1020                            Some(cp_sha.as_str())
1021                        };
1022                    let ctx = ReplayContext {
1023                        goal_id,
1024                        turn_index: total_turns,
1025                        pre_turn_checkpoint: cp_for_replay,
1026                        usage: &usage,
1027                        error_message: reason,
1028                        last_outcome_hint: hint,
1029                    };
1030                    let decision = self.replay_policy.classify(&ctx).await;
1031                    tracing::info!(
1032                        target: "driver-loop",
1033                        goal_id = ?goal_id,
1034                        turn_index = total_turns,
1035                        reason = %reason,
1036                        decision = ?decision,
1037                        "phase78: replay decision",
1038                    );
1039                    let _ = self
1040                        .event_sink
1041                        .publish(DriverEvent::ReplayDecision {
1042                            goal_id,
1043                            turn_index: total_turns,
1044                            decision: decision.clone(),
1045                            error_message: reason.clone(),
1046                        })
1047                        .await;
1048                    match decision {
1049                        ReplayDecision::FreshSessionRetry { rollback_to } => {
1050                            let _ = self.binding_store.mark_invalid(goal_id).await;
1051                            if let Some(sha) = rollback_to {
1052                                let _ = self.workspace_manager.rollback(&workspace, &sha).await;
1053                            }
1054                            usage.consecutive_errors = usage.consecutive_errors.saturating_add(1);
1055                            // NO bump turn_index — same logical turn retries.
1056                            // Undo the +1 bump that happened above.
1057                            total_turns = total_turns.saturating_sub(1);
1058                            usage.turns = total_turns;
1059                            prior_failures.clear();
1060                            tracing::info!(
1061                                target: "driver-loop",
1062                                goal_id = ?goal_id,
1063                                "phase78: FreshSessionRetry — looping",
1064                            );
1065                            continue;
1066                        }
1067                        ReplayDecision::NextTurn { rollback_to } => {
1068                            if let Some(sha) = rollback_to {
1069                                let _ = self.workspace_manager.rollback(&workspace, &sha).await;
1070                            }
1071                            prior_failures.clear();
1072                            tracing::info!(
1073                                target: "driver-loop",
1074                                goal_id = ?goal_id,
1075                                next_turn = total_turns,
1076                                "phase78: NextTurn — looping",
1077                            );
1078                            continue;
1079                        }
1080                        ReplayDecision::CompactAndRetry => {
1081                            // Reactive 413 recovery: bump
1082                            // `consecutive_413`, undo the turn bump,
1083                            // let the proactive compact path fire on
1084                            // the retry loop.
1085                            usage.consecutive_413 = usage.consecutive_413.saturating_add(1);
1086                            total_turns = total_turns.saturating_sub(1);
1087                            usage.turns = total_turns;
1088                            prior_failures.clear();
1089                            tracing::info!(
1090                                target: "driver-loop",
1091                                goal_id = ?goal_id,
1092                                consecutive_413 = usage.consecutive_413,
1093                                "phase85.1: CompactAndRetry — looping",
1094                            );
1095                            continue;
1096                        }
1097                        ReplayDecision::Escalate { reason } => {
1098                            let _ = self
1099                                .event_sink
1100                                .publish(DriverEvent::Escalate {
1101                                    goal_id,
1102                                    reason: reason.clone(),
1103                                })
1104                                .await;
1105                            final_outcome = AttemptOutcome::Escalate { reason };
1106                            break;
1107                        }
1108                    }
1109                }
1110                AttemptOutcome::Cancelled => {
1111                    final_outcome = AttemptOutcome::Cancelled;
1112                    break;
1113                }
1114                AttemptOutcome::BudgetExhausted { axis } => {
1115                    let _ = self
1116                        .event_sink
1117                        .publish(DriverEvent::BudgetExhausted {
1118                            goal_id,
1119                            axis: *axis,
1120                            usage: usage.clone(),
1121                        })
1122                        .await;
1123                    final_outcome = AttemptOutcome::BudgetExhausted { axis: *axis };
1124                    break;
1125                }
1126            }
1127        }
1128
1129        let outcome = GoalOutcome {
1130            goal_id,
1131            outcome: final_outcome,
1132            total_turns,
1133            usage,
1134            final_text,
1135            acceptance: last_acceptance,
1136            elapsed: started.elapsed(),
1137        };
1138        let _ = self
1139            .event_sink
1140            .publish(DriverEvent::GoalCompleted {
1141                outcome: outcome.clone(),
1142            })
1143            .await;
1144        // Clean up pause signal once the loop exits.
1145        self.pause_signals.remove(&goal_id);
1146        // Drop the per-goal cancel token.
1147        self.cancel_tokens.remove(&goal_id);
1148        // Drop budget override entry so a new spawn of the
1149        // same id starts fresh.
1150        self.budget_overrides.remove(&goal_id);
1151        // Also clear any queued operator interrupts that
1152        // never made it into a turn (e.g. queued late while the
1153        // goal was already wrapping up). Without this they leak.
1154        self.pending_interrupts.remove(&goal_id);
1155        Ok(outcome)
1156    }
1157
1158    /// Cancel every in-flight goal + drain socket server.
1159    pub async fn shutdown(self) -> Result<(), DriverError> {
1160        self.cancel_root.cancel();
1161        self.socket_cancel.cancel();
1162        let _ = self._socket_handle.await;
1163        Ok(())
1164    }
1165}
1166
1167fn build_attempt_extras(
1168    prior_failures: &[nexo_driver_types::AcceptanceFailure],
1169    budget: &BudgetGuards,
1170    turn_index: u32,
1171) -> serde_json::Map<String, serde_json::Value> {
1172    let mut m = serde_json::Map::new();
1173    m.insert(
1174        "turn_index".into(),
1175        serde_json::Value::Number(turn_index.into()),
1176    );
1177    m.insert(
1178        "max_turns".into(),
1179        serde_json::Value::Number(budget.max_turns.into()),
1180    );
1181    if !prior_failures.is_empty() {
1182        m.insert(
1183            "prior_failures".into(),
1184            serde_json::to_value(prior_failures).unwrap_or(serde_json::Value::Null),
1185        );
1186    }
1187    m
1188}