Skip to main content

nexo_driver_loop/
attempt.rs

1//! Per-turn loop. Owns the spawned `claude` subprocess for one
2//! turn, projects events back to the orchestrator, and synthesises
3//! an `AttemptResult` at the end.
4
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Instant;
8
9use nexo_driver_claude::{
10    spawn_turn, ClaudeCommand, ClaudeError, ClaudeEvent, ResultEvent, SessionBinding,
11    SessionBindingStore,
12};
13use nexo_driver_types::{
14    AttemptOutcome, AttemptParams, AttemptResult, BudgetUsage, CancellationToken, GoalId,
15};
16
17use crate::acceptance::AcceptanceEvaluator;
18use crate::error::DriverError;
19
/// JSON key that marks a result payload as a sleep request: when a parsed
/// result object maps this key to boolean `true`, `map_sleep_result` reads
/// the remaining fields as an `AttemptOutcome::Sleep`.
const SLEEP_SENTINEL_KEY: &str = "__nexo_sleep__";
21
22/// Bundle of refs the loop borrows for one attempt.
23pub(crate) struct AttemptContext<'a> {
24    pub claude_cfg: &'a nexo_driver_claude::ClaudeConfig,
25    pub binding_store: &'a Arc<dyn SessionBindingStore>,
26    pub acceptance: &'a Arc<dyn AcceptanceEvaluator>,
27    pub workspace: &'a Path,
28    pub mcp_config_path: &'a Path,
29    pub bin_path: &'a Path,
30    pub cancel: CancellationToken,
31}
32
33pub(crate) async fn run_attempt(
34    ctx: AttemptContext<'_>,
35    params: AttemptParams,
36) -> Result<AttemptResult, DriverError> {
37    let goal_id = params.goal.id;
38    let mut usage = params.usage.clone();
39
40    // Compose the command. Reuse binding session_id when present.
41    let binary = ctx
42        .claude_cfg
43        .binary
44        .clone()
45        .unwrap_or_else(|| std::path::PathBuf::from("claude"));
46    let prior = ctx.binding_store.get(goal_id).await?;
47    // When the orchestrator scheduled a compact turn it pre-fills
48    // `extras["compact_turn"] = true`; substitute the
49    // prompt with a `/compact <focus>` slash command so Claude Code
50    // compacts its context.
51    let prompt = if params
52        .extras
53        .get("compact_turn")
54        .and_then(|v| v.as_bool())
55        .unwrap_or(false)
56    {
57        let focus = params
58            .extras
59            .get("compact_focus")
60            .and_then(|v| v.as_str())
61            .unwrap_or("continue working");
62        format!("/compact {focus}")
63    } else {
64        // Operator-interrupt — prepend any queued messages from
65        // the agent side as a clearly-marked block so Claude
66        // sees them as a high-priority directive on top of the
67        // goal description.
68        let mut p = String::new();
69        if let Some(serde_json::Value::Array(msgs)) = params.extras.get("operator_messages") {
70            if !msgs.is_empty() {
71                p.push_str("[OPERATOR INTERRUPT]\n");
72                for m in msgs {
73                    if let Some(s) = m.as_str() {
74                        p.push_str(s);
75                        p.push('\n');
76                    }
77                }
78                p.push_str("[END OPERATOR INTERRUPT]\n\n");
79            }
80        }
81        if let Some(serde_json::Value::String(tick_prompt)) =
82            params.extras.get("synthetic_tick_prompt")
83        {
84            if !tick_prompt.is_empty() {
85                p.push_str(tick_prompt);
86                p.push_str("\n\n");
87            }
88        }
89        p.push_str(&params.goal.description);
90        p
91    };
92    let mut cmd = ClaudeCommand::new(binary, prompt)
93        .apply_defaults(&ctx.claude_cfg.default_args)
94        .cwd(ctx.workspace)
95        .mcp_config(ctx.mcp_config_path);
96    cmd = match &prior {
97        Some(b) => cmd.resume(b.session_id.clone()),
98        None => cmd, // first turn — claude assigns its own session id
99    };
100    let _ = ctx.bin_path; // bin path lives in mcp_config; kept here for future env injection.
101
102    let turn_start = Instant::now();
103    let mut turn = match spawn_turn(
104        cmd,
105        &ctx.cancel,
106        ctx.claude_cfg.turn_timeout,
107        ctx.claude_cfg.forced_kill_after,
108    )
109    .await
110    {
111        Ok(t) => t,
112        Err(ClaudeError::Cancelled) => {
113            return Ok(synthetic(
114                goal_id,
115                params.turn_index,
116                AttemptOutcome::Cancelled,
117                usage,
118            ));
119        }
120        Err(e) => {
121            return Ok(synthetic(
122                goal_id,
123                params.turn_index,
124                AttemptOutcome::Escalate {
125                    reason: format!("spawn failed: {e}"),
126                },
127                usage,
128            ));
129        }
130    };
131
132    let mut last_session_id: Option<String> = prior.map(|b| b.session_id);
133    let mut final_text: Option<String> = None;
134    let mut claimed_done = false;
135    let mut session_invalid = false;
136    let mut error_message: Option<String> = None;
137
138    loop {
139        let ev = match turn.next_event().await {
140            Ok(Some(e)) => e,
141            Ok(None) => break,
142            Err(ClaudeError::Cancelled) => {
143                let _ = turn.shutdown().await;
144                return Ok(synthetic(
145                    goal_id,
146                    params.turn_index,
147                    AttemptOutcome::Cancelled,
148                    usage,
149                ));
150            }
151            Err(ClaudeError::Timeout) => {
152                let _ = turn.shutdown().await;
153                return Ok(synthetic(
154                    goal_id,
155                    params.turn_index,
156                    AttemptOutcome::Continue {
157                        reason: "turn timeout".into(),
158                    },
159                    usage,
160                ));
161            }
162            Err(e) => {
163                let _ = turn.shutdown().await;
164                return Ok(synthetic(
165                    goal_id,
166                    params.turn_index,
167                    AttemptOutcome::Escalate {
168                        reason: format!("stream error: {e}"),
169                    },
170                    usage,
171                ));
172            }
173        };
174        if let Some(sid) = ev.session_id() {
175            last_session_id = Some(sid.to_string());
176        }
177        match &ev {
178            ClaudeEvent::Result(ResultEvent::Success {
179                result, usage: tu, ..
180            }) => {
181                let total = tu.input_tokens + tu.output_tokens + tu.cache_read_input_tokens;
182                usage.tokens = usage.tokens.saturating_add(total);
183                if let Some(text) = result.as_deref() {
184                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(text) {
185                        if let Some(outcome) = map_sleep_result(&json) {
186                            return Ok(AttemptResult {
187                                goal_id,
188                                turn_index: params.turn_index,
189                                outcome,
190                                decisions_recorded: vec![],
191                                usage_after: usage,
192                                acceptance: None,
193                                final_text: None,
194                                harness_extras: harness_extras_with_session(&last_session_id),
195                            });
196                        }
197                    }
198                }
199                final_text = result.clone();
200                claimed_done = true;
201                break;
202            }
203            ClaudeEvent::Result(ResultEvent::ErrorMaxTurns { .. }) => {
204                error_message = Some("claude reported max turns".into());
205                break;
206            }
207            ClaudeEvent::Result(ResultEvent::ErrorDuringExecution { message, .. }) => {
208                let m = message.clone().unwrap_or_default();
209                if m.to_lowercase().contains("session") {
210                    session_invalid = true;
211                }
212                error_message = Some(m);
213                break;
214            }
215            _ => {}
216        }
217    }
218
219    let _ = turn.shutdown().await;
220
221    // Persist binding (whatever session id Claude reported last).
222    // Lift origin_channel + dispatcher out of goal.metadata so
223    // the binding carries the chat that triggered the goal across
224    // reattach, and the completion router knows where to send the
225    // notify_origin summary.
226    if let Some(sid) = &last_session_id {
227        let workspace_pb: std::path::PathBuf = ctx.workspace.to_path_buf();
228        let mut binding = SessionBinding::new(
229            goal_id,
230            sid.clone(),
231            ctx.claude_cfg.default_args.model.clone(),
232            Some(workspace_pb),
233        );
234        if let Some(o) = params.goal.metadata.get("origin_channel") {
235            if !o.is_null() {
236                if let Ok(parsed) =
237                    serde_json::from_value::<nexo_driver_claude::OriginChannel>(o.clone())
238                {
239                    binding = binding.with_origin(parsed);
240                }
241            }
242        }
243        if let Some(d) = params.goal.metadata.get("dispatcher") {
244            if !d.is_null() {
245                if let Ok(parsed) =
246                    serde_json::from_value::<nexo_driver_claude::DispatcherIdentity>(d.clone())
247                {
248                    binding = binding.with_dispatcher(parsed);
249                }
250            }
251        }
252        ctx.binding_store.upsert(binding).await?;
253    }
254
255    // Wall-time portion of usage.
256    usage.wall_time = usage.wall_time.saturating_add(turn_start.elapsed());
257
258    // Session-invalid: mark, return Continue so orchestrator retries.
259    if session_invalid {
260        ctx.binding_store.mark_invalid(goal_id).await?;
261        return Ok(AttemptResult {
262            goal_id,
263            turn_index: params.turn_index,
264            outcome: AttemptOutcome::Continue {
265                reason: "session invalid: retrying".into(),
266            },
267            decisions_recorded: vec![],
268            usage_after: usage,
269            acceptance: None,
270            final_text,
271            harness_extras: harness_extras_with_session(&last_session_id),
272        });
273    }
274
275    if let Some(msg) = error_message {
276        return Ok(AttemptResult {
277            goal_id,
278            turn_index: params.turn_index,
279            outcome: AttemptOutcome::Escalate { reason: msg },
280            decisions_recorded: vec![],
281            usage_after: usage,
282            acceptance: None,
283            final_text,
284            harness_extras: harness_extras_with_session(&last_session_id),
285        });
286    }
287
288    if !claimed_done {
289        return Ok(AttemptResult {
290            goal_id,
291            turn_index: params.turn_index,
292            outcome: AttemptOutcome::Continue {
293                reason: "stream ended without result event".into(),
294            },
295            decisions_recorded: vec![],
296            usage_after: usage,
297            acceptance: None,
298            final_text,
299            harness_extras: harness_extras_with_session(&last_session_id),
300        });
301    }
302
303    // Acceptance check post-claim.
304    let verdict = ctx
305        .acceptance
306        .evaluate(&params.goal.acceptance, ctx.workspace)
307        .await?;
308
309    let outcome = if verdict.met {
310        AttemptOutcome::Done
311    } else {
312        AttemptOutcome::NeedsRetry {
313            failures: verdict.failures.clone(),
314        }
315    };
316
317    Ok(AttemptResult {
318        goal_id,
319        turn_index: params.turn_index,
320        outcome,
321        decisions_recorded: vec![],
322        usage_after: usage,
323        acceptance: Some(verdict),
324        final_text,
325        harness_extras: harness_extras_with_session(&last_session_id),
326    })
327}
328
329fn synthetic(
330    goal_id: GoalId,
331    turn_index: u32,
332    outcome: AttemptOutcome,
333    usage: BudgetUsage,
334) -> AttemptResult {
335    AttemptResult {
336        goal_id,
337        turn_index,
338        outcome,
339        decisions_recorded: vec![],
340        usage_after: usage,
341        acceptance: None,
342        final_text: None,
343        harness_extras: serde_json::Map::new(),
344    }
345}
346
347fn map_sleep_result(value: &serde_json::Value) -> Option<AttemptOutcome> {
348    if !value
349        .get(SLEEP_SENTINEL_KEY)
350        .and_then(|v| v.as_bool())
351        .unwrap_or(false)
352    {
353        return None;
354    }
355    Some(AttemptOutcome::Sleep {
356        duration_ms: value.get("duration_ms")?.as_u64()?,
357        reason: value.get("reason")?.as_str()?.to_string(),
358    })
359}
360
361fn harness_extras_with_session(sid: &Option<String>) -> serde_json::Map<String, serde_json::Value> {
362    let mut m = serde_json::Map::new();
363    if let Some(s) = sid {
364        m.insert(
365            "claude_code.session_id".into(),
366            serde_json::Value::String(s.clone()),
367        );
368    }
369    m
370}
371
#[cfg(test)]
mod tests {
    use super::*;

    /// A payload with the sentinel set and both fields present maps to a
    /// `Sleep` outcome carrying those fields.
    #[test]
    fn sleep_sentinel_maps_to_sleep_outcome() {
        let payload = serde_json::json!({
            "__nexo_sleep__": true,
            "duration_ms": 270_000,
            "reason": "waiting"
        });
        let expected = AttemptOutcome::Sleep {
            duration_ms: 270_000,
            reason: "waiting".into(),
        };
        let actual = map_sleep_result(&payload).unwrap();
        assert_eq!(actual, expected);
    }

    /// A payload without the sentinel key is not a sleep request.
    #[test]
    fn normal_tool_result_does_not_map_to_sleep() {
        let payload = serde_json::json!({"text": "hello"});
        assert!(map_sleep_result(&payload).is_none());
    }
}