use adk_rust::agent::LlmAgentBuilder;
use adk_rust::futures::StreamExt;
use adk_rust::identity::{SessionId, UserId};
use adk_rust::runner::{Runner, RunnerConfig};
use adk_rust::session::{CreateRequest, Event, InMemorySessionService, SessionService};
use adk_rust::{Agent, Content, Part};
use std::collections::HashMap;
use std::sync::Arc;
use crate::agent::engine::{build_engine, CredentialResolver};
use super::memory_tools::{
tool_ingest_entity, tool_link_memory_to_entity, tool_list_memories, tool_memory_compare,
tool_memory_judge, tool_memory_search, tool_memory_session_summary, tool_merge_memories,
tool_recall_memory, tool_save_memory, tool_update_memory_importance, tool_update_memory_status,
};
use super::sessions::Turn;
use super::state::AgentState;
use super::tools::{
tool_describe_table, tool_explore_schema, tool_find_references_to, tool_graph_traverse,
tool_list_databases, tool_run_kql, tool_run_sql, tool_sample_rows, SharedToolCtx,
};
/// Application name passed as `app_name` when creating sessions and runners in this file.
pub const APP_NAME: &str = "kyma-agent";
/// Name of the main assistant agent; also used as the author of seeded assistant turns.
pub const AGENT_NAME: &str = "kyma-assistant";
/// User id under which sessions are created and one-shot runs are executed.
pub const ANON_USER: &str = "anonymous";
/// Model name used for the fallback id returned by `model_id` when the engine
/// configuration cannot be loaded.
pub const DEFAULT_MODEL: &str = "gemma4:latest";
/// Default Ollama endpoint URL.
// NOTE(review): not referenced in the visible part of this file — presumably consumed by
// the engine setup elsewhere; confirm before changing.
pub const DEFAULT_OLLAMA_HOST: &str = "http://localhost:11434";
/// Base system instruction for the main assistant agent.
///
/// `compose_system_prompt` returns this text as-is, or with an "ADDITIONAL SKILLS"
/// section appended when the user has enabled skills. The text is sent to the model
/// verbatim, so every character (including blank-free line layout) is behavior.
// NOTE(review): the prompt states a limit of 12 tool calls per question; nothing in the
// visible code enforces it (`run_config` is `None` in `make_runner`) — confirm it is
// enforced elsewhere or treat it as advisory.
const SYSTEM_PROMPT: &str = r#"You are kyma's data assistant. Users ask questions in English; you answer them by using the tools.
KQL IS THE PRIMARY QUERY LANGUAGE — prefer `run_kql` over `run_sql`.
KQL uses pipe syntax. Examples:
- Counting: `requests | where status >= 500 | summarize n=count() by url | top 10 by n`
- Time-bucket: `requests | where ts > ago(1h) | summarize n=count() by bin(ts, 1m) | sort by ts asc`
- Text search: `requests | where url contains "/api/" | take 20`
- Distinct values: `requests | distinct service`
- Graph: `edges | graph-traverse source "a" from src to dst max-hops 3`
- Projection: `requests | project ts, url, status | take 100`
Use `run_sql` only for: vector similarity search (cosine_distance(col, make_array(..)) UDF is SQL-only today), recursive CTEs (`WITH RECURSIVE`), window functions, or joins across many tables. `run_sql` is the escape hatch, not the default.
Cross-entity questions — USE THE GRAPH TOOLS:
- Start with `explore_schema(database)` for a one-shot view of every table + columns + sample values. Much cheaper than many `describe_table` calls. USE THIS FIRST when the question touches multiple tables or you don't yet know how entities relate.
- `find_references_to(value)` — given a value like "user-42", returns every (database, table, column) where it appears. The "where else does this show up?" primitive.
- `graph_traverse(database, edges_table, source, from_column, to_column, max_hops)` — wraps KQL's graph-traverse for tables that store edges as rows. Use for connectivity: "what services depend on X?".
Efficient workflow:
1. Open with `explore_schema` (full database view) OR `list_databases` + `describe_table` on specific targets. Batch independent lookups in the SAME turn — emit multiple tool calls together and the engine dispatches them in parallel.
2. For relationship questions, prefer `find_references_to` / `graph_traverse` over hand-written joins.
3. Once you know the schema, write a KQL pipeline and call `run_kql`. For vector similarity, fall back to `run_sql` with `cosine_distance`.
4. If a column's shape is unclear, call `sample_rows` for a few example records.
5. Produce a concise final answer in plain English. Cite the KQL (or SQL) you ran.
MEMORY — you have a persistent memory across sessions:
- `memory_search(query)` — the PRIMARY recall tool. Graph-aware hybrid search (semantic + keyword) expanded over connected memories, catalog resources, and traces. Call this early when a question may depend on prior context, preferences, or how entities relate. Returns ranked memories + a `linked` list of connected resources + a ready-to-use context block. Follow `linked` node ids with `graph_traverse` for a deeper subgraph. (`recall_memory` is an alias.)
- `save_memory(content, memory_type, …)` — store durable facts/decisions/preferences/learnings the user shares. Link them to entities with `link_memory_to_entity` when they're about a specific repo/service/table.
- Don't save trivia, transient state, or things already in the data. Prefer recalling before answering over guessing.
Rules:
- Do NOT fabricate schema. Always verify via a tool before writing a query.
- Do NOT claim data you didn't fetch via a tool call.
- Prefer ONE multi-tool turn over several sequential single-tool turns when calls are independent.
- You have at most 12 tool calls per question.
"#;
/// Assembles the system instruction for the main assistant agent.
///
/// Starts from [`SYSTEM_PROMPT`]. When the skills store lists enabled skill names and at
/// least one of them is also returned by `crate::agent::skills::discover_all()`, an
/// "ADDITIONAL SKILLS" section is appended with one entry per matching skill, ordered by
/// name. In every other case (lookup error, nothing enabled, nothing matched) the bare
/// [`SYSTEM_PROMPT`] is returned.
async fn compose_system_prompt(state: &AgentState) -> String {
    // A failed lookup is deliberately treated the same as "no skills enabled".
    let enabled = match state.skills.get().await {
        Ok(names) if !names.is_empty() => names,
        _ => return SYSTEM_PROMPT.to_string(),
    };

    let wanted: std::collections::HashSet<&str> = enabled.iter().map(String::as_str).collect();
    let mut selected: Vec<_> = crate::agent::skills::discover_all()
        .into_iter()
        .filter(|skill| wanted.contains(skill.name.as_str()))
        .collect();
    if selected.is_empty() {
        return SYSTEM_PROMPT.to_string();
    }
    // Sort by name so the generated prompt does not depend on discovery order.
    selected.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));

    let mut prompt = SYSTEM_PROMPT.to_string();
    prompt += "\n\n---\nADDITIONAL SKILLS — the user has enabled these. Use them when they apply.\n";
    for skill in &selected {
        prompt += &format!("\n## Skill: {}\n", skill.name);
        // The "when to use" line is omitted entirely for skills without a description.
        if !skill.description.is_empty() {
            prompt += &format!("**When to use:** {}\n\n", skill.description);
        }
        prompt += skill.body.trim();
        prompt.push('\n');
    }
    prompt
}
/// Builds the main, tool-equipped assistant agent from the current engine configuration.
///
/// The agent is named [`AGENT_NAME`], instructed with the output of
/// `compose_system_prompt`, and registered with the data tools and memory tools imported
/// by this module. All tools share one [`SharedToolCtx`] cloned from `state`.
///
/// # Errors
/// Fails if the engine configuration cannot be loaded, the credential cannot be
/// resolved, the engine cannot be built, or the agent builder reports an error.
pub async fn build_agent(state: &AgentState) -> anyhow::Result<Arc<dyn Agent>> {
    let engine_cfg = state.engines.get().await?;
    let api_key = CredentialResolver::new(state.credentials.clone(), state.tenant)
        .resolve(&engine_cfg)
        .await?;
    let model = build_engine(&engine_cfg, api_key)?;

    let tool_ctx = SharedToolCtx {
        catalog: state.catalog.clone(),
        format: state.format.clone(),
        pool: state.pool.clone(),
    };
    let instruction = compose_system_prompt(state).await;

    let built = LlmAgentBuilder::new(AGENT_NAME)
        .description(
            "Kyma inline data assistant — answers English questions about the user's data.",
        )
        .instruction(instruction)
        .model(model)
        // Data exploration and query tools.
        .tool(tool_list_databases(tool_ctx.clone()))
        .tool(tool_explore_schema(tool_ctx.clone()))
        .tool(tool_describe_table(tool_ctx.clone()))
        .tool(tool_run_kql(tool_ctx.clone()))
        .tool(tool_run_sql(tool_ctx.clone()))
        .tool(tool_sample_rows(tool_ctx.clone()))
        .tool(tool_find_references_to(tool_ctx.clone()))
        .tool(tool_graph_traverse(tool_ctx.clone()))
        // Memory tools.
        .tool(tool_save_memory(tool_ctx.clone()))
        .tool(tool_memory_search(tool_ctx.clone()))
        .tool(tool_recall_memory(tool_ctx.clone()))
        .tool(tool_list_memories(tool_ctx.clone()))
        .tool(tool_link_memory_to_entity(tool_ctx.clone()))
        .tool(tool_ingest_entity(tool_ctx.clone()))
        .tool(tool_update_memory_status(tool_ctx.clone()))
        .tool(tool_update_memory_importance(tool_ctx.clone()))
        .tool(tool_merge_memories(tool_ctx.clone()))
        .tool(tool_memory_compare(tool_ctx.clone()))
        .tool(tool_memory_judge(tool_ctx.clone()))
        // The last registration takes ownership of the context instead of cloning it.
        .tool(tool_memory_session_summary(tool_ctx))
        .build()
        .map_err(|e| anyhow::anyhow!("agent build failed: {e:?}"))?;

    let agent: Arc<dyn Agent> = Arc::new(built);
    Ok(agent)
}
/// Creates a [`Runner`] for the main assistant, backed by a fresh in-memory session.
///
/// A session with id `session_id` is created for [`ANON_USER`] under [`APP_NAME`], then
/// pre-populated from `summary` and `history` via `seed_history` before the runner is
/// constructed.
///
/// # Errors
/// Fails if the agent cannot be built, the session cannot be created, or `Runner::new`
/// rejects the configuration.
pub async fn make_runner(
    state: &AgentState,
    session_id: &str,
    history: &[Turn],
    summary: Option<&str>,
) -> anyhow::Result<Runner> {
    let agent = build_agent(state).await?;
    let session_service: Arc<dyn SessionService> = Arc::new(InMemorySessionService::new());

    let request = CreateRequest {
        app_name: APP_NAME.to_string(),
        user_id: ANON_USER.to_string(),
        session_id: Some(session_id.to_string()),
        state: HashMap::new(),
    };
    session_service
        .create(request)
        .await
        .map_err(|e| anyhow::anyhow!("session create failed: {e:?}"))?;

    // Replay prior context into the new session before handing it to the runner.
    seed_history(&session_service, session_id, history, summary).await;

    let config = RunnerConfig {
        app_name: APP_NAME.to_string(),
        agent,
        session_service,
        artifact_service: None,
        memory_service: None,
        plugin_manager: None,
        run_config: None,
        compaction_config: None,
        context_cache_config: None,
        cache_capable: None,
        request_context: None,
        cancellation_token: None,
    };
    Runner::new(config).map_err(|e| anyhow::anyhow!("runner build failed: {e:?}"))
}
/// Pre-populates a session with earlier conversation context.
///
/// If `summary` is non-blank after trimming, it is appended first as a single user event
/// with id `seed-summary`. Each turn in `history` whose text is non-blank is then
/// appended as event `seed-<index>`, where the index is the turn's position in `history`
/// (blank turns are skipped but still consume an index). Turns with role `"assistant"`
/// are authored by [`AGENT_NAME`] with content role `"model"`; all others are authored
/// by `"user"`.
///
/// Seeding is best-effort: errors returned by `append_event` are ignored.
async fn seed_history(
    sessions: &Arc<dyn SessionService>,
    session_id: &str,
    history: &[Turn],
    summary: Option<&str>,
) {
    let trimmed_summary = summary.map(str::trim).filter(|text| !text.is_empty());
    if let Some(s) = trimmed_summary {
        let mut event = Event::new("seed-summary");
        event.author = "user".to_string();
        let body = format!("Summary of earlier conversation (for context):\n{s}");
        event.set_content(Content::new("user").with_text(body));
        let _ = sessions.append_event(session_id, event).await;
    }

    // Enumerate before filtering so event ids keep the original turn positions.
    let replayable = history
        .iter()
        .enumerate()
        .filter(|(_, turn)| !turn.text.trim().is_empty());
    for (i, turn) in replayable {
        let (author, role) = if turn.role == "assistant" {
            (AGENT_NAME, "model")
        } else {
            ("user", "user")
        };
        let mut event = Event::new(format!("seed-{i}"));
        event.author = author.to_string();
        event.set_content(Content::new(role).with_text(turn.text.clone()));
        let _ = sessions.append_event(session_id, event).await;
    }
}
/// Runs a single prompt through a throwaway, tool-less agent and returns its text reply.
///
/// A new agent (`agent_name`, `description`, `instruction`) is built on the currently
/// configured engine, a fresh in-memory session with a random UUID is created for
/// [`ANON_USER`], and `input` is sent as one user message. Text parts of every
/// non-partial event are concatenated; the result is returned trimmed.
///
/// # Errors
/// Fails if the engine configuration, credential, engine, agent, session or runner
/// cannot be set up, if the ids are rejected, if the run cannot be started, or if the
/// event stream yields an error.
pub async fn run_oneshot(
    state: &AgentState,
    agent_name: &str,
    description: &str,
    instruction: &str,
    input: &str,
) -> anyhow::Result<String> {
    let engine_cfg = state.engines.get().await?;
    let api_key = CredentialResolver::new(state.credentials.clone(), state.tenant)
        .resolve(&engine_cfg)
        .await?;
    let model = build_engine(&engine_cfg, api_key)?;

    let agent: Arc<dyn Agent> = Arc::new(
        LlmAgentBuilder::new(agent_name)
            .description(description)
            .instruction(instruction)
            .model(model)
            .build()
            .map_err(|e| anyhow::anyhow!("oneshot agent build failed: {e:?}"))?,
    );

    let session_service: Arc<dyn SessionService> = Arc::new(InMemorySessionService::new());
    let raw_session_id = uuid::Uuid::new_v4().to_string();
    let request = CreateRequest {
        app_name: APP_NAME.to_string(),
        user_id: ANON_USER.to_string(),
        session_id: Some(raw_session_id.clone()),
        state: HashMap::new(),
    };
    session_service
        .create(request)
        .await
        .map_err(|e| anyhow::anyhow!("oneshot session create failed: {e:?}"))?;

    let config = RunnerConfig {
        app_name: APP_NAME.to_string(),
        agent,
        session_service,
        artifact_service: None,
        memory_service: None,
        plugin_manager: None,
        run_config: None,
        compaction_config: None,
        context_cache_config: None,
        cache_capable: None,
        request_context: None,
        cancellation_token: None,
    };
    let runner = Runner::new(config)
        .map_err(|e| anyhow::anyhow!("oneshot runner build failed: {e:?}"))?;

    let user_id = UserId::new(ANON_USER).map_err(|e| anyhow::anyhow!("user_id: {e}"))?;
    let session_id =
        SessionId::new(&raw_session_id).map_err(|e| anyhow::anyhow!("session_id: {e}"))?;
    let message = Content::new("user").with_text(input.to_string());

    let mut events = runner
        .run(user_id, session_id, message)
        .await
        .map_err(|e| anyhow::anyhow!("oneshot run: {e:?}"))?;

    let mut answer = String::new();
    while let Some(next) = events.next().await {
        let event = next.map_err(|e| anyhow::anyhow!("oneshot event: {e:?}"))?;
        // Partial events are skipped; only non-partial events contribute text.
        if !event.llm_response.partial {
            let parts = event
                .llm_response
                .content
                .iter()
                .flat_map(|content| content.parts.iter());
            for part in parts {
                if let Part::Text { text } = part {
                    answer.push_str(text);
                }
            }
        }
    }
    Ok(answer.trim().to_owned())
}
/// Compresses a conversation transcript into a short summary.
///
/// Delegates to `run_oneshot` with a dedicated summarizer agent, passing `transcript`
/// as the user input, and returns whatever text that run produces.
///
/// # Errors
/// Propagates any error returned by `run_oneshot`.
pub async fn summarize_conversation(state: &AgentState, transcript: &str) -> anyhow::Result<String> {
    const SUMMARIZER_NAME: &str = "kyma-summarizer";
    const SUMMARIZER_DESCRIPTION: &str = "Summarizes prior conversation for context compression.";
    const SUMMARIZER_INSTRUCTION: &str =
        "You compress conversations. Given a transcript, return a concise summary (3-6 \
         sentences) that preserves durable facts, decisions, and the user's intent. \
         Return only the summary text — no preamble, no headings.";

    run_oneshot(
        state,
        SUMMARIZER_NAME,
        SUMMARIZER_DESCRIPTION,
        SUMMARIZER_INSTRUCTION,
        transcript,
    )
    .await
}
/// Returns a display id for the configured model, formatted as `<engine kind>/<model>`.
///
/// When the engine configuration cannot be loaded, falls back to
/// `ollama/` followed by [`DEFAULT_MODEL`].
pub async fn model_id(state: &AgentState) -> String {
    state.engines.get().await.map_or_else(
        |_| format!("ollama/{}", DEFAULT_MODEL),
        |cfg| format!("{}/{}", cfg.kind.as_str(), cfg.model),
    )
}