nexo-driver-loop 0.1.9

Goal orchestrator + LlmDecider + Unix socket bridge for the nexo-rs driver subsystem. Phase 67.4.
Documentation
//! Per-turn loop. Owns the spawned `claude` subprocess for one
//! turn, projects events back to the orchestrator, and synthesises
//! an `AttemptResult` at the end.

use std::path::Path;
use std::sync::Arc;
use std::time::Instant;

use nexo_driver_claude::{
    spawn_turn, ClaudeCommand, ClaudeError, ClaudeEvent, ResultEvent, SessionBinding,
    SessionBindingStore,
};
use nexo_driver_types::{
    AttemptOutcome, AttemptParams, AttemptResult, BudgetUsage, CancellationToken, GoalId,
};

use crate::acceptance::AcceptanceEvaluator;
use crate::error::DriverError;

/// JSON key that flags a turn's result payload as a sleep request.
/// `map_sleep_result` only treats a payload as a sleep request when this
/// key holds the boolean `true`.
const SLEEP_SENTINEL_KEY: &str = "__nexo_sleep__";

/// Bundle of refs the loop borrows for one attempt.
///
/// Everything except `cancel` is borrowed from the caller for the
/// duration of a single `run_attempt` call.
pub(crate) struct AttemptContext<'a> {
    /// Claude configuration: `run_attempt` reads the optional binary
    /// override, the default args, the turn timeout and the forced-kill
    /// delay from it.
    pub claude_cfg: &'a nexo_driver_claude::ClaudeConfig,
    /// Store queried for a prior session binding before the turn and
    /// updated (upsert / mark-invalid) after it.
    pub binding_store: &'a Arc<dyn SessionBindingStore>,
    /// Evaluator run against the goal's acceptance criteria once the
    /// turn reports a successful result.
    pub acceptance: &'a Arc<dyn AcceptanceEvaluator>,
    /// Working directory for the spawned command; also handed to the
    /// acceptance evaluator and recorded on the session binding.
    pub workspace: &'a Path,
    /// Path passed to the command's `mcp_config` setting.
    pub mcp_config_path: &'a Path,
    /// Currently unused by `run_attempt` (kept for future env injection).
    pub bin_path: &'a Path,
    /// Cancellation token handed to `spawn_turn`.
    pub cancel: CancellationToken,
}

pub(crate) async fn run_attempt(
    ctx: AttemptContext<'_>,
    params: AttemptParams,
) -> Result<AttemptResult, DriverError> {
    let goal_id = params.goal.id;
    let mut usage = params.usage.clone();

    // Compose the command. Reuse binding session_id when present.
    let binary = ctx
        .claude_cfg
        .binary
        .clone()
        .unwrap_or_else(|| std::path::PathBuf::from("claude"));
    let prior = ctx.binding_store.get(goal_id).await?;
    // When the orchestrator scheduled a compact turn it pre-fills
    // `extras["compact_turn"] = true`; substitute the
    // prompt with a `/compact <focus>` slash command so Claude Code
    // compacts its context.
    let prompt = if params
        .extras
        .get("compact_turn")
        .and_then(|v| v.as_bool())
        .unwrap_or(false)
    {
        let focus = params
            .extras
            .get("compact_focus")
            .and_then(|v| v.as_str())
            .unwrap_or("continue working");
        format!("/compact {focus}")
    } else {
        // Operator-interrupt — prepend any queued messages from
        // the agent side as a clearly-marked block so Claude
        // sees them as a high-priority directive on top of the
        // goal description.
        let mut p = String::new();
        if let Some(serde_json::Value::Array(msgs)) = params.extras.get("operator_messages") {
            if !msgs.is_empty() {
                p.push_str("[OPERATOR INTERRUPT]\n");
                for m in msgs {
                    if let Some(s) = m.as_str() {
                        p.push_str(s);
                        p.push('\n');
                    }
                }
                p.push_str("[END OPERATOR INTERRUPT]\n\n");
            }
        }
        if let Some(serde_json::Value::String(tick_prompt)) =
            params.extras.get("synthetic_tick_prompt")
        {
            if !tick_prompt.is_empty() {
                p.push_str(tick_prompt);
                p.push_str("\n\n");
            }
        }
        p.push_str(&params.goal.description);
        p
    };
    let mut cmd = ClaudeCommand::new(binary, prompt)
        .apply_defaults(&ctx.claude_cfg.default_args)
        .cwd(ctx.workspace)
        .mcp_config(ctx.mcp_config_path);
    cmd = match &prior {
        Some(b) => cmd.resume(b.session_id.clone()),
        None => cmd, // first turn — claude assigns its own session id
    };
    let _ = ctx.bin_path; // bin path lives in mcp_config; kept here for future env injection.

    let turn_start = Instant::now();
    let mut turn = match spawn_turn(
        cmd,
        &ctx.cancel,
        ctx.claude_cfg.turn_timeout,
        ctx.claude_cfg.forced_kill_after,
    )
    .await
    {
        Ok(t) => t,
        Err(ClaudeError::Cancelled) => {
            return Ok(synthetic(
                goal_id,
                params.turn_index,
                AttemptOutcome::Cancelled,
                usage,
            ));
        }
        Err(e) => {
            return Ok(synthetic(
                goal_id,
                params.turn_index,
                AttemptOutcome::Escalate {
                    reason: format!("spawn failed: {e}"),
                },
                usage,
            ));
        }
    };

    let mut last_session_id: Option<String> = prior.map(|b| b.session_id);
    let mut final_text: Option<String> = None;
    let mut claimed_done = false;
    let mut session_invalid = false;
    let mut error_message: Option<String> = None;

    loop {
        let ev = match turn.next_event().await {
            Ok(Some(e)) => e,
            Ok(None) => break,
            Err(ClaudeError::Cancelled) => {
                let _ = turn.shutdown().await;
                return Ok(synthetic(
                    goal_id,
                    params.turn_index,
                    AttemptOutcome::Cancelled,
                    usage,
                ));
            }
            Err(ClaudeError::Timeout) => {
                let _ = turn.shutdown().await;
                return Ok(synthetic(
                    goal_id,
                    params.turn_index,
                    AttemptOutcome::Continue {
                        reason: "turn timeout".into(),
                    },
                    usage,
                ));
            }
            Err(e) => {
                let _ = turn.shutdown().await;
                return Ok(synthetic(
                    goal_id,
                    params.turn_index,
                    AttemptOutcome::Escalate {
                        reason: format!("stream error: {e}"),
                    },
                    usage,
                ));
            }
        };
        if let Some(sid) = ev.session_id() {
            last_session_id = Some(sid.to_string());
        }
        match &ev {
            ClaudeEvent::Result(ResultEvent::Success {
                result, usage: tu, ..
            }) => {
                let total = tu.input_tokens + tu.output_tokens + tu.cache_read_input_tokens;
                usage.tokens = usage.tokens.saturating_add(total);
                if let Some(text) = result.as_deref() {
                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(text) {
                        if let Some(outcome) = map_sleep_result(&json) {
                            return Ok(AttemptResult {
                                goal_id,
                                turn_index: params.turn_index,
                                outcome,
                                decisions_recorded: vec![],
                                usage_after: usage,
                                acceptance: None,
                                final_text: None,
                                harness_extras: harness_extras_with_session(&last_session_id),
                            });
                        }
                    }
                }
                final_text = result.clone();
                claimed_done = true;
                break;
            }
            ClaudeEvent::Result(ResultEvent::ErrorMaxTurns { .. }) => {
                error_message = Some("claude reported max turns".into());
                break;
            }
            ClaudeEvent::Result(ResultEvent::ErrorDuringExecution { message, .. }) => {
                let m = message.clone().unwrap_or_default();
                if m.to_lowercase().contains("session") {
                    session_invalid = true;
                }
                error_message = Some(m);
                break;
            }
            _ => {}
        }
    }

    let _ = turn.shutdown().await;

    // Persist binding (whatever session id Claude reported last).
    // Lift origin_channel + dispatcher out of goal.metadata so
    // the binding carries the chat that triggered the goal across
    // reattach, and the completion router knows where to send the
    // notify_origin summary.
    if let Some(sid) = &last_session_id {
        let workspace_pb: std::path::PathBuf = ctx.workspace.to_path_buf();
        let mut binding = SessionBinding::new(
            goal_id,
            sid.clone(),
            ctx.claude_cfg.default_args.model.clone(),
            Some(workspace_pb),
        );
        if let Some(o) = params.goal.metadata.get("origin_channel") {
            if !o.is_null() {
                if let Ok(parsed) =
                    serde_json::from_value::<nexo_driver_claude::OriginChannel>(o.clone())
                {
                    binding = binding.with_origin(parsed);
                }
            }
        }
        if let Some(d) = params.goal.metadata.get("dispatcher") {
            if !d.is_null() {
                if let Ok(parsed) =
                    serde_json::from_value::<nexo_driver_claude::DispatcherIdentity>(d.clone())
                {
                    binding = binding.with_dispatcher(parsed);
                }
            }
        }
        ctx.binding_store.upsert(binding).await?;
    }

    // Wall-time portion of usage.
    usage.wall_time = usage.wall_time.saturating_add(turn_start.elapsed());

    // Session-invalid: mark, return Continue so orchestrator retries.
    if session_invalid {
        ctx.binding_store.mark_invalid(goal_id).await?;
        return Ok(AttemptResult {
            goal_id,
            turn_index: params.turn_index,
            outcome: AttemptOutcome::Continue {
                reason: "session invalid: retrying".into(),
            },
            decisions_recorded: vec![],
            usage_after: usage,
            acceptance: None,
            final_text,
            harness_extras: harness_extras_with_session(&last_session_id),
        });
    }

    if let Some(msg) = error_message {
        return Ok(AttemptResult {
            goal_id,
            turn_index: params.turn_index,
            outcome: AttemptOutcome::Escalate { reason: msg },
            decisions_recorded: vec![],
            usage_after: usage,
            acceptance: None,
            final_text,
            harness_extras: harness_extras_with_session(&last_session_id),
        });
    }

    if !claimed_done {
        return Ok(AttemptResult {
            goal_id,
            turn_index: params.turn_index,
            outcome: AttemptOutcome::Continue {
                reason: "stream ended without result event".into(),
            },
            decisions_recorded: vec![],
            usage_after: usage,
            acceptance: None,
            final_text,
            harness_extras: harness_extras_with_session(&last_session_id),
        });
    }

    // Acceptance check post-claim.
    let verdict = ctx
        .acceptance
        .evaluate(&params.goal.acceptance, ctx.workspace)
        .await?;

    let outcome = if verdict.met {
        AttemptOutcome::Done
    } else {
        AttemptOutcome::NeedsRetry {
            failures: verdict.failures.clone(),
        }
    };

    Ok(AttemptResult {
        goal_id,
        turn_index: params.turn_index,
        outcome,
        decisions_recorded: vec![],
        usage_after: usage,
        acceptance: Some(verdict),
        final_text,
        harness_extras: harness_extras_with_session(&last_session_id),
    })
}

/// Builds a bare [`AttemptResult`] for turns that ended before a usable
/// result was produced (spawn failure, cancellation, timeout, stream
/// error): the given outcome and usage, and nothing else — no recorded
/// decisions, no acceptance verdict, no final text, empty harness extras.
fn synthetic(
    goal_id: GoalId,
    turn_index: u32,
    outcome: AttemptOutcome,
    usage: BudgetUsage,
) -> AttemptResult {
    let harness_extras = serde_json::Map::new();
    let decisions_recorded = Vec::new();
    AttemptResult {
        goal_id,
        turn_index,
        outcome,
        usage_after: usage,
        decisions_recorded,
        harness_extras,
        final_text: None,
        acceptance: None,
    }
}

/// Interprets a parsed result payload as a sleep request.
///
/// Returns `Some(AttemptOutcome::Sleep { .. })` only when the payload
/// has the sentinel key set to boolean `true`, an unsigned-integer
/// `duration_ms`, and a string `reason`. Anything else yields `None`.
fn map_sleep_result(value: &serde_json::Value) -> Option<AttemptOutcome> {
    let flagged = matches!(
        value.get(SLEEP_SENTINEL_KEY),
        Some(serde_json::Value::Bool(true))
    );
    if !flagged {
        return None;
    }

    let duration_ms = value
        .get("duration_ms")
        .and_then(serde_json::Value::as_u64)?;
    let reason = value
        .get("reason")
        .and_then(serde_json::Value::as_str)?
        .to_owned();
    Some(AttemptOutcome::Sleep {
        duration_ms,
        reason,
    })
}

/// Builds the harness-extras map for a result: a single
/// `"claude_code.session_id"` entry when a session id is known, an
/// empty map otherwise.
fn harness_extras_with_session(sid: &Option<String>) -> serde_json::Map<String, serde_json::Value> {
    sid.iter()
        .map(|id| {
            (
                String::from("claude_code.session_id"),
                serde_json::Value::String(id.clone()),
            )
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A payload with the sentinel and both required fields maps to `Sleep`.
    #[test]
    fn sleep_sentinel_maps_to_sleep_outcome() {
        let payload = serde_json::json!({
            "__nexo_sleep__": true,
            "duration_ms": 270_000,
            "reason": "waiting"
        });
        let expected = AttemptOutcome::Sleep {
            duration_ms: 270_000,
            reason: String::from("waiting"),
        };
        assert_eq!(map_sleep_result(&payload), Some(expected));
    }

    /// A payload without the sentinel key is not a sleep request.
    #[test]
    fn normal_tool_result_does_not_map_to_sleep() {
        let payload = serde_json::json!({"text": "hello"});
        assert!(map_sleep_result(&payload).is_none());
    }
}