use std::sync::{Arc, Mutex};
use anyhow::Result;
use serde_json::json;
use smooth_operator_core::llm_provider::LlmProvider;
use smooth_operator_core::{
Agent, AgentConfig, AgentEvent, KnowledgeBase, KnowledgeResult, LlmConfig,
Message as EngineMessage, Role, ToolRegistry,
};
use tokio::sync::mpsc::UnboundedSender;
use smooth_operator::access_control::AccessContext;
use smooth_operator::adapter::{MessageQuery, StorageAdapter};
use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
use smooth_operator::rerank::Reranker;
use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
use smooth_operator::MAX_CITATIONS;
/// Second argument passed to `KnowledgeBase::query` for the automatic lookup
/// that `run_streaming_turn` performs on the raw user message before the agent
/// runs (presumably the maximum number of results returned — confirm against
/// the `KnowledgeBase` trait).
const AUTO_CONTEXT_LIMIT: usize = 3;
/// System prompt handed to `AgentConfig::new` for every chat turn.
///
/// It instructs the model to call the `knowledge_search` tool for
/// organization-specific questions; that is the same tool name
/// `run_streaming_turn` watches for when reporting `invoked_knowledge_search`.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
"You are a helpful customer-support agent for the organization. \
Answer the user's question accurately and concisely. When a question depends on \
organization-specific facts (policies, products, documentation), call the \
`knowledge_search` tool to retrieve them before answering, and ground your answer \
in what you retrieve. If the knowledge base has no relevant information, say so. \
Remember facts the user tells you within the conversation and use them when asked.";
/// Second argument passed to `MessageQuery::new` when `load_prior_messages`
/// fetches conversation history (presumably the page size / maximum number of
/// messages loaded — confirm against `MessageQuery`).
const MAX_PRIOR_MESSAGES: usize = 50;
/// Outcome of a single chat turn, as returned by `run_streaming_turn`.
pub struct TurnResult {
/// Text of the conversation's last assistant content; empty when the agent
/// produced none.
pub reply: String,
/// Id of the persisted outbound message. When `reply` is empty nothing is
/// persisted and this is a freshly generated UUID instead.
pub message_id: String,
/// `true` when a tool call named `knowledge_search` was started during the
/// turn; `false` otherwise, including when the event-translation task failed.
pub invoked_knowledge_search: bool,
/// Sources from the automatic lookup followed by those recorded by the
/// `knowledge_search` tool, de-duplicated by document id and capped at
/// `MAX_CITATIONS`.
pub citations: Vec<Citation>,
}
/// Everything `run_streaming_turn` needs to execute one chat turn.
pub struct TurnRequest<'a> {
/// Storage backend: supplies the knowledge base, the message history, the
/// message log that is appended to, and the agent checkpoint store.
pub storage: Arc<dyn StorageAdapter>,
/// LLM configuration forwarded to `AgentConfig::new`.
pub llm: LlmConfig,
/// Forwarded to `AgentConfig::with_max_iterations`.
pub max_iterations: u32,
/// Conversation whose history is loaded and to which messages are appended.
pub conversation_id: &'a str,
/// Identifier passed to the `crate::protocol` frame builders for every value
/// sent to the sink.
pub request_id: &'a str,
/// The user's message for this turn; persisted as an inbound message and
/// given to the agent as its input.
pub user_message: &'a str,
/// Access context used to obtain the knowledge base via
/// `StorageAdapter::knowledge_for_access`.
pub access: AccessContext,
/// Optional provider installed on the agent with `Agent::with_llm_provider`;
/// when `None` the agent is used as constructed.
pub llm_provider: Option<Arc<dyn LlmProvider>>,
/// Optional reranker attached to the `knowledge_search` tool.
pub reranker: Option<Arc<dyn Reranker>>,
}
pub async fn run_streaming_turn(
req: TurnRequest<'_>,
sink: &UnboundedSender<serde_json::Value>,
) -> Result<TurnResult> {
let TurnRequest {
storage,
llm,
max_iterations,
conversation_id,
request_id,
user_message,
access,
llm_provider,
reranker,
} = req;
let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
let auto_sources: Vec<KnowledgeResult> = knowledge
.query(user_message, AUTO_CONTEXT_LIMIT)
.unwrap_or_default();
let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
persist_message(
storage.as_ref(),
conversation_id,
Direction::Inbound,
user_message,
)
.await?;
let config = AgentConfig::new("smooth-agent-chat", KNOWLEDGE_CHAT_SYSTEM_PROMPT, llm)
.with_max_iterations(max_iterations)
.with_knowledge(Arc::clone(&knowledge))
.with_prior_messages(prior);
let mut tools = ToolRegistry::new();
let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
.with_result_sink(Arc::clone(&tool_sources));
if let Some(reranker) = reranker {
knowledge_search = knowledge_search.with_reranker(reranker);
}
tools.register(knowledge_search);
let agent = {
let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
match llm_provider {
Some(provider) => agent.with_llm_provider(provider),
None => agent,
}
};
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
let request_id_owned = request_id.to_string();
let sink_clone = sink.clone();
let translator = tokio::spawn(async move {
let mut invoked_knowledge_search = false;
while let Some(event) = rx.recv().await {
match event {
AgentEvent::TokenDelta { content } => {
if !content.is_empty() {
let _ = sink_clone
.send(crate::protocol::stream_token(&request_id_owned, &content));
}
}
AgentEvent::ToolCallStart {
tool_name,
arguments,
..
} => {
if tool_name == "knowledge_search" {
invoked_knowledge_search = true;
}
let _ = sink_clone.send(crate::protocol::stream_chunk(
&request_id_owned,
&tool_name,
json!({
"rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
}),
));
}
AgentEvent::ToolCallComplete {
tool_name,
result,
is_error,
..
} => {
let _ = sink_clone.send(crate::protocol::stream_chunk(
&request_id_owned,
&tool_name,
json!({
"rawResponse": json!({
"toolResult": { "name": tool_name, "isError": is_error, "result": result }
}),
}),
));
}
AgentEvent::PhaseStart { phase, .. } => {
let _ = sink_clone.send(crate::protocol::stream_chunk(
&request_id_owned,
&phase,
json!({}),
));
}
_ => {}
}
}
invoked_knowledge_search
});
let conversation = agent.run_with_channel(user_message, tx).await?;
let invoked_knowledge_search = translator.await.unwrap_or(false);
let reply = conversation
.last_assistant_content()
.unwrap_or_default()
.to_string();
let message_id = if reply.is_empty() {
uuid::Uuid::new_v4().to_string()
} else {
persist_message(
storage.as_ref(),
conversation_id,
Direction::Outbound,
&reply,
)
.await?
.id
};
let tool_sources = match Arc::try_unwrap(tool_sources) {
Ok(mutex) => mutex
.into_inner()
.unwrap_or_else(std::sync::PoisonError::into_inner),
Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
};
let citations = collect_citations(&auto_sources, &tool_sources);
Ok(TurnResult {
reply,
message_id,
invoked_knowledge_search,
citations,
})
}
/// Builds the citation list for a turn.
///
/// Walks `auto` first and then `tool`, keeps only the first result seen for
/// each `document_id`, and stops after `MAX_CITATIONS` entries. Each kept
/// result is converted with `Citation::from_knowledge_result`.
fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
    // Keyed on borrowed ids: the inputs outlive this set, so de-duplicating
    // needs no per-result clone of the document id.
    let mut seen = std::collections::HashSet::new();
    auto.iter()
        .chain(tool)
        .filter(|source| seen.insert(&source.document_id))
        .take(MAX_CITATIONS)
        .map(Citation::from_knowledge_result)
        .collect()
}
/// Loads stored messages for `conversation_id` and converts them into engine
/// messages for the agent.
///
/// For each stored message the text is taken from `content.text`, falling back
/// to the first content item that has text. Messages whose resulting text is
/// missing or empty are dropped. Inbound messages become `Role::User`,
/// outbound ones `Role::Assistant`; tool-related fields are left empty.
///
/// # Errors
///
/// Propagates any error from `StorageAdapter::list_messages_by_conversation`.
async fn load_prior_messages(
    storage: &dyn StorageAdapter,
    conversation_id: &str,
) -> Result<Vec<EngineMessage>> {
    let query = MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES);
    let page = storage.list_messages_by_conversation(query).await?;

    let mut history = Vec::with_capacity(page.messages.len());
    history.extend(page.messages.into_iter().filter_map(|stored| {
        // `?` skips messages that end up with no usable text.
        let content = stored
            .content
            .text
            .clone()
            .or_else(|| {
                stored
                    .content
                    .items
                    .iter()
                    .find_map(|item| item.text.clone())
            })
            .filter(|text| !text.is_empty())?;

        let role = match stored.direction {
            Direction::Inbound => Role::User,
            Direction::Outbound => Role::Assistant,
        };

        Some(EngineMessage {
            id: stored.id,
            role,
            content,
            tool_call_id: None,
            tool_name: None,
            tool_calls: vec![],
            reasoning_content: None,
            timestamp: stored.created_at,
        })
    }));

    Ok(history)
}
/// Appends a plain-text message to `conversation_id` and returns what the
/// storage adapter hands back.
///
/// The message gets a fresh UUID as its id and the current UTC time as
/// `created_at`; every optional field other than the conversation id is left
/// unset.
///
/// # Errors
///
/// Propagates any error from `StorageAdapter::append_message`.
async fn persist_message(
    storage: &dyn StorageAdapter,
    conversation_id: &str,
    direction: Direction,
    text: &str,
) -> Result<DomainMessage> {
    let created_at = chrono::Utc::now();
    let id = uuid::Uuid::new_v4().to_string();
    let content = MessageContent::from_text(text);

    storage
        .append_message(DomainMessage {
            id,
            external_id: None,
            organization_id: None,
            conversation_id: Some(conversation_id.to_string()),
            direction,
            content,
            from: None,
            to: None,
            metadata_json: None,
            analytics_json: None,
            created_at,
            updated_at: None,
        })
        .await
}
/// Wraps `reply` in a JSON object with fixed placeholder metadata.
///
/// `reply` becomes the single element of `responseParts`. Both score fields
/// are `0.5`, `requestSummary` is empty, `resolutionStatus` is
/// `"in_progress"`, and `suggestedNextActions` is an empty array.
#[must_use]
pub fn general_agent_response(reply: &str) -> serde_json::Value {
    // The same constant value is reported for both score fields.
    const PLACEHOLDER_SCORE: f64 = 0.5;

    let response_parts = vec![reply];
    json!({
        "responseParts": response_parts,
        "customerHappinessScore": PLACEHOLDER_SCORE,
        "needsSatisfactionScore": PLACEHOLDER_SCORE,
        "requestSummary": "",
        "resolutionStatus": "in_progress",
        "suggestedNextActions": [],
    })
}