zeph-core 0.19.0

Core agent loop, configuration, context builder, metrics, and vault for Zeph
Documentation
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

mod arise;
mod background;
mod d2skill;
mod erl;
mod outcomes;
mod preferences;
mod rl;
mod skill_commands;
mod trust;

#[cfg(test)]
mod tests;

use super::{Agent, Channel};

/// Accumulated skill outcome for a single tool result within a batch.
/// Used by `flush_skill_outcomes` to collapse N per-tool-result `record_skill_outcomes`
/// calls into a single pass after the tool batch completes.
pub(crate) struct PendingSkillOutcome {
    pub outcome: String,
    pub error_context: Option<String>,
    pub outcome_detail: Option<String>,
}

impl<C: Channel> Agent<C> {
    pub(crate) fn is_learning_enabled(&self) -> bool {
        self.learning_engine.is_enabled()
    }

    async fn is_skill_trusted_for_learning(&self, skill_name: &str) -> bool {
        let Some(memory) = &self.memory_state.persistence.memory else {
            return true;
        };
        let Ok(Some(row)) = memory.sqlite().load_skill_trust(skill_name).await else {
            return true; // no trust record = local skill = trusted
        };
        matches!(row.trust_level.as_str(), "trusted" | "verified")
    }

    pub(crate) async fn record_skill_outcomes(
        &mut self,
        outcome: &str,
        error_context: Option<&str>,
        outcome_detail: Option<&str>,
    ) {
        if self.skill_state.active_skill_names.is_empty() {
            return;
        }
        let Some(memory) = &self.memory_state.persistence.memory else {
            return;
        };
        let batch_result = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            memory.sqlite().record_skill_outcomes_batch(
                &self.skill_state.active_skill_names,
                self.memory_state.persistence.conversation_id,
                outcome,
                error_context,
                outcome_detail,
            ),
        )
        .await;
        match batch_result {
            Ok(Ok(())) => {}
            Ok(Err(e)) => {
                tracing::warn!("failed to record skill outcomes: {e:#}");
            }
            Err(_) => {
                tracing::warn!("record_skill_outcomes: timed out after 5s");
                return;
            }
        }

        if outcome != "success" {
            for name in &self.skill_state.active_skill_names {
                self.check_rollback(name).await;
            }
        }

        let names: Vec<String> = self.skill_state.active_skill_names.clone();
        for name in &names {
            self.check_trust_transition(name).await;
        }
        self.update_skill_confidence_metrics().await;

        // SkillOrchestra RL routing head update (fire-and-forget).
        self.spawn_rl_head_update(outcome);

        // ARISE + STEM + ERL background tasks (fire-and-forget, never block response).
        self.spawn_stem_detection(outcome);
        if outcome == "success" {
            for name in &names {
                self.spawn_arise_trace_improvement(name);
                self.spawn_erl_reflection(name);
            }
        }
    }

    /// Flush all accumulated skill outcomes from a tool batch in a single pass.
    ///
    /// Replaces N sequential `record_skill_outcomes` calls (one per tool result) with one
    /// batched write + one rollback/trust check per skill. This eliminates the N×M×13
    /// sequential `SQLite` awaits that stalled the agent loop (#2770).
    ///
    /// # Dominant outcome for RL/ARISE/ERL signals
    ///
    /// Mixed batches (successes + failures) are collapsed to a single "dominant" outcome:
    /// any failure in the batch → `"tool_failure"`, otherwise `"success"`. This trades
    /// per-tool RL signal granularity for loop latency — acceptable because the RL head
    /// operates at turn granularity anyway.
    pub(crate) async fn flush_skill_outcomes(&mut self, outcomes: Vec<PendingSkillOutcome>) {
        if outcomes.is_empty() || self.skill_state.active_skill_names.is_empty() {
            return;
        }
        let Some(memory) = &self.memory_state.persistence.memory else {
            return;
        };

        // Batch-insert each outcome entry (one DB call per entry, but only once per tool —
        // not once per tool × skill as was the case before batching).
        for o in &outcomes {
            let batch_result = tokio::time::timeout(
                std::time::Duration::from_secs(5),
                memory.sqlite().record_skill_outcomes_batch(
                    &self.skill_state.active_skill_names,
                    self.memory_state.persistence.conversation_id,
                    &o.outcome,
                    o.error_context.as_deref(),
                    o.outcome_detail.as_deref(),
                ),
            )
            .await;
            match batch_result {
                Ok(Ok(())) => {}
                Ok(Err(e)) => tracing::warn!("failed to record skill outcomes: {e:#}"),
                Err(_) => {
                    tracing::warn!("record_skill_outcomes: timed out after 5s");
                    break;
                }
            }
        }

        let had_failure = outcomes.iter().any(|o| o.outcome != "success");

        // Run rollback + trust checks ONCE per skill (not once per tool result).
        if had_failure {
            for name in &self.skill_state.active_skill_names.clone() {
                self.check_rollback(name).await;
            }
        }
        let names: Vec<String> = self.skill_state.active_skill_names.clone();
        for name in &names {
            self.check_trust_transition(name).await;
        }

        // update_skill_confidence_metrics does one SQLite read + one watch::Sender::send_modify.
        // Wrap with a timeout to prevent stalling the loop if SQLite is slow.
        if let Err(_elapsed) = tokio::time::timeout(
            std::time::Duration::from_secs(2),
            self.update_skill_confidence_metrics(),
        )
        .await
        {
            tracing::warn!("update_skill_confidence_metrics timed out after 2s");
        }

        // Determine dominant outcome: any failure → "tool_failure", else "success".
        let dominant = if had_failure {
            "tool_failure"
        } else {
            "success"
        };

        self.spawn_rl_head_update(dominant);
        self.spawn_stem_detection(dominant);
        if !had_failure {
            for name in &names {
                self.spawn_arise_trace_improvement(name);
                self.spawn_erl_reflection(name);
            }
        }
    }

    /// Returns true and spawns `fut` when the learning task cap has not been reached.
    ///
    /// When at capacity, logs a debug message and returns false (no abort of existing tasks).
    pub(super) fn try_spawn_learning_task(
        &mut self,
        fut: impl std::future::Future<Output = ()> + Send + 'static,
    ) -> bool {
        if self.learning_engine.learning_tasks.len()
            >= crate::agent::learning_engine::MAX_LEARNING_TASKS
        {
            tracing::debug!(
                "learning_tasks at capacity ({}), skipping spawn",
                crate::agent::learning_engine::MAX_LEARNING_TASKS
            );
            return false;
        }
        self.learning_engine.learning_tasks.spawn(fut);
        true
    }

    pub(crate) async fn update_skill_confidence_metrics(&self) {
        let Some(memory) = &self.memory_state.persistence.memory else {
            return;
        };
        let Ok(stats) = memory.sqlite().load_skill_outcome_stats().await else {
            return;
        };
        let confidences: Vec<crate::metrics::SkillConfidence> = stats
            .iter()
            .map(|s| {
                let suc = u32::try_from(s.successes).unwrap_or(0);
                let fail = u32::try_from(s.failures).unwrap_or(0);
                crate::metrics::SkillConfidence {
                    name: s.skill_name.clone(),
                    posterior: zeph_skills::trust_score::posterior_mean(suc, fail),
                    total_uses: u32::try_from(s.total).unwrap_or(0),
                }
            })
            .collect();
        self.update_metrics(|m| m.skill_confidence = confidences);
    }
}