roboticus-api 0.11.3

HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime
Documentation
//! Scheduled (cron) task execution via the unified pipeline.
//!
//! Thin connector: subagent routing check → `run_pipeline()` → extract content.

use super::AppState;
use super::pipeline::{
    PipelineConfig, PipelineError, PipelineOutcome, PipelineRequest, run_pipeline,
};

pub(crate) async fn execute_scheduled_agent_task(
    state: &AppState,
    agent_id: &str,
    task: &str,
    job_name: Option<&str>,
) -> Result<String, String> {
    // For non-root agents, wrap the task so the planner delegates via the
    // pipeline's normal decomposition/delegation path. This ensures
    // injection defense, session resolution, turn persistence,
    // cache/cost/memory bookkeeping, and guard chain all run.
    let config = state.config.read().await;
    let root_agent_id = config.agent.id.clone();
    drop(config);

    let effective_content: std::borrow::Cow<'_, str> =
        if agent_id.eq_ignore_ascii_case(&root_agent_id) {
            task.into()
        } else {
            format!("[scheduled: delegate to subagent \"{agent_id}\"] {task}").into()
        };

    // ── Call: invoke the factory ────────────────────────────────────
    // Cron prefers local models: they're free, don't exhaust cloud rate limits,
    // and latency doesn't matter for background work. Fall back to the first
    // configured fallback if no local models are available.
    let mut cron_config = PipelineConfig::cron();
    if let Some(name) = job_name {
        cron_config.session_nickname_override = Some(format!("Scheduled: {name}"));
    }
    {
        let cfg = state.config.read().await;
        let local_model = cfg
            .models
            .fallbacks
            .iter()
            .find(|m| {
                let prefix = roboticus_core::model::provider_prefix(m);
                prefix == "ollama"
                    || prefix == "lmstudio"
                    || prefix == "docker-model-runner"
                    || prefix == "llamacpp"
            })
            .or_else(|| cfg.models.fallbacks.first())
            .cloned();
        if let Some(ref model) = local_model {
            cron_config.model_override = Some(model.clone());
            tracing::debug!(
                model = model.as_str(),
                "cron task preferring local/fallback model to preserve primary rate budget"
            );
        }
    }
    let request = PipelineRequest {
        state,
        config: cron_config,
        raw_content: &effective_content,
        session_id_hint: None,
        scope_hint: None,
        is_correction_turn: false,
        channel_context: None,
        content_parts: None,
    };

    let outcome = run_pipeline(request).await.map_err(|e| match e {
        PipelineError::InjectionBlocked { threat_score } => {
            format!("scheduled task blocked: injection detected (score={threat_score:.2})")
        }
        other => other.to_string(),
    })?;

    // ── Format: extract content from outcome ───────────────────────
    match outcome {
        PipelineOutcome::Complete { result, .. } => Ok(result.content),
        PipelineOutcome::SpecialistProposal { prompt, .. } => Err(format!(
            "scheduled task requires specialist creation before execution: {prompt}"
        )),
        PipelineOutcome::StreamReady(_) => {
            Err("unexpected streaming outcome on cron endpoint".into())
        }
    }
}