roboticus-api 0.11.3

HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime
Documentation
//! Post-turn background work: memory ingestion, embedding generation,
//! context checkpointing, observer subagent dispatch, and session nickname refinement.

use std::sync::Arc;

use super::super::AppState;

/// Role string that marks a subagent as a post-turn observer.
///
/// When delegation is enabled, every enabled subagent whose `role` equals this
/// value has a truncated copy of each completed turn recorded on its behalf.
const OBSERVER_ROLE: &str = "observer";

/// Spawn background memory ingestion + embedding generation for a completed turn.
pub(crate) fn post_turn_ingest(
    state: &AppState,
    session_id: &str,
    user_content: &str,
    assistant_content: &str,
    tool_results: &[(String, String)],
    system_prompt_hash: &str,
) {
    let db = state.db.clone();
    let config = Arc::clone(&state.config);
    let session = session_id.to_string();
    let user = user_content.to_string();
    let assistant = assistant_content.to_string();
    let tools = tool_results.to_vec();
    let llm = Arc::clone(&state.llm);
    let sys_hash = system_prompt_hash.to_string();
    tokio::spawn(async move {
        roboticus_agent::memory::ingest_turn(&db, &session, &user, &assistant, &tools);

        // Periodic context checkpoint
        let ctx_cfg = &config.read().await.context;
        if ctx_cfg.checkpoint_enabled
            && let Ok(msgs) = roboticus_db::sessions::list_messages(&db, &session, None)
        {
            let turn_count = msgs.len() as u32;
            if turn_count > 0 && turn_count.is_multiple_of(ctx_cfg.checkpoint_interval_turns) {
                let mem_summary = msgs
                    .iter()
                    .filter(|m| m.role == "system")
                    .map(|m| m.content.as_str())
                    .collect::<Vec<_>>()
                    .join("\n---\n");
                let digest = msgs.last().map(|m| m.content.as_str());
                if let Err(e) = roboticus_db::checkpoint::save_checkpoint(
                    &db,
                    &session,
                    &sys_hash,
                    &mem_summary[..mem_summary.len().min(2000)],
                    None,
                    digest,
                    turn_count as i64,
                ) {
                    tracing::warn!(error = %e, session_id = %session, "failed to save context checkpoint");
                } else {
                    tracing::debug!(session_id = %session, turn_count, "saved context checkpoint");
                }
            }
        }

        let llm = llm.read().await;
        let chunk_config = roboticus_agent::retrieval::ChunkConfig::default();
        let chunks = roboticus_agent::retrieval::chunk_text(&assistant, &chunk_config);

        for chunk in &chunks {
            if let Ok(embedding) = llm.embedding.embed_single(&chunk.text).await {
                let embed_id = uuid::Uuid::new_v4().to_string();
                roboticus_db::embeddings::store_embedding(
                    &db,
                    &embed_id,
                    "turn",
                    &session,
                    &chunk.text[..chunk.text.len().min(200)],
                    &embedding,
                )
                .inspect_err(
                    |e| tracing::warn!(error = %e, chunk_idx = chunk.index, "failed to store chunk embedding"),
                )
                .ok();
            }
        }

        // ── Observer subagent dispatch ──────────────────────────────────
        // Any subagent with role "observer" receives a summary of the turn
        // content for background processing (journaling, analysis, etc.).
        // The observer decides what to do with the content based on its skills.
        if config.read().await.agent.delegation_enabled
            && let Ok(agents) = roboticus_db::agents::list_sub_agents(&db)
        {
            let observers: Vec<_> = agents
                .iter()
                .filter(|a| a.enabled && a.role == OBSERVER_ROLE)
                .collect();

            for observer in &observers {
                let turn_summary = format!(
                    "Turn observation for session {}:\n\nUser: {}\n\nAssistant: {}",
                    session,
                    user.chars().take(500).collect::<String>(),
                    assistant.chars().take(1000).collect::<String>(),
                );

                // Store the observation as an episodic memory attributed to the observer.
                // The observer role tag allows retrieval filtering by observer identity.
                roboticus_agent::memory::ingest_turn(
                    &db,
                    &session,
                    &format!("[observer:{}] Turn content received", observer.name),
                    &turn_summary,
                    &[],
                );

                // Update last_used_at so workspace shows the observer as recently active
                let _ = roboticus_db::agents::touch_sub_agent(&db, &observer.name);

                tracing::debug!(
                    observer = %observer.name,
                    session_id = %session,
                    "dispatched turn content to observer subagent"
                );
            }
        }
    });
}

/// Ask the LLM for a short (3-6 word) title for a session and store it as the
/// session's nickname.
///
/// Reads messages via `list_messages(db, session_id, Some(8))`. Sessions with
/// fewer than 4 returned messages are left untouched. Each message is cut to its
/// first 200 characters. It is labelled `User` when its role is `"user"` and
/// `Assistant` otherwise.
///
/// The request goes to the model chosen by the LLM router, using that model's
/// provider settings. Returns `Ok(())` without doing anything when no provider
/// is registered for the model. The nickname is updated only when the reply,
/// after trimming whitespace and surrounding `"`, is non-empty and at most
/// 60 bytes.
///
/// # Errors
/// Propagates failures from reading messages, request/response translation,
/// the provider HTTP call, and the nickname update.
pub(crate) async fn refine_session_nickname(
    db: &roboticus_db::Database,
    llm: &std::sync::Arc<tokio::sync::RwLock<roboticus_llm::LlmService>>,
    session_id: &str,
    oauth: &roboticus_llm::oauth::OAuthManager,
    keystore: &roboticus_core::keystore::Keystore,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let messages = roboticus_db::sessions::list_messages(db, session_id, Some(8))?;
    if messages.len() < 4 {
        return Ok(());
    }

    // One "Speaker: snippet" line per message.
    let conversation: String = messages
        .iter()
        .map(|msg| {
            let speaker = if msg.role == "user" { "User" } else { "Assistant" };
            let snippet: String = msg.content.chars().take(200).collect();
            format!("{speaker}: {snippet}\n")
        })
        .collect();

    let prompt = format!(
        "Summarize this conversation topic in 3-6 words as a short title. \
         Only output the title, nothing else.\n\n{conversation}"
    );

    // Copy everything the request needs out of the LLM service, then release
    // the read guard before the provider HTTP call.
    // NOTE(review): the guard is still held while `resolve_provider_key` is
    // awaited. If key resolution can perform network I/O (e.g. an OAuth
    // refresh), writers to the LLM service wait on it. Confirm.
    let guard = llm.read().await;
    let model_id = guard.router.select_model().to_string();
    let model_for_api = roboticus_core::model::model_name(&model_id).to_string();

    let Some(provider) = guard.providers.get_by_model(&model_id) else {
        return Ok(());
    };
    let api_key = super::super::super::admin::resolve_provider_key(
        &provider.name,
        provider.is_local,
        &provider.auth_mode,
        provider.api_key_ref.as_deref(),
        &provider.api_key_env,
        oauth,
        keystore,
    )
    .await
    .unwrap_or_else(|| {
        if !provider.is_local {
            tracing::warn!(provider = %provider.name, "API key resolved to None for non-local provider");
        }
        String::new()
    });
    let endpoint = format!("{}{}", provider.url, provider.chat_path);
    let auth_header = provider.auth_header.clone();
    let wire_format = provider.format;
    let extra_headers = provider.extra_headers.clone();
    let http = guard.client.clone();
    drop(guard);

    let request = roboticus_llm::format::UnifiedRequest {
        model: model_for_api,
        messages: vec![roboticus_llm::format::UnifiedMessage {
            role: "user".into(),
            content: prompt,
            parts: None,
        }],
        max_tokens: Some(30),
        temperature: Some(0.3),
        system: None,
        quality_target: None,
        tools: vec![],
    };

    let payload = roboticus_llm::format::translate_request(&request, wire_format)?;
    let raw_reply = http
        .forward_with_provider(&endpoint, &api_key, payload, &auth_header, &extra_headers)
        .await?;
    let reply = roboticus_llm::format::translate_response(&raw_reply, wire_format)?;
    let nickname = reply.content.trim().trim_matches('"').to_string();

    // Reject empty or overly long titles and keep the existing nickname.
    if nickname.is_empty() || nickname.len() > 60 {
        return Ok(());
    }
    roboticus_db::sessions::update_nickname(db, session_id, &nickname)?;
    tracing::info!(
        session = %session_id,
        nickname = %nickname,
        "Refined session nickname via LLM"
    );
    Ok(())
}