1use std::path::Path;
6use std::sync::Arc;
7use std::time::Instant;
8
9use nexo_driver_claude::{
10 spawn_turn, ClaudeCommand, ClaudeError, ClaudeEvent, ResultEvent, SessionBinding,
11 SessionBindingStore,
12};
13use nexo_driver_types::{
14 AttemptOutcome, AttemptParams, AttemptResult, BudgetUsage, CancellationToken, GoalId,
15};
16
17use crate::acceptance::AcceptanceEvaluator;
18use crate::error::DriverError;
19
/// JSON key that marks a turn's result text as a sleep request.
///
/// `map_sleep_result` only produces a sleep outcome when the parsed result is
/// a JSON value holding boolean `true` under this key.
const SLEEP_SENTINEL_KEY: &str = "__nexo_sleep__";
21
/// Borrowed collaborators and paths that `run_attempt` needs for one turn.
pub(crate) struct AttemptContext<'a> {
    /// Claude configuration. `run_attempt` reads `binary`, `default_args`,
    /// `turn_timeout` and `forced_kill_after` from it.
    pub claude_cfg: &'a nexo_driver_claude::ClaudeConfig,
    /// Store of per-goal session bindings: read before the turn is spawned,
    /// upserted (and possibly marked invalid) after it.
    pub binding_store: &'a Arc<dyn SessionBindingStore>,
    /// Evaluator invoked with the goal's acceptance criteria once a turn
    /// reports a successful result.
    pub acceptance: &'a Arc<dyn AcceptanceEvaluator>,
    /// Used as the command's working directory, handed to the acceptance
    /// evaluator, and recorded in the session binding.
    pub workspace: &'a Path,
    /// Path passed to the command through `mcp_config`.
    pub mcp_config_path: &'a Path,
    /// Not consumed by `run_attempt` beyond a no-op read.
    /// NOTE(review): confirm what this path is intended for.
    pub bin_path: &'a Path,
    /// Token handed to `spawn_turn`; presumably lets the caller abort the
    /// turn — confirm against `spawn_turn`.
    pub cancel: CancellationToken,
}
32
33pub(crate) async fn run_attempt(
34 ctx: AttemptContext<'_>,
35 params: AttemptParams,
36) -> Result<AttemptResult, DriverError> {
37 let goal_id = params.goal.id;
38 let mut usage = params.usage.clone();
39
40 let binary = ctx
42 .claude_cfg
43 .binary
44 .clone()
45 .unwrap_or_else(|| std::path::PathBuf::from("claude"));
46 let prior = ctx.binding_store.get(goal_id).await?;
47 let prompt = if params
52 .extras
53 .get("compact_turn")
54 .and_then(|v| v.as_bool())
55 .unwrap_or(false)
56 {
57 let focus = params
58 .extras
59 .get("compact_focus")
60 .and_then(|v| v.as_str())
61 .unwrap_or("continue working");
62 format!("/compact {focus}")
63 } else {
64 let mut p = String::new();
69 if let Some(serde_json::Value::Array(msgs)) = params.extras.get("operator_messages") {
70 if !msgs.is_empty() {
71 p.push_str("[OPERATOR INTERRUPT]\n");
72 for m in msgs {
73 if let Some(s) = m.as_str() {
74 p.push_str(s);
75 p.push('\n');
76 }
77 }
78 p.push_str("[END OPERATOR INTERRUPT]\n\n");
79 }
80 }
81 if let Some(serde_json::Value::String(tick_prompt)) =
82 params.extras.get("synthetic_tick_prompt")
83 {
84 if !tick_prompt.is_empty() {
85 p.push_str(tick_prompt);
86 p.push_str("\n\n");
87 }
88 }
89 p.push_str(¶ms.goal.description);
90 p
91 };
92 let mut cmd = ClaudeCommand::new(binary, prompt)
93 .apply_defaults(&ctx.claude_cfg.default_args)
94 .cwd(ctx.workspace)
95 .mcp_config(ctx.mcp_config_path);
96 cmd = match &prior {
97 Some(b) => cmd.resume(b.session_id.clone()),
98 None => cmd, };
100 let _ = ctx.bin_path; let turn_start = Instant::now();
103 let mut turn = match spawn_turn(
104 cmd,
105 &ctx.cancel,
106 ctx.claude_cfg.turn_timeout,
107 ctx.claude_cfg.forced_kill_after,
108 )
109 .await
110 {
111 Ok(t) => t,
112 Err(ClaudeError::Cancelled) => {
113 return Ok(synthetic(
114 goal_id,
115 params.turn_index,
116 AttemptOutcome::Cancelled,
117 usage,
118 ));
119 }
120 Err(e) => {
121 return Ok(synthetic(
122 goal_id,
123 params.turn_index,
124 AttemptOutcome::Escalate {
125 reason: format!("spawn failed: {e}"),
126 },
127 usage,
128 ));
129 }
130 };
131
132 let mut last_session_id: Option<String> = prior.map(|b| b.session_id);
133 let mut final_text: Option<String> = None;
134 let mut claimed_done = false;
135 let mut session_invalid = false;
136 let mut error_message: Option<String> = None;
137
138 loop {
139 let ev = match turn.next_event().await {
140 Ok(Some(e)) => e,
141 Ok(None) => break,
142 Err(ClaudeError::Cancelled) => {
143 let _ = turn.shutdown().await;
144 return Ok(synthetic(
145 goal_id,
146 params.turn_index,
147 AttemptOutcome::Cancelled,
148 usage,
149 ));
150 }
151 Err(ClaudeError::Timeout) => {
152 let _ = turn.shutdown().await;
153 return Ok(synthetic(
154 goal_id,
155 params.turn_index,
156 AttemptOutcome::Continue {
157 reason: "turn timeout".into(),
158 },
159 usage,
160 ));
161 }
162 Err(e) => {
163 let _ = turn.shutdown().await;
164 return Ok(synthetic(
165 goal_id,
166 params.turn_index,
167 AttemptOutcome::Escalate {
168 reason: format!("stream error: {e}"),
169 },
170 usage,
171 ));
172 }
173 };
174 if let Some(sid) = ev.session_id() {
175 last_session_id = Some(sid.to_string());
176 }
177 match &ev {
178 ClaudeEvent::Result(ResultEvent::Success {
179 result, usage: tu, ..
180 }) => {
181 let total = tu.input_tokens + tu.output_tokens + tu.cache_read_input_tokens;
182 usage.tokens = usage.tokens.saturating_add(total);
183 if let Some(text) = result.as_deref() {
184 if let Ok(json) = serde_json::from_str::<serde_json::Value>(text) {
185 if let Some(outcome) = map_sleep_result(&json) {
186 return Ok(AttemptResult {
187 goal_id,
188 turn_index: params.turn_index,
189 outcome,
190 decisions_recorded: vec![],
191 usage_after: usage,
192 acceptance: None,
193 final_text: None,
194 harness_extras: harness_extras_with_session(&last_session_id),
195 });
196 }
197 }
198 }
199 final_text = result.clone();
200 claimed_done = true;
201 break;
202 }
203 ClaudeEvent::Result(ResultEvent::ErrorMaxTurns { .. }) => {
204 error_message = Some("claude reported max turns".into());
205 break;
206 }
207 ClaudeEvent::Result(ResultEvent::ErrorDuringExecution { message, .. }) => {
208 let m = message.clone().unwrap_or_default();
209 if m.to_lowercase().contains("session") {
210 session_invalid = true;
211 }
212 error_message = Some(m);
213 break;
214 }
215 _ => {}
216 }
217 }
218
219 let _ = turn.shutdown().await;
220
221 if let Some(sid) = &last_session_id {
227 let workspace_pb: std::path::PathBuf = ctx.workspace.to_path_buf();
228 let mut binding = SessionBinding::new(
229 goal_id,
230 sid.clone(),
231 ctx.claude_cfg.default_args.model.clone(),
232 Some(workspace_pb),
233 );
234 if let Some(o) = params.goal.metadata.get("origin_channel") {
235 if !o.is_null() {
236 if let Ok(parsed) =
237 serde_json::from_value::<nexo_driver_claude::OriginChannel>(o.clone())
238 {
239 binding = binding.with_origin(parsed);
240 }
241 }
242 }
243 if let Some(d) = params.goal.metadata.get("dispatcher") {
244 if !d.is_null() {
245 if let Ok(parsed) =
246 serde_json::from_value::<nexo_driver_claude::DispatcherIdentity>(d.clone())
247 {
248 binding = binding.with_dispatcher(parsed);
249 }
250 }
251 }
252 ctx.binding_store.upsert(binding).await?;
253 }
254
255 usage.wall_time = usage.wall_time.saturating_add(turn_start.elapsed());
257
258 if session_invalid {
260 ctx.binding_store.mark_invalid(goal_id).await?;
261 return Ok(AttemptResult {
262 goal_id,
263 turn_index: params.turn_index,
264 outcome: AttemptOutcome::Continue {
265 reason: "session invalid: retrying".into(),
266 },
267 decisions_recorded: vec![],
268 usage_after: usage,
269 acceptance: None,
270 final_text,
271 harness_extras: harness_extras_with_session(&last_session_id),
272 });
273 }
274
275 if let Some(msg) = error_message {
276 return Ok(AttemptResult {
277 goal_id,
278 turn_index: params.turn_index,
279 outcome: AttemptOutcome::Escalate { reason: msg },
280 decisions_recorded: vec![],
281 usage_after: usage,
282 acceptance: None,
283 final_text,
284 harness_extras: harness_extras_with_session(&last_session_id),
285 });
286 }
287
288 if !claimed_done {
289 return Ok(AttemptResult {
290 goal_id,
291 turn_index: params.turn_index,
292 outcome: AttemptOutcome::Continue {
293 reason: "stream ended without result event".into(),
294 },
295 decisions_recorded: vec![],
296 usage_after: usage,
297 acceptance: None,
298 final_text,
299 harness_extras: harness_extras_with_session(&last_session_id),
300 });
301 }
302
303 let verdict = ctx
305 .acceptance
306 .evaluate(¶ms.goal.acceptance, ctx.workspace)
307 .await?;
308
309 let outcome = if verdict.met {
310 AttemptOutcome::Done
311 } else {
312 AttemptOutcome::NeedsRetry {
313 failures: verdict.failures.clone(),
314 }
315 };
316
317 Ok(AttemptResult {
318 goal_id,
319 turn_index: params.turn_index,
320 outcome,
321 decisions_recorded: vec![],
322 usage_after: usage,
323 acceptance: Some(verdict),
324 final_text,
325 harness_extras: harness_extras_with_session(&last_session_id),
326 })
327}
328
329fn synthetic(
330 goal_id: GoalId,
331 turn_index: u32,
332 outcome: AttemptOutcome,
333 usage: BudgetUsage,
334) -> AttemptResult {
335 AttemptResult {
336 goal_id,
337 turn_index,
338 outcome,
339 decisions_recorded: vec![],
340 usage_after: usage,
341 acceptance: None,
342 final_text: None,
343 harness_extras: serde_json::Map::new(),
344 }
345}
346
347fn map_sleep_result(value: &serde_json::Value) -> Option<AttemptOutcome> {
348 if !value
349 .get(SLEEP_SENTINEL_KEY)
350 .and_then(|v| v.as_bool())
351 .unwrap_or(false)
352 {
353 return None;
354 }
355 Some(AttemptOutcome::Sleep {
356 duration_ms: value.get("duration_ms")?.as_u64()?,
357 reason: value.get("reason")?.as_str()?.to_string(),
358 })
359}
360
361fn harness_extras_with_session(sid: &Option<String>) -> serde_json::Map<String, serde_json::Value> {
362 let mut m = serde_json::Map::new();
363 if let Some(s) = sid {
364 m.insert(
365 "claude_code.session_id".into(),
366 serde_json::Value::String(s.clone()),
367 );
368 }
369 m
370}
371
#[cfg(test)]
mod tests {
    use super::*;

    /// A complete payload (sentinel `true`, integer duration, string reason)
    /// maps to a sleep outcome carrying those values.
    #[test]
    fn sleep_sentinel_maps_to_sleep_outcome() {
        let value = serde_json::json!({
            "__nexo_sleep__": true,
            "duration_ms": 270_000,
            "reason": "waiting"
        });
        let mapped = map_sleep_result(&value).unwrap();
        assert_eq!(
            mapped,
            AttemptOutcome::Sleep {
                duration_ms: 270_000,
                reason: "waiting".into(),
            }
        );
    }

    /// A payload without the sentinel key is never treated as a sleep request.
    #[test]
    fn normal_tool_result_does_not_map_to_sleep() {
        let value = serde_json::json!({"text": "hello"});
        assert_eq!(map_sleep_result(&value), None);
    }

    /// The sentinel only counts when it is the boolean `true`.
    #[test]
    fn sentinel_must_be_boolean_true() {
        let flags = [
            serde_json::json!(false),
            serde_json::json!("true"),
            serde_json::json!(1),
            serde_json::Value::Null,
        ];
        for flag in flags {
            let value = serde_json::json!({
                "__nexo_sleep__": flag,
                "duration_ms": 1_000,
                "reason": "waiting"
            });
            assert_eq!(map_sleep_result(&value), None);
        }
    }

    /// A flagged payload with a missing or mistyped field is ignored rather
    /// than mapped to a partial sleep outcome.
    #[test]
    fn incomplete_sleep_payload_is_ignored() {
        let missing_duration = serde_json::json!({
            "__nexo_sleep__": true,
            "reason": "waiting"
        });
        assert_eq!(map_sleep_result(&missing_duration), None);

        let missing_reason = serde_json::json!({
            "__nexo_sleep__": true,
            "duration_ms": 1_000
        });
        assert_eq!(map_sleep_result(&missing_reason), None);

        let negative_duration = serde_json::json!({
            "__nexo_sleep__": true,
            "duration_ms": -5,
            "reason": "waiting"
        });
        assert_eq!(map_sleep_result(&negative_duration), None);

        let non_string_reason = serde_json::json!({
            "__nexo_sleep__": true,
            "duration_ms": 1_000,
            "reason": 42
        });
        assert_eq!(map_sleep_result(&non_string_reason), None);
    }
}