// roboticus-api 0.11.3
//
// HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime
// Documentation
use super::super::guard_registry::{GuardChain, guard_sets};

// ── Guard set presets ─────────────────────────────────────────────────────

/// Which guard set to apply to inference output.
///
/// Resolved to a concrete `GuardChain` via `GuardSetPreset::resolve()` at
/// pipeline execution time. Using an enum rather than a direct `GuardChain`
/// allows `PipelineConfig` to be `Clone + Debug` without requiring `Guard`
/// implementors to be cloneable.
///
/// The preset itself is a plain `Copy` tag with no payload, so it can be
/// stored in a config and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(in super::super) enum GuardSetPreset {
    /// All 10 guards — used for fresh inference on standard paths.
    /// Resolves to `guard_sets::full()`.
    Full,
    /// All 10 guards — used for cached responses. Fixes previously-missing
    /// `SubagentClaim` and `LiteraryQuoteRetry`.
    /// Resolves to `guard_sets::cached()`.
    Cached,
    /// 6 guards — reduced subset for streaming where retries are impractical.
    /// Resolves to `guard_sets::streaming()`.
    Streaming,
    /// No guards applied; resolves to `GuardChain::empty()`. Used for paths
    /// where a guard pass is inapplicable — e.g. `PipelineConfig::streaming()`
    /// selects this for `cache_guard_set`. Note that the streaming preset
    /// still sets `cache_enabled: true`, so this variant does not by itself
    /// mean the cache is unused.
    None,
}

impl GuardSetPreset {
    /// Materialize the preset into a concrete guard chain.
    pub(in super::super) fn resolve(self) -> GuardChain {
        match self {
            Self::Full => guard_sets::full(),
            Self::Cached => guard_sets::cached(),
            Self::Streaming => guard_sets::streaming(),
            Self::None => GuardChain::empty(),
        }
    }
}

// ── Session resolution modes ──────────────────────────────────────────────

/// How the session is resolved for this pipeline execution.
///
/// Each entry point has different session semantics:
/// - API / streaming: optional `session_id` in request body, or create from web scope
/// - Channel: scope derived from `platform:chat_id`
/// - Cron: dedicated agent-scoped session
/// - Tests only: a session already resolved by the caller
///
/// Unlike the other mode enums in this file this type is `Clone` but not
/// `Copy`, because some variants own a `String`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(in super::super) enum SessionResolutionMode {
    /// API/Streaming: session_id provided in request body, or create from
    /// web scope if not provided.
    FromBody,
    /// Channel: scope derived from platform + chat_id.
    ///
    /// Only the platform is carried by this variant; the chat id presumably
    /// comes from the incoming message — confirm against the channel connector.
    FromChannel {
        /// The chat platform identifier (e.g., "telegram", "discord").
        platform: String,
    },
    /// Cron/scheduled: find_or_create with agent-scoped session.
    Dedicated,
    /// Pre-resolved: session already created by caller (useful for
    /// testing or specialized workflows). Carries the id of that session
    /// in `session_id`.
    ///
    /// This variant exists only in test builds (`#[cfg(test)]`), so
    /// non-test code must not rely on it.
    #[cfg(test)]
    Provided { session_id: String },
}

// ── Authority modes ───────────────────────────────────────────────────────

/// How authority (RBAC) is determined for this pipeline execution.
///
/// Authority controls which tools the agent is permitted to execute.
/// Different entry points have different trust models. This enum only
/// names the strategy; the resolution itself happens in the functions
/// referenced on each variant, which are defined outside this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(in super::super) enum AuthorityMode {
    /// API: resolve via `resolve_api_claim()`. Supports reduced authority
    /// from injection caution-level detection.
    /// Selected by the `api` and `streaming` presets.
    ApiClaim,
    /// Channel: resolve via `resolve_channel_claim()` with sender context
    /// (allow-list membership, trusted sender IDs, threat score).
    /// Selected by the `channel` preset.
    ChannelClaim,
    /// Cron: hardcoded `InputAuthority::SelfGenerated`. Internal system
    /// caller with no external user input.
    /// Selected by the `cron` preset.
    SelfGenerated,
}

// ── Inference modes ───────────────────────────────────────────────────────

/// How inference is executed.
///
/// The two modes have fundamentally different execution models:
/// - Standard: full ReAct tool loop, shortcut dispatch, guard chain
/// - Streaming: direct provider SSE stream, no ReAct, minimal post-processing
///
/// Of the preset constructors on `PipelineConfig`, only `streaming()` selects
/// `Streaming`; `api()`, `channel()` and `cron()` all use `Standard`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(in super::super) enum InferenceMode {
    /// Full inference with ReAct tool loop, shortcut dispatch, and guard chain.
    Standard,
    /// SSE streaming — direct provider call with chunk-by-chunk delivery.
    /// No ReAct loop. Pre-inference stages (decomposition, delegation,
    /// shortcut dispatch, cache) run synchronously before streaming begins.
    /// Reduced guard set applied during buffer and post-accumulation phases.
    Streaming,
}

// ── Pipeline configuration ────────────────────────────────────────────────

/// Declarative configuration for the unified pipeline.
///
/// Each entry point constructs a `PipelineConfig` via one of the four preset
/// constructors ([`api`], [`streaming`], [`channel`], [`cron`]), making it
/// impossible to forget a pipeline stage: every constructor spells out every
/// field, so adding a field forces each preset to make an explicit choice.
///
/// Most boolean flags gate a pipeline stage. By design, if such a flag is
/// `false` the stage is completely skipped — no branching inside the stage
/// itself. The exception is `background_budget`, which selects a timeout
/// budget rather than enabling a stage.
///
/// [`api`]: PipelineConfig::api
/// [`streaming`]: PipelineConfig::streaming
/// [`channel`]: PipelineConfig::channel
/// [`cron`]: PipelineConfig::cron
#[derive(Debug, Clone)]
#[allow(dead_code)] // fields set by constructors; some consumed only by tests until pipeline phases are wired
pub(in super::super) struct PipelineConfig {
    // ── Input defense ─────────────────────────────────────────────
    /// Run injection detection: block (>0.7), sanitize (0.3-0.7), pass (<0.3).
    /// Enabled by every preset.
    pub injection_defense: bool,
    /// Track in-flight duplicates and reject concurrent identical requests.
    /// Disabled only by the `cron` preset.
    pub dedup_tracking: bool,

    // ── Session ──────────────────────────────────────────────────
    /// How to resolve or create the session for this turn.
    pub session_resolution: SessionResolutionMode,

    // ── Pre-inference ────────────────────────────────────────────
    /// Evaluate the decomposition gate for potential delegation.
    /// Enabled by every preset, including `cron` (which nevertheless turns
    /// `delegated_execution` off).
    pub decomposition_gate: bool,
    /// Execute delegated subtasks via `orchestrate-subagents` before inference.
    /// Disabled only by the `cron` preset.
    pub delegated_execution: bool,
    /// Handle specialist creation control flows (channel-only feature).
    pub specialist_controls: bool,
    /// Try execution shortcuts (acknowledgement, model identity, etc.) before LLM.
    /// Disabled only by the `cron` preset.
    pub shortcuts_enabled: bool,
    /// Try skill-first fulfillment: match user input against registered skill
    /// triggers and execute directly before falling through to LLM inference.
    /// Gated on `authority == Creator` at runtime.
    /// Enabled only by the `channel` preset.
    pub skill_first_enabled: bool,
    /// Expand short user reactions (sarcasm, contradiction, quote-back) by
    /// prepending context from the previous assistant reply.  Also sets
    /// `is_correction_turn` when the expansion detects a correction.
    /// Disabled only by the `cron` preset.
    pub short_followup_expansion: bool,

    // ── Inference ────────────────────────────────────────────────
    /// Standard (ReAct loop) or Streaming (SSE, no ReAct).
    pub inference_mode: InferenceMode,
    /// Guard set applied to fresh inference output.
    pub guard_set: GuardSetPreset,
    /// Guard set applied to cached responses.
    /// The `streaming` preset sets this to `GuardSetPreset::None`.
    pub cache_guard_set: GuardSetPreset,
    /// Whether semantic cache is checked before inference.
    /// Enabled by every preset.
    pub cache_enabled: bool,

    // ── Authority ────────────────────────────────────────────────
    /// How authority (RBAC) is resolved for tool execution.
    pub authority_mode: AuthorityMode,

    // ── Post-inference ──────────────────────────────────────────
    /// Run background memory ingestion after the turn completes.
    /// Enabled by every preset.
    pub post_turn_ingest: bool,
    /// Run background nickname refinement after 4+ messages.
    /// Enabled only by the `api` preset.
    pub nickname_refinement: bool,

    // ── Output control ──────────────────────────────────────────
    /// Inject diagnostics metadata into system prompt.
    /// Enabled by the `api` and `streaming` presets only.
    pub inject_diagnostics: bool,

    // ── Task operating state ─────────────────────────────────────
    /// Synthesize `TaskOperatingState` via `roboticus_agent::task_state::synthesize()`
    /// and route next action through `roboticus_agent::action_planner::plan()`.
    /// When `true`, the planner is the single authority on task routing.
    /// Enabled by every preset.
    pub task_operating_state: bool,

    // ── Channel label ────────────────────────────────────────────
    /// Human-readable label for logging, cost tracking, and event bus.
    /// Presets use `"api"`, `"api-stream"`, `"cron"`, or the platform name
    /// passed to `channel()`.
    pub channel_label: String,

    // ── Session nickname ─────────────────────────────────────────
    /// Optional nickname override for the session. When set, used instead of
    /// derive_nickname() for dedicated sessions (cron, subagent).
    /// Every preset constructor initializes this to `None`.
    pub session_nickname_override: Option<String>,

    // ── Model routing ───────────────────────────────────────────
    /// Override the primary model for this pipeline execution.
    /// When `Some`, the pipeline uses this model instead of `config.models.primary`.
    /// Used by cron to avoid exhausting the primary model's rate limit.
    /// Every preset constructor (including `cron()`) initializes this to
    /// `None`, so an override has to be assigned after construction.
    pub model_override: Option<String>,

    /// Use background inference budget (generous timeouts) instead of
    /// interactive budget. Background tasks don't have a user waiting,
    /// so they can afford longer provider timeouts and total budgets.
    /// Enabled only by the `cron` preset.
    pub background_budget: bool,
}

// ── Preset constructors ───────────────────────────────────────────────────

impl PipelineConfig {
    /// API endpoint (`/agent/message`): all features enabled.
    ///
    /// Injection defense, dedup, decomposition, delegated execution,
    /// shortcuts, full guard chain, cache, ReAct, diagnostics, nickname
    /// refinement. Authority via `resolve_api_claim()`.
    pub fn api() -> Self {
        Self {
            injection_defense: true,
            dedup_tracking: true,
            session_resolution: SessionResolutionMode::FromBody,
            decomposition_gate: true,
            delegated_execution: true,
            specialist_controls: false,
            shortcuts_enabled: true,
            skill_first_enabled: false,
            short_followup_expansion: true,
            inference_mode: InferenceMode::Standard,
            guard_set: GuardSetPreset::Full,
            cache_guard_set: GuardSetPreset::Cached,
            cache_enabled: true,
            authority_mode: AuthorityMode::ApiClaim,
            post_turn_ingest: true,
            nickname_refinement: true,
            inject_diagnostics: true,
            task_operating_state: true,
            channel_label: "api".into(),
            session_nickname_override: None,
            model_override: None,
            background_budget: false,
        }
    }

    /// SSE streaming endpoint (`/agent/message/stream`).
    ///
    /// Nearly identical to the API endpoint with two intentional exceptions:
    /// - `cache_guard_set` is `None` (guards cannot block mid-stream)
    /// - `nickname_refinement` is disabled (incompatible with SSE delivery)
    ///
    /// All other features — injection defense, dedup, decomposition, delegated
    /// execution, shortcuts — are fully enabled.
    ///
    /// Authority is resolved via `ApiClaim` (same as API endpoint). Pre-
    /// inference delegation and shortcuts run synchronously before streaming
    /// begins. The streaming connector handles post-stream bookkeeping.
    pub fn streaming() -> Self {
        Self {
            injection_defense: true,
            dedup_tracking: true,
            session_resolution: SessionResolutionMode::FromBody,
            decomposition_gate: true,
            delegated_execution: true,
            specialist_controls: false,
            shortcuts_enabled: true,
            skill_first_enabled: false,
            short_followup_expansion: true,
            inference_mode: InferenceMode::Streaming,
            guard_set: GuardSetPreset::Streaming,
            cache_guard_set: GuardSetPreset::None,
            cache_enabled: true,
            authority_mode: AuthorityMode::ApiClaim,
            post_turn_ingest: true,
            nickname_refinement: false,
            inject_diagnostics: true,
            task_operating_state: true,
            channel_label: "api-stream".into(),
            session_nickname_override: None,
            model_override: None,
            background_budget: false,
        }
    }

    /// Channel message (Telegram, Discord, Signal, Email, etc.).
    ///
    /// Full pipeline with channel-specific authority resolution.
    /// All core features enabled including skill-first fulfillment.
    /// Channel-specific behaviors NOT part of the pipeline (handled by
    /// channel connector):
    /// - Addressability filter
    /// - Multimodal enrichment
    /// - Typing/thinking indicators
    /// - Bot command handling
    /// - Reply formatting (telegram normalize, etc.)
    pub fn channel(platform: &str) -> Self {
        Self {
            injection_defense: true,
            dedup_tracking: true,
            session_resolution: SessionResolutionMode::FromChannel {
                platform: platform.to_string(),
            },
            decomposition_gate: true,
            delegated_execution: true,
            specialist_controls: true,
            shortcuts_enabled: true,
            skill_first_enabled: true,
            short_followup_expansion: true,
            inference_mode: InferenceMode::Standard,
            guard_set: GuardSetPreset::Full,
            cache_guard_set: GuardSetPreset::Cached,
            cache_enabled: true,
            authority_mode: AuthorityMode::ChannelClaim,
            post_turn_ingest: true,
            nickname_refinement: false,
            inject_diagnostics: false,
            task_operating_state: true,
            channel_label: platform.to_string(),
            session_nickname_override: None,
            model_override: None,
            background_budget: false,
        }
    }

    /// Cron/scheduled task execution: internal system caller.
    ///
    /// **Security fix**: injection defense is now enabled — was completely
    /// absent from `scheduled_tasks.rs`.
    ///
    /// No dedup (cron tasks are guaranteed unique by scheduler). Authority
    /// is `SelfGenerated` (internal system caller). No delegated execution
    /// (cron tasks go through standard inference with tool access). No
    /// nickname refinement.
    pub fn cron() -> Self {
        Self {
            injection_defense: true, // SECURITY FIX: was missing!
            dedup_tracking: false,
            session_resolution: SessionResolutionMode::Dedicated,
            decomposition_gate: true,
            delegated_execution: false, // cron doesn't pre-execute delegation
            specialist_controls: false,
            shortcuts_enabled: false, // Cron tasks are machine-generated; ack shortcuts don't apply
            skill_first_enabled: false,
            short_followup_expansion: false,
            inference_mode: InferenceMode::Standard,
            guard_set: GuardSetPreset::Full,
            cache_guard_set: GuardSetPreset::Cached,
            cache_enabled: true,
            authority_mode: AuthorityMode::SelfGenerated,
            post_turn_ingest: true,
            nickname_refinement: false,
            inject_diagnostics: false,
            task_operating_state: true,
            channel_label: "cron".into(),
            session_nickname_override: None,
            model_override: None,
            background_budget: true, // cron gets generous timeouts
        }
    }
}

// ── Stage predicates ──────────────────────────────────────────────────────
//
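// Convenience methods for querying pipeline capabilities. They are intended
// for `execute_unified_pipeline()` (Phase 5) to branch on stage availability;
// the impl below is currently gated on `#[cfg(test)]`, so for now they are
// compiled only in test builds.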

#[cfg(test)]
impl PipelineConfig {
    /// `true` when inference runs through the standard ReAct path.
    pub fn is_standard_inference(&self) -> bool {
        matches!(self.inference_mode, InferenceMode::Standard)
    }

    /// `true` when inference runs through the streaming path.
    pub fn is_streaming_inference(&self) -> bool {
        matches!(self.inference_mode, InferenceMode::Streaming)
    }

    /// Whether authority has to be enforced for tool execution.
    ///
    /// `true` for the claim-based modes (`ApiClaim`, `ChannelClaim`) and
    /// `false` for `SelfGenerated` (cron).
    pub fn enforces_authority(&self) -> bool {
        match self.authority_mode {
            AuthorityMode::ApiClaim | AuthorityMode::ChannelClaim => true,
            AuthorityMode::SelfGenerated => false,
        }
    }

    /// Whether this pipeline can execute tools (ReAct loop).
    ///
    /// Equivalent to [`Self::is_standard_inference`]: streaming never
    /// executes tools.
    pub fn can_execute_tools(&self) -> bool {
        self.is_standard_inference()
    }

    /// Whether the session is resolved from the request body
    /// (`SessionResolutionMode::FromBody`), as on the API and streaming paths.
    pub fn resolves_session_from_body(&self) -> bool {
        self.session_resolution == SessionResolutionMode::FromBody
    }

    /// Whether this is a channel pipeline, i.e. the session is resolved
    /// from a channel regardless of which platform it names.
    pub fn is_channel(&self) -> bool {
        let mode = &self.session_resolution;
        matches!(mode, SessionResolutionMode::FromChannel { .. })
    }

    /// Whether this is a cron pipeline: a dedicated session combined with
    /// self-generated authority. Both conditions must hold.
    pub fn is_cron(&self) -> bool {
        let dedicated_session = self.session_resolution == SessionResolutionMode::Dedicated;
        let self_generated = self.authority_mode == AuthorityMode::SelfGenerated;
        dedicated_session && self_generated
    }
}