a3s-code-core 2.6.0

A3S Code Core - Embeddable AI agent library with tool execution
Documentation
//! Persisted session runtime contract.
//!
//! This module owns how an in-memory session becomes `SessionData` and how a
//! saved `SessionData` rehydrates runtime options and non-message artifacts.

use super::{AgentSession, SessionOptions};
use crate::agent::{AgentConfig, AgentResult};
use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
use crate::llm::Message;
use crate::store::{LlmConfigData, SessionData, SessionStore};
use crate::tools::ToolExecutor;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

#[derive(Clone)]
pub(super) struct SessionPersistenceContext {
    session_store: Option<Arc<dyn SessionStore>>,
    session_id: String,
    workspace: PathBuf,
    config: AgentConfig,
    model_name: String,
    tool_executor: Arc<ToolExecutor>,
    trace_sink: crate::trace::InMemoryTraceSink,
    run_store: Arc<crate::run::InMemoryRunStore>,
    history: Arc<RwLock<Vec<Message>>>,
    verification_reports: Arc<RwLock<Vec<crate::verification::VerificationReport>>>,
    auto_save: bool,
}

impl SessionPersistenceContext {
    pub(super) fn from_session(session: &AgentSession) -> Self {
        Self {
            session_store: session.session_store.clone(),
            session_id: session.session_id.clone(),
            workspace: session.workspace.clone(),
            config: session.config.clone(),
            model_name: session.model_name.clone(),
            tool_executor: Arc::clone(&session.tool_executor),
            trace_sink: session.trace_sink.clone(),
            run_store: Arc::clone(&session.run_store),
            history: Arc::clone(&session.history),
            verification_reports: Arc::clone(&session.verification_reports),
            auto_save: session.auto_save,
        }
    }

    pub(super) fn record_result(&self, result: &AgentResult) {
        *write_or_recover(&self.history) = result.messages.clone();
        if !result.verification_reports.is_empty() {
            write_or_recover(&self.verification_reports)
                .extend(result.verification_reports.clone());
        }
    }

    pub(super) async fn save(&self) -> Result<()> {
        let store = match &self.session_store {
            Some(store) => store,
            None => return Ok(()),
        };

        let history = read_or_recover(&self.history).clone();
        let verification_reports = read_or_recover(&self.verification_reports).clone();
        let data = build_session_data_snapshot(SessionDataSnapshotInput {
            session_id: &self.session_id,
            workspace: &self.workspace,
            config: &self.config,
            model_name: &self.model_name,
            history,
        })
        .await;

        store.save(&data).await?;
        store
            .save_artifacts(&self.session_id, &self.tool_executor.artifact_store())
            .await?;
        store
            .save_trace_events(&self.session_id, &self.trace_sink.events())
            .await?;
        store
            .save_run_records(&self.session_id, &self.run_store.records().await)
            .await?;
        store
            .save_verification_reports(&self.session_id, &verification_reports)
            .await?;
        tracing::debug!("Session {} saved", self.session_id);
        Ok(())
    }

    pub(super) async fn auto_save_if_enabled(&self) {
        if self.auto_save {
            if let Err(e) = self.save().await {
                tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
            }
        }
    }
}

pub(super) fn load_session_data(
    store: &Arc<dyn SessionStore>,
    session_id: &str,
) -> Result<SessionData> {
    let data = match tokio::runtime::Handle::try_current() {
        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
            .map_err(|e| {
                CodeError::Session(format!("Failed to load session {}: {}", session_id, e))
            })?,
        Err(_) => {
            return Err(CodeError::Session(
                "No async runtime available for session resume".to_string(),
            ))
        }
    };

    data.ok_or_else(|| CodeError::Session(format!("Session not found: {}", session_id)))
}

pub(super) fn apply_persisted_runtime_options(
    mut opts: SessionOptions,
    data: &SessionData,
) -> SessionOptions {
    opts.session_id = Some(data.id.clone());

    if opts.model.is_none() {
        opts.model = persisted_model_ref(data);
    }
    if opts.queue_config.is_none() {
        opts.queue_config = data.config.queue_config.clone();
    }
    if opts.confirmation_manager.is_none() && opts.confirmation_policy.is_none() {
        opts.confirmation_policy = data.config.confirmation_policy.clone();
    }
    if opts.permission_checker.is_none() && opts.permission_policy.is_none() {
        if let Some(policy) = data.config.permission_policy.clone() {
            opts = opts.with_permission_policy(policy);
        }
    }

    opts
}

pub(super) fn restore_persisted_session_state(
    session: &AgentSession,
    store: &Arc<dyn SessionStore>,
    data: SessionData,
) -> Result<()> {
    let session_id = data.id.clone();
    *write_or_recover(&session.history) = data.messages;

    if let Some(artifacts) = load_artifacts(store, &session_id)? {
        let target_store = session.tool_executor.artifact_store();
        for artifact in artifacts.artifacts() {
            target_store.put(artifact);
        }
    }

    if let Some(events) = load_trace_events(store, &session_id)? {
        session.trace_sink.replace_events(events);
    }

    if let Some(records) = load_run_records(store, &session_id)? {
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            tokio::task::block_in_place(|| {
                handle.block_on(session.run_store.replace_records(records))
            });
        }
    }

    if let Some(reports) = load_verification_reports(store, &session_id)? {
        *write_or_recover(&session.verification_reports) = reports;
    }

    Ok(())
}

struct SessionDataSnapshotInput<'a> {
    session_id: &'a str,
    workspace: &'a Path,
    config: &'a AgentConfig,
    model_name: &'a str,
    history: Vec<Message>,
}

async fn build_session_data_snapshot(input: SessionDataSnapshotInput<'_>) -> SessionData {
    let confirmation_policy = match &input.config.confirmation_manager {
        Some(manager) => Some(manager.policy().await),
        None => input.config.confirmation_policy.clone(),
    };
    let model_name = persisted_model_name(input.model_name);
    let now = chrono::Utc::now().timestamp();

    SessionData {
        id: input.session_id.to_string(),
        config: crate::store::SessionConfig {
            name: String::new(),
            workspace: input.workspace.display().to_string(),
            system_prompt: Some(input.config.prompt_slots.build()),
            max_context_length: input.config.max_context_tokens.min(u32::MAX as usize) as u32,
            auto_compact: input.config.auto_compact,
            auto_compact_threshold: input.config.auto_compact_threshold,
            storage_type: crate::config::StorageBackend::File,
            queue_config: input.config.queue_config.clone(),
            confirmation_policy,
            permission_policy: input.config.permission_policy.clone(),
            parent_id: None,
            security_config: None,
            hook_engine: None,
            planning_mode: input.config.planning_mode,
            goal_tracking: input.config.goal_tracking,
        },
        state: crate::store::SessionState::Active,
        messages: input.history,
        context_usage: crate::store::ContextUsage::default(),
        total_usage: crate::llm::TokenUsage::default(),
        total_cost: 0.0,
        model_name,
        cost_records: Vec::new(),
        tool_names: SessionData::tool_names_from_definitions(&input.config.tools),
        thinking_enabled: false,
        thinking_budget: None,
        created_at: now,
        updated_at: now,
        llm_config: model_config_data(input.model_name),
        tasks: Vec::new(),
        parent_id: None,
    }
}

fn persisted_model_ref(data: &SessionData) -> Option<String> {
    if let Some(llm_config) = &data.llm_config {
        return Some(format!("{}/{}", llm_config.provider, llm_config.model));
    }
    data.model_name
        .as_ref()
        .filter(|model_name| model_name.contains('/'))
        .cloned()
}

fn persisted_model_name(model_name: &str) -> Option<String> {
    if model_name.is_empty() || model_name == "unknown" {
        None
    } else {
        Some(model_name.to_string())
    }
}

fn model_config_data(model_name: &str) -> Option<LlmConfigData> {
    let (provider, model) = model_name.split_once('/')?;
    Some(LlmConfigData {
        provider: provider.to_string(),
        model: model.to_string(),
        api_key: None,
        base_url: None,
    })
}

fn load_artifacts(
    store: &Arc<dyn SessionStore>,
    session_id: &str,
) -> Result<Option<crate::tools::ArtifactStore>> {
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => {
            tokio::task::block_in_place(|| handle.block_on(store.load_artifacts(session_id)))
                .map_err(|e| {
                    CodeError::Session(format!(
                        "Failed to load artifacts for session {}: {}",
                        session_id, e
                    ))
                })
        }
        Err(_) => Ok(None),
    }
}

fn load_trace_events(
    store: &Arc<dyn SessionStore>,
    session_id: &str,
) -> Result<Option<Vec<crate::trace::TraceEvent>>> {
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => {
            tokio::task::block_in_place(|| handle.block_on(store.load_trace_events(session_id)))
                .map_err(|e| {
                    CodeError::Session(format!(
                        "Failed to load trace events for session {}: {}",
                        session_id, e
                    ))
                })
        }
        Err(_) => Ok(None),
    }
}

fn load_run_records(
    store: &Arc<dyn SessionStore>,
    session_id: &str,
) -> Result<Option<Vec<crate::run::RunRecord>>> {
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => {
            tokio::task::block_in_place(|| handle.block_on(store.load_run_records(session_id)))
                .map_err(|e| {
                    CodeError::Session(format!(
                        "Failed to load run records for session {}: {}",
                        session_id, e
                    ))
                })
        }
        Err(_) => Ok(None),
    }
}

fn load_verification_reports(
    store: &Arc<dyn SessionStore>,
    session_id: &str,
) -> Result<Option<Vec<crate::verification::VerificationReport>>> {
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => tokio::task::block_in_place(|| {
            handle.block_on(store.load_verification_reports(session_id))
        })
        .map_err(|e| {
            CodeError::Session(format!(
                "Failed to load verification reports for session {}: {}",
                session_id, e
            ))
        }),
        Err(_) => Ok(None),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::{ContextUsage, SessionConfig, SessionState};

    fn persisted_data(model_name: Option<&str>, llm: Option<(&str, &str)>) -> SessionData {
        SessionData {
            id: "session-1".to_string(),
            config: SessionConfig::default(),
            state: SessionState::Active,
            messages: Vec::new(),
            context_usage: ContextUsage::default(),
            total_usage: crate::llm::TokenUsage::default(),
            total_cost: 0.0,
            model_name: model_name.map(ToOwned::to_owned),
            cost_records: Vec::new(),
            tool_names: Vec::new(),
            thinking_enabled: false,
            thinking_budget: None,
            created_at: 0,
            updated_at: 0,
            llm_config: llm.map(|(provider, model)| LlmConfigData {
                provider: provider.to_string(),
                model: model.to_string(),
                api_key: None,
                base_url: None,
            }),
            tasks: Vec::new(),
            parent_id: None,
        }
    }

    #[test]
    fn persisted_runtime_options_prefer_llm_config() {
        let data = persisted_data(Some("anthropic/old"), Some(("openai", "gpt-4o")));
        let opts = apply_persisted_runtime_options(SessionOptions::new(), &data);
        assert_eq!(opts.session_id.as_deref(), Some("session-1"));
        assert_eq!(opts.model.as_deref(), Some("openai/gpt-4o"));
    }

    #[test]
    fn persisted_runtime_options_fall_back_to_model_name() {
        let data = persisted_data(Some("openai/gpt-4o"), None);
        let opts = apply_persisted_runtime_options(SessionOptions::new(), &data);
        assert_eq!(opts.model.as_deref(), Some("openai/gpt-4o"));
    }

    #[test]
    fn model_config_never_persists_secret_material() {
        let data = model_config_data("openai/gpt-4o").expect("model config");
        assert_eq!(data.provider, "openai");
        assert_eq!(data.model, "gpt-4o");
        assert!(data.api_key.is_none());
        assert!(data.base_url.is_none());
    }
}