use std::sync::{Arc, Mutex};
use anyhow::Result;
use smooth_operator_core::llm_provider::LlmProvider;
use smooth_operator_core::{
Agent, AgentConfig, AgentEvent, FnNode, LlmConfig, Message as EngineMessage, Role,
ToolRegistry, Workflow, WorkflowBuilder,
};
use smooth_operator_core::KnowledgeResult;
use crate::access_control::{AccessContext, AclKnowledgeStore};
use crate::adapter::{MessageQuery, StorageAdapter};
use crate::curation::{CuratedKnowledgeStore, RetrievalFilter};
use crate::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
use crate::telemetry::{
GEN_AI_CONVERSATION_ID, GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM, GEN_AI_TOOL_NAME,
GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS, SPAN_CHAT, SPAN_TOOL, SYSTEM_NAME,
};
use crate::tools::{KnowledgeResultSink, KnowledgeSearchTool};
use tracing::Instrument;
/// State threaded through the [`AgentRuntime`] workflow for a single turn.
#[derive(Debug, Clone, Default)]
pub struct TurnState {
/// The incoming user message for this turn.
pub user_message: String,
/// The reply produced by the workflow; `None` until a node fills it in.
pub reply: Option<String>,
}
/// Pairs an engine [`Agent`] with a [`Workflow`] that operates on [`TurnState`].
pub struct AgentRuntime {
/// The underlying engine agent built from the supplied configuration and tools.
agent: Agent,
/// The workflow executed for each call to `run`.
workflow: Workflow<TurnState>,
}
impl AgentRuntime {
pub fn new(name: impl Into<String>, llm: LlmConfig, tools: ToolRegistry) -> Result<Self> {
let name = name.into();
let config = AgentConfig::new(&name, "You are a smooth-agent reference runtime.", llm)
.with_max_iterations(8);
let agent = Agent::new(config, tools);
let respond = FnNode::new("respond", |mut state: TurnState| {
Box::pin(async move {
state.reply = Some(format!("ack: {}", state.user_message));
Ok(state)
})
});
let workflow = WorkflowBuilder::new()
.add_node(respond)
.set_entry("respond")
.set_end("respond")
.build()?;
Ok(Self { agent, workflow })
}
pub fn with_storage(
name: impl Into<String>,
llm: LlmConfig,
tools: ToolRegistry,
storage: &dyn StorageAdapter,
) -> Result<Self> {
let name = name.into();
let config = AgentConfig::new(&name, "You are a smooth-agent reference runtime.", llm)
.with_max_iterations(8)
.with_knowledge(storage.knowledge());
let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
let respond = FnNode::new("respond", |mut state: TurnState| {
Box::pin(async move {
state.reply = Some(format!("ack: {}", state.user_message));
Ok(state)
})
});
let workflow = WorkflowBuilder::new()
.add_node(respond)
.set_entry("respond")
.set_end("respond")
.build()?;
Ok(Self { agent, workflow })
}
pub fn agent_id(&self) -> &str {
&self.agent.id
}
pub async fn run(&self, message: impl Into<String>) -> Result<String> {
let state = TurnState {
user_message: message.into(),
reply: None,
};
let out = self.workflow.run(state).await?;
Ok(out.reply.unwrap_or_default())
}
pub fn agent(&self) -> &Agent {
&self.agent
}
}
/// Reference-counted handle to an [`AgentRuntime`].
pub type SharedRuntime = Arc<AgentRuntime>;
/// System prompt given to the per-turn agent built by `KnowledgeChatRuntime`.
/// It names the `knowledge_search` tool, which is the tool registered on that agent.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
"You are a helpful customer-support agent for the organization. \
Answer the user's question accurately and concisely. When a question depends on \
organization-specific facts (policies, products, documentation), call the \
`knowledge_search` tool to retrieve them before answering, and ground your answer \
in what you retrieve. If the knowledge base has no relevant information, say so. \
Remember facts the user tells you within the conversation and use them when asked.";
/// Value passed to `MessageQuery::new` when loading a conversation's history
/// (presumably a page-size limit; confirm against `MessageQuery`).
const MAX_PRIOR_MESSAGES: usize = 50;
/// Maximum number of citations attached to a single [`TurnOutcome`].
pub const MAX_CITATIONS: usize = 8;
/// Value passed to the knowledge base `query` call made with the raw user message
/// at the start of each turn (presumably a result limit; confirm against the trait).
const AUTO_CONTEXT_LIMIT: usize = 3;
/// Result of one chat turn run by `KnowledgeChatRuntime`.
#[derive(Debug, Clone)]
pub struct TurnOutcome {
/// Last assistant message of the turn; empty when the agent produced none.
pub reply: String,
/// Every agent event captured while the turn ran, in the order received.
pub events: Vec<AgentEvent>,
/// De-duplicated knowledge sources for the turn, capped at `MAX_CITATIONS`.
pub citations: Vec<Citation>,
}
/// Extracts `(prompt_tokens, completion_tokens)` from the first
/// `AgentEvent::Completed` event that reports a non-zero count.
///
/// Returns `None` when no such event exists; completed events whose counts are
/// both zero are skipped.
fn usage_from_events(events: &[AgentEvent]) -> Option<(u64, u64)> {
    for event in events {
        if let AgentEvent::Completed {
            prompt_tokens,
            completion_tokens,
            ..
        } = event
        {
            let usage = (*prompt_tokens, *completion_tokens);
            // Counts are unsigned, so "either is positive" means "not both zero".
            if usage != (0, 0) {
                return Some(usage);
            }
        }
    }
    None
}
/// Merges automatically retrieved and tool-retrieved knowledge results into a
/// citation list.
///
/// `auto` results come first, then `tool` results. Only the first result per
/// `document_id` is kept, and the list stops at `MAX_CITATIONS` entries.
fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
    let mut seen_ids = std::collections::HashSet::new();
    let mut citations = Vec::new();
    for result in auto.iter().chain(tool.iter()) {
        if citations.len() >= MAX_CITATIONS {
            break;
        }
        // `insert` returns false when this document was already cited.
        if seen_ids.insert(result.document_id.clone()) {
            citations.push(Citation::from_knowledge_result(result));
        }
    }
    citations
}
impl TurnOutcome {
    /// Reports whether a `ToolCallStart` event for `tool_name` was captured
    /// during the turn.
    #[must_use]
    pub fn invoked_tool(&self, tool_name: &str) -> bool {
        for event in &self.events {
            if let AgentEvent::ToolCallStart {
                tool_name: started, ..
            } = event
            {
                if started == tool_name {
                    return true;
                }
            }
        }
        false
    }

    /// Returns the result text of the first `ToolCallComplete` event for
    /// `tool_name`, or `None` if that tool never completed a call.
    #[must_use]
    pub fn tool_result(&self, tool_name: &str) -> Option<&str> {
        for event in &self.events {
            if let AgentEvent::ToolCallComplete {
                tool_name: finished,
                result,
                ..
            } = event
            {
                if finished == tool_name {
                    return Some(result.as_str());
                }
            }
        }
        None
    }
}
/// Chat runtime that builds a fresh agent per turn, gives it a knowledge-search
/// tool, and persists the exchanged messages through a [`StorageAdapter`].
pub struct KnowledgeChatRuntime {
/// Storage backend used for messages, checkpoints, and the default knowledge base.
storage: Arc<dyn StorageAdapter>,
/// LLM configuration cloned into each per-turn agent.
llm: LlmConfig,
/// Optional provider handed to the agent when set.
llm_provider: Option<Arc<dyn LlmProvider>>,
/// Iteration cap passed to the agent configuration.
max_iterations: u32,
/// Optional access-controlled knowledge source; consulted only when `curation` is unset.
access: Option<RuntimeAccessControl>,
/// Optional curated knowledge source; takes precedence over `access`.
curation: Option<RuntimeCuration>,
}
/// Access-control settings used to obtain a knowledge reader from an ACL store.
#[derive(Clone)]
struct RuntimeAccessControl {
/// Store from which the reader is obtained.
store: AclKnowledgeStore,
/// Access context passed to the store when creating the reader.
context: AccessContext,
}
/// Curation settings used to obtain a knowledge reader from a curated store.
#[derive(Clone)]
struct RuntimeCuration {
/// Store from which the reader is obtained.
store: CuratedKnowledgeStore,
/// Access context passed to the store when creating the reader.
context: AccessContext,
/// Retrieval filter passed to the store when creating the reader.
filter: RetrievalFilter,
}
impl KnowledgeChatRuntime {
/// Creates a runtime over `storage` using `llm`.
///
/// Starts with no provider override, no access control, no curation, and an
/// iteration cap of 8; the `with_*` builders change these.
#[must_use]
pub fn new(storage: Arc<dyn StorageAdapter>, llm: LlmConfig) -> Self {
    let default_max_iterations = 8;
    Self {
        storage,
        llm,
        max_iterations: default_max_iterations,
        // Optional capabilities are off until a builder turns them on.
        llm_provider: None,
        access: None,
        curation: None,
    }
}
/// Enables curated retrieval: knowledge reads go through `store` with the
/// given `context` and `filter`. Replaces any previous curation settings.
#[must_use]
pub fn with_curation(
    mut self,
    store: CuratedKnowledgeStore,
    context: AccessContext,
    filter: RetrievalFilter,
) -> Self {
    let curation = RuntimeCuration {
        store,
        context,
        filter,
    };
    self.curation = Some(curation);
    self
}
/// Replaces the retrieval filter of the configured curation.
///
/// Does nothing when curation has not been configured, so it only has an
/// effect when called after `with_curation`.
#[must_use]
pub fn with_retrieval_filter(mut self, filter: RetrievalFilter) -> Self {
    match self.curation.as_mut() {
        Some(curation) => curation.filter = filter,
        // No curation configured: the filter is intentionally dropped.
        None => {}
    }
    self
}
/// Enables access-controlled retrieval through `store` for the given
/// `context`. Replaces any previous access-control settings.
#[must_use]
pub fn with_access_control(mut self, store: AclKnowledgeStore, context: AccessContext) -> Self {
    let access = RuntimeAccessControl { store, context };
    self.access = Some(access);
    self
}
/// Picks the knowledge base used for reads.
///
/// Order of precedence: the curated reader if curation is configured, then
/// the access-controlled reader if access control is configured, otherwise
/// the storage adapter's own knowledge base.
fn read_knowledge(&self) -> Arc<dyn smooth_operator_core::KnowledgeBase> {
    if let Some(curated) = self.curation.as_ref() {
        curated
            .store
            .reader(curated.filter.clone(), curated.context.clone())
    } else if let Some(acl) = self.access.as_ref() {
        acl.store.reader(acl.context.clone())
    } else {
        self.storage.knowledge()
    }
}
#[must_use]
pub fn with_llm_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
self.llm_provider = Some(provider);
self
}
#[must_use]
pub fn with_max_iterations(mut self, max: u32) -> Self {
self.max_iterations = max;
self
}
/// Builds the agent used for a single turn.
///
/// The agent is configured with the chat system prompt, the runtime's
/// iteration cap, the selected knowledge base, and `prior` as conversation
/// history. A knowledge-search tool over the same knowledge base is
/// registered and reports its results into `citation_sink`. Every agent event
/// is appended to `events`. The runtime's LLM provider is attached when one
/// is configured.
fn build_agent(
    &self,
    events: Arc<Mutex<Vec<AgentEvent>>>,
    prior: Vec<EngineMessage>,
    citation_sink: KnowledgeResultSink,
) -> Agent {
    let knowledge = self.read_knowledge();
    let config = AgentConfig::new(
        "smooth-agent-chat",
        KNOWLEDGE_CHAT_SYSTEM_PROMPT,
        self.llm.clone(),
    )
    .with_max_iterations(self.max_iterations)
    .with_knowledge(Arc::clone(&knowledge))
    .with_prior_messages(prior);

    let mut tools = ToolRegistry::new();
    tools.register(KnowledgeSearchTool::new(knowledge).with_result_sink(citation_sink));

    let agent = Agent::new(config, tools)
        .with_checkpoint_store(self.storage.checkpoints())
        .with_event_handler(move |event| {
            // Recover from a poisoned lock instead of panicking: the sink is a
            // plain Vec that is only pushed to, so its contents stay usable, and
            // a telemetry sink must not abort the turn.
            events
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner)
                .push(event);
        });

    match &self.llm_provider {
        Some(provider) => agent.with_llm_provider(Arc::clone(provider)),
        None => agent,
    }
}
/// Runs one chat turn inside a tracing span and returns its outcome.
///
/// The turn itself runs instrumented with a chat span carrying the system
/// name, model, and conversation id. On success, token usage taken from the
/// captured events is recorded on that span, and one child span is emitted
/// per completed tool call.
///
/// # Errors
/// Propagates any error from the underlying turn; in that case no usage or
/// tool spans are recorded.
pub async fn run_turn(&self, conversation_id: &str, user_message: &str) -> Result<TurnOutcome> {
    let chat_span = tracing::info_span!(
        SPAN_CHAT,
        { GEN_AI_SYSTEM } = SYSTEM_NAME,
        { GEN_AI_REQUEST_MODEL } = %self.llm.model,
        { GEN_AI_CONVERSATION_ID } = %conversation_id,
        { GEN_AI_USAGE_INPUT_TOKENS } = tracing::field::Empty,
        { GEN_AI_USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
    );

    let turn = self.run_turn_inner(conversation_id, user_message);
    let outcome = turn.instrument(chat_span.clone()).await?;

    // The usage fields were declared empty above and are filled in only when
    // the events report non-zero token counts.
    if let Some((input_tokens, output_tokens)) = usage_from_events(&outcome.events) {
        chat_span.record(GEN_AI_USAGE_INPUT_TOKENS, input_tokens);
        chat_span.record(GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens);
    }

    let finished_tools = outcome.events.iter().filter_map(|event| match event {
        AgentEvent::ToolCallComplete {
            tool_name,
            duration_ms,
            is_error,
            ..
        } => Some((tool_name, *duration_ms, *is_error)),
        _ => None,
    });
    for (tool, elapsed_ms, failed) in finished_tools {
        let tool_span = tracing::info_span!(
            parent: &chat_span,
            SPAN_TOOL,
            { GEN_AI_TOOL_NAME } = %tool,
            duration_ms = elapsed_ms,
            is_error = failed,
        );
        // The call already finished; enter and immediately leave the span so
        // it is emitted as a child of the chat span.
        drop(tool_span.entered());
    }

    Ok(outcome)
}
/// Executes one chat turn: retrieval, agent run, persistence, and citation
/// collection.
///
/// Steps, in order:
/// 1. Query the knowledge base with the raw user message; a failed query is
///    treated as "no results".
/// 2. Load prior messages *before* persisting the new inbound message, so
///    the history handed to the agent does not contain the current message.
/// 3. Persist the inbound message, run the agent, and persist a non-empty reply.
/// 4. Drain the event and citation sinks and build the citation list.
///
/// # Errors
/// Propagates storage errors and agent errors. If the agent run fails, the
/// inbound message has already been persisted.
async fn run_turn_inner(
    &self,
    conversation_id: &str,
    user_message: &str,
) -> Result<TurnOutcome> {
    // Takes the items out of a shared sink. When this is the last handle the
    // Vec is moved out; otherwise it is cloned. A poisoned lock is recovered in
    // both cases because the sinks are only ever pushed to, so their contents
    // remain usable.
    fn drain_sink<T: Clone>(sink: Arc<Mutex<Vec<T>>>) -> Vec<T> {
        match Arc::try_unwrap(sink) {
            Ok(mutex) => mutex
                .into_inner()
                .unwrap_or_else(std::sync::PoisonError::into_inner),
            Err(shared) => shared
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner)
                .clone(),
        }
    }

    let events = Arc::new(Mutex::new(Vec::<AgentEvent>::new()));
    let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));

    // Best-effort retrieval on the raw message; errors fall back to an empty list.
    let auto_sources: Vec<KnowledgeResult> = self
        .read_knowledge()
        .query(user_message, AUTO_CONTEXT_LIMIT)
        .unwrap_or_default();

    let prior = self.load_prior_messages(conversation_id).await?;
    let agent = self.build_agent(Arc::clone(&events), prior, Arc::clone(&tool_sources));

    self.persist_message(conversation_id, Direction::Inbound, user_message)
        .await?;

    let conversation = agent.run(user_message).await?;
    let reply = conversation
        .last_assistant_content()
        .unwrap_or_default()
        .to_string();
    if !reply.is_empty() {
        self.persist_message(conversation_id, Direction::Outbound, &reply)
            .await?;
    }

    // Dropping the agent releases its handles to the sinks so they can be
    // moved out rather than cloned.
    drop(agent);
    let events = drain_sink(events);
    let tool_sources = drain_sink(tool_sources);

    let citations = collect_citations(&auto_sources, &tool_sources);
    Ok(TurnOutcome {
        reply,
        events,
        citations,
    })
}
/// Appends a text message to storage for the given conversation.
///
/// The stored message gets a fresh UUID as its id and the current UTC time as
/// `created_at`; all optional fields other than the conversation id are left
/// unset.
///
/// # Errors
/// Propagates any error from the storage adapter.
async fn persist_message(
    &self,
    conversation_id: &str,
    direction: Direction,
    text: &str,
) -> Result<()> {
    let created_at = chrono::Utc::now();
    let id = uuid::Uuid::new_v4().to_string();
    let record = DomainMessage {
        id,
        conversation_id: Some(conversation_id.to_owned()),
        direction,
        content: MessageContent::from_text(text),
        created_at,
        // Fields not known at this layer stay unset.
        external_id: None,
        organization_id: None,
        from: None,
        to: None,
        metadata_json: None,
        analytics_json: None,
        updated_at: None,
    };
    self.storage.append_message(record).await?;
    Ok(())
}
/// Loads stored messages for a conversation and converts them into engine
/// messages for the agent's history.
///
/// The text of each message is `content.text` when present, otherwise the
/// first item text found in `content.items`. Messages whose text ends up
/// empty are skipped. Inbound messages become `Role::User`, outbound ones
/// `Role::Assistant`.
///
/// NOTE(review): the query is built with `MAX_PRIOR_MESSAGES`; which messages
/// are returned when a conversation is longer depends on the storage adapter's
/// ordering — confirm it yields the most recent ones.
///
/// # Errors
/// Propagates any error from the storage adapter.
async fn load_prior_messages(&self, conversation_id: &str) -> Result<Vec<EngineMessage>> {
    let query = MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES);
    let page = self.storage.list_messages_by_conversation(query).await?;

    let history: Vec<EngineMessage> = page
        .messages
        .into_iter()
        .filter_map(|stored| {
            let body = match stored.content.text.clone() {
                Some(text) => text,
                None => stored
                    .content
                    .items
                    .iter()
                    .find_map(|item| item.text.clone())
                    .unwrap_or_default(),
            };
            if body.is_empty() {
                return None;
            }
            let role = match stored.direction {
                Direction::Inbound => Role::User,
                Direction::Outbound => Role::Assistant,
            };
            Some(EngineMessage {
                id: stored.id,
                role,
                content: body,
                tool_call_id: None,
                tool_name: None,
                tool_calls: vec![],
                reasoning_content: None,
                timestamp: stored.created_at,
            })
        })
        .collect();

    Ok(history)
}
}
#[cfg(test)]
mod tests {
use super::*;
// LLM configuration with a placeholder key; the workflow exercised below only
// formats an `ack:` reply.
fn test_llm() -> LlmConfig {
LlmConfig::openrouter("test-key").with_model("openai/gpt-4o")
}
// Checks that the runtime builds, exposes a non-empty agent id, and that the
// workflow replies with `ack: <message>`.
#[tokio::test]
async fn runtime_constructs_agent_and_runs_workflow() {
let rt =
AgentRuntime::new("ref-agent", test_llm(), ToolRegistry::new()).expect("build runtime");
assert!(!rt.agent_id().is_empty());
let reply = rt.run("hello world").await.expect("run");
assert_eq!(reply, "ack: hello world");
}
}