use std::sync::Arc;
use super::super::AppState;
const OBSERVER_ROLE: &str = "observer";
pub(crate) fn post_turn_ingest(
state: &AppState,
session_id: &str,
user_content: &str,
assistant_content: &str,
tool_results: &[(String, String)],
system_prompt_hash: &str,
) {
let db = state.db.clone();
let config = Arc::clone(&state.config);
let session = session_id.to_string();
let user = user_content.to_string();
let assistant = assistant_content.to_string();
let tools = tool_results.to_vec();
let llm = Arc::clone(&state.llm);
let sys_hash = system_prompt_hash.to_string();
tokio::spawn(async move {
roboticus_agent::memory::ingest_turn(&db, &session, &user, &assistant, &tools);
let ctx_cfg = &config.read().await.context;
if ctx_cfg.checkpoint_enabled
&& let Ok(msgs) = roboticus_db::sessions::list_messages(&db, &session, None)
{
let turn_count = msgs.len() as u32;
if turn_count > 0 && turn_count.is_multiple_of(ctx_cfg.checkpoint_interval_turns) {
let mem_summary = msgs
.iter()
.filter(|m| m.role == "system")
.map(|m| m.content.as_str())
.collect::<Vec<_>>()
.join("\n---\n");
let digest = msgs.last().map(|m| m.content.as_str());
if let Err(e) = roboticus_db::checkpoint::save_checkpoint(
&db,
&session,
&sys_hash,
&mem_summary[..mem_summary.len().min(2000)],
None,
digest,
turn_count as i64,
) {
tracing::warn!(error = %e, session_id = %session, "failed to save context checkpoint");
} else {
tracing::debug!(session_id = %session, turn_count, "saved context checkpoint");
}
}
}
let llm = llm.read().await;
let chunk_config = roboticus_agent::retrieval::ChunkConfig::default();
let chunks = roboticus_agent::retrieval::chunk_text(&assistant, &chunk_config);
for chunk in &chunks {
if let Ok(embedding) = llm.embedding.embed_single(&chunk.text).await {
let embed_id = uuid::Uuid::new_v4().to_string();
roboticus_db::embeddings::store_embedding(
&db,
&embed_id,
"turn",
&session,
&chunk.text[..chunk.text.len().min(200)],
&embedding,
)
.inspect_err(
|e| tracing::warn!(error = %e, chunk_idx = chunk.index, "failed to store chunk embedding"),
)
.ok();
}
}
if config.read().await.agent.delegation_enabled
&& let Ok(agents) = roboticus_db::agents::list_sub_agents(&db)
{
let observers: Vec<_> = agents
.iter()
.filter(|a| a.enabled && a.role == OBSERVER_ROLE)
.collect();
for observer in &observers {
let turn_summary = format!(
"Turn observation for session {}:\n\nUser: {}\n\nAssistant: {}",
session,
user.chars().take(500).collect::<String>(),
assistant.chars().take(1000).collect::<String>(),
);
roboticus_agent::memory::ingest_turn(
&db,
&session,
&format!("[observer:{}] Turn content received", observer.name),
&turn_summary,
&[],
);
let _ = roboticus_db::agents::touch_sub_agent(&db, &observer.name);
tracing::debug!(
observer = %observer.name,
session_id = %session,
"dispatched turn content to observer subagent"
);
}
}
});
}
pub(crate) async fn refine_session_nickname(
db: &roboticus_db::Database,
llm: &std::sync::Arc<tokio::sync::RwLock<roboticus_llm::LlmService>>,
session_id: &str,
oauth: &roboticus_llm::oauth::OAuthManager,
keystore: &roboticus_core::keystore::Keystore,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let messages = roboticus_db::sessions::list_messages(db, session_id, Some(8))?;
if messages.len() < 4 {
return Ok(());
}
let mut conversation = String::with_capacity(1024);
for m in &messages {
let prefix = if m.role == "user" {
"User"
} else {
"Assistant"
};
let snippet: String = m.content.chars().take(200).collect();
conversation.push_str(&format!("{prefix}: {snippet}\n"));
}
let prompt = format!(
"Summarize this conversation topic in 3-6 words as a short title. \
Only output the title, nothing else.\n\n{conversation}"
);
let llm_read = llm.read().await;
let model_id = llm_read.router.select_model().to_string();
let model_for_api = roboticus_core::model::model_name(&model_id).to_string();
let provider = llm_read.providers.get_by_model(&model_id);
let (url, api_key, auth_header, format, extra_headers) = match provider {
Some(p) => {
let key = super::super::super::admin::resolve_provider_key(
&p.name,
p.is_local,
&p.auth_mode,
p.api_key_ref.as_deref(),
&p.api_key_env,
oauth,
keystore,
)
.await
.unwrap_or_else(|| {
if !p.is_local {
tracing::warn!(provider = %p.name, "API key resolved to None for non-local provider");
}
String::new()
});
(
format!("{}{}", p.url, p.chat_path),
key,
p.auth_header.clone(),
p.format,
p.extra_headers.clone(),
)
}
None => return Ok(()),
};
let llm_client = llm_read.client.clone();
drop(llm_read);
let req = roboticus_llm::format::UnifiedRequest {
model: model_for_api,
messages: vec![roboticus_llm::format::UnifiedMessage {
role: "user".into(),
content: prompt,
parts: None,
}],
max_tokens: Some(30),
temperature: Some(0.3),
system: None,
quality_target: None,
tools: vec![],
};
let body = roboticus_llm::format::translate_request(&req, format)?;
let resp = llm_client
.forward_with_provider(&url, &api_key, body, &auth_header, &extra_headers)
.await?;
let unified = roboticus_llm::format::translate_response(&resp, format)?;
let nickname = unified.content.trim().trim_matches('"').to_string();
if !nickname.is_empty() && nickname.len() <= 60 {
roboticus_db::sessions::update_nickname(db, session_id, &nickname)?;
tracing::info!(
session = %session_id,
nickname = %nickname,
"Refined session nickname via LLM"
);
}
Ok(())
}