use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::Result;
use serde_json::json;
use smooth_operator_core::llm_provider::LlmProvider;
use smooth_operator_core::{
human_channel, Agent, AgentConfig, AgentEvent, ConfirmationHook, HumanRequest, HumanResponse,
KnowledgeBase, KnowledgeResult, LlmConfig, Message as EngineMessage, Role, ToolRegistry,
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use smooth_operator::access_control::AccessContext;
use smooth_operator::adapter::{MessageQuery, StorageAdapter};
use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
use smooth_operator::rerank::Reranker;
use smooth_operator::tool_provider::{ToolProvider, ToolProviderContext};
use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
use smooth_operator::MAX_CITATIONS;
const AUTO_CONTEXT_LIMIT: usize = 3;
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
"You are a helpful customer-support agent for the organization. \
Answer the user's question accurately and concisely. When a question depends on \
organization-specific facts (policies, products, documentation), call the \
`knowledge_search` tool to retrieve them before answering, and ground your answer \
in what you retrieve. If the knowledge base has no relevant information, say so. \
Remember facts the user tells you within the conversation and use them when asked.";
const MAX_PRIOR_MESSAGES: usize = 50;
const CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(300);
pub type RegisterConfirmation = Arc<dyn Fn(&str, UnboundedSender<HumanResponse>) + Send + Sync>;
pub type ClearConfirmation = Arc<dyn Fn(&str) + Send + Sync>;
pub struct ConfirmationConfig {
pub tool_patterns: Vec<String>,
pub session_id: String,
pub register: RegisterConfirmation,
pub clear: ClearConfirmation,
}
pub struct TurnResult {
pub reply: String,
pub message_id: String,
pub invoked_knowledge_search: bool,
pub citations: Vec<Citation>,
}
pub struct TurnRequest<'a> {
pub storage: Arc<dyn StorageAdapter>,
pub llm: LlmConfig,
pub max_iterations: u32,
pub conversation_id: &'a str,
pub request_id: &'a str,
pub user_message: &'a str,
pub access: AccessContext,
pub llm_provider: Option<Arc<dyn LlmProvider>>,
pub reranker: Option<Arc<dyn Reranker>>,
pub confirmation: Option<ConfirmationConfig>,
pub tool_provider: Option<Arc<dyn ToolProvider>>,
pub system_prompt: Option<String>,
pub org_id: Option<String>,
pub gateway_key: Option<String>,
}
pub async fn run_streaming_turn(
req: TurnRequest<'_>,
sink: &UnboundedSender<serde_json::Value>,
) -> Result<TurnResult> {
let TurnRequest {
storage,
llm,
max_iterations,
conversation_id,
request_id,
user_message,
access,
llm_provider,
reranker,
confirmation,
tool_provider,
system_prompt,
org_id,
gateway_key,
} = req;
let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
let auto_sources: Vec<KnowledgeResult> = knowledge
.query(user_message, AUTO_CONTEXT_LIMIT)
.unwrap_or_default();
let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
persist_message(
storage.as_ref(),
conversation_id,
Direction::Inbound,
user_message,
)
.await?;
let resolved_prompt = system_prompt
.as_deref()
.unwrap_or(KNOWLEDGE_CHAT_SYSTEM_PROMPT);
let config = AgentConfig::new("smooth-agent-chat", resolved_prompt, llm)
.with_max_iterations(max_iterations)
.with_knowledge(Arc::clone(&knowledge))
.with_prior_messages(prior);
let mut tools = ToolRegistry::new();
let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
.with_result_sink(Arc::clone(&tool_sources));
if let Some(reranker) = reranker {
knowledge_search = knowledge_search.with_reranker(reranker);
}
tools.register(knowledge_search);
if let Some(provider) = tool_provider {
let mut ctx =
ToolProviderContext::new(org_id, access.clone()).with_conversation_id(conversation_id);
if let Some(key) = gateway_key {
ctx = ctx.with_gateway_key(key);
}
for tool in provider.tools_for(&ctx).await {
tools.register_arc(tool);
}
}
let confirmation_bridge = match &confirmation {
Some(cfg) if !cfg.tool_patterns.is_empty() => {
let pair = human_channel();
tools.add_hook(ConfirmationHook::new(
cfg.tool_patterns.clone(),
pair.request_tx,
pair.response_rx,
CONFIRMATION_TIMEOUT,
));
Some(spawn_confirmation_bridge(
pair.request_rx,
pair.response_tx,
sink.clone(),
request_id.to_string(),
cfg.session_id.clone(),
Arc::clone(&cfg.register),
))
}
_ => None,
};
let agent = {
let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
match llm_provider {
Some(provider) => agent.with_llm_provider(provider),
None => agent,
}
};
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
let request_id_owned = request_id.to_string();
let sink_clone = sink.clone();
let translator = tokio::spawn(async move {
let mut invoked_knowledge_search = false;
while let Some(event) = rx.recv().await {
match event {
AgentEvent::TokenDelta { content } => {
if !content.is_empty() {
let _ = sink_clone
.send(crate::protocol::stream_token(&request_id_owned, &content));
}
}
AgentEvent::ToolCallStart {
tool_name,
arguments,
..
} => {
if tool_name == "knowledge_search" {
invoked_knowledge_search = true;
}
let _ = sink_clone.send(crate::protocol::stream_chunk(
&request_id_owned,
&tool_name,
json!({
"rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
}),
));
}
AgentEvent::ToolCallComplete {
tool_name,
result,
is_error,
..
} => {
let _ = sink_clone.send(crate::protocol::stream_chunk(
&request_id_owned,
&tool_name,
json!({
"rawResponse": json!({
"toolResult": { "name": tool_name, "isError": is_error, "result": result }
}),
}),
));
}
AgentEvent::PhaseStart { phase, .. } => {
let _ = sink_clone.send(crate::protocol::stream_chunk(
&request_id_owned,
&phase,
json!({}),
));
}
_ => {}
}
}
invoked_knowledge_search
});
let conversation = agent.run_with_channel(user_message, tx).await?;
drop(agent);
if let (Some(handle), Some(cfg)) = (confirmation_bridge, confirmation.as_ref()) {
let _ = handle.await;
(cfg.clear)(&cfg.session_id);
}
let invoked_knowledge_search = translator.await.unwrap_or(false);
let reply = conversation
.last_assistant_content()
.unwrap_or_default()
.to_string();
let message_id = if reply.is_empty() {
uuid::Uuid::new_v4().to_string()
} else {
persist_message(
storage.as_ref(),
conversation_id,
Direction::Outbound,
&reply,
)
.await?
.id
};
let tool_sources = match Arc::try_unwrap(tool_sources) {
Ok(mutex) => mutex
.into_inner()
.unwrap_or_else(std::sync::PoisonError::into_inner),
Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
};
let citations = collect_citations(&auto_sources, &tool_sources);
Ok(TurnResult {
reply,
message_id,
invoked_knowledge_search,
citations,
})
}
fn spawn_confirmation_bridge(
mut request_rx: UnboundedReceiver<HumanRequest>,
response_tx: UnboundedSender<HumanResponse>,
sink: UnboundedSender<serde_json::Value>,
request_id: String,
session_id: String,
register: RegisterConfirmation,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(req) = request_rx.recv().await {
match req {
HumanRequest::Confirm {
tool_name, prompt, ..
} => {
register(&session_id, response_tx.clone());
let _ = sink.send(crate::protocol::write_confirmation_required(
&request_id,
&tool_name,
&prompt,
));
}
HumanRequest::Input { .. } => {
let _ = response_tx.send(HumanResponse::Denied {
reason: "free-form human input is not supported on this channel".into(),
});
}
}
}
})
}
fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
let mut seen = std::collections::HashSet::new();
auto.iter()
.chain(tool.iter())
.filter(|r| seen.insert(r.document_id.clone()))
.take(MAX_CITATIONS)
.map(Citation::from_knowledge_result)
.collect()
}
async fn load_prior_messages(
storage: &dyn StorageAdapter,
conversation_id: &str,
) -> Result<Vec<EngineMessage>> {
let page = storage
.list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
.await?;
let mut out = Vec::with_capacity(page.messages.len());
for m in page.messages {
let text = m
.content
.text
.clone()
.or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
.unwrap_or_default();
if text.is_empty() {
continue;
}
let role = match m.direction {
Direction::Inbound => Role::User,
Direction::Outbound => Role::Assistant,
};
out.push(EngineMessage {
id: m.id,
role,
content: text,
tool_call_id: None,
tool_name: None,
tool_calls: vec![],
reasoning_content: None,
timestamp: m.created_at,
});
}
Ok(out)
}
async fn persist_message(
storage: &dyn StorageAdapter,
conversation_id: &str,
direction: Direction,
text: &str,
) -> Result<DomainMessage> {
let now = chrono::Utc::now();
let message = DomainMessage {
id: uuid::Uuid::new_v4().to_string(),
external_id: None,
organization_id: None,
conversation_id: Some(conversation_id.to_string()),
direction,
content: MessageContent::from_text(text),
from: None,
to: None,
metadata_json: None,
analytics_json: None,
created_at: now,
updated_at: None,
};
storage.append_message(message).await
}
#[must_use]
pub fn general_agent_response(reply: &str) -> serde_json::Value {
json!({
"responseParts": [reply],
"customerHappinessScore": 0.5,
"needsSatisfactionScore": 0.5,
"requestSummary": "",
"resolutionStatus": "in_progress",
"suggestedNextActions": [],
})
}