use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::Result;
use serde_json::json;
use smooth_operator_core::llm_provider::LlmProvider;
use smooth_operator_core::{
human_channel, Agent, AgentConfig, AgentEvent, ConfirmationHook, HumanRequest, HumanResponse,
KnowledgeBase, KnowledgeResult, LlmConfig, Message as EngineMessage, Role, ToolRegistry,
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use smooth_operator::access_control::AccessContext;
use smooth_operator::adapter::{MessageQuery, StorageAdapter};
use smooth_operator::agent_config::{
advance_after_verdict, judge_user_prompt, render_workflow_prompt_section, resolve_current_step,
AuthGateHook, ConversationWorkflow, WorkflowJudgeVerdict, JUDGE_SYSTEM_PROMPT,
};
use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
use smooth_operator::rerank::Reranker;
use smooth_operator::tool_provider::{ToolProvider, ToolProviderContext};
use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
use smooth_operator::MAX_CITATIONS;
/// Maximum number of knowledge results requested up front for the user's
/// message, before the agent itself runs.
const AUTO_CONTEXT_LIMIT: usize = 3;
/// System prompt used when the turn request does not supply its own
/// `system_prompt`.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
"You are a helpful customer-support agent for the organization. \
Answer the user's question accurately and concisely. When a question depends on \
organization-specific facts (policies, products, documentation), call the \
`knowledge_search` tool to retrieve them before answering, and ground your answer \
in what you retrieve. If the knowledge base has no relevant information, say so. \
Remember facts the user tells you within the conversation and use them when asked.";
/// Limit passed to the message query when loading earlier messages of a
/// conversation as agent history.
const MAX_PRIOR_MESSAGES: usize = 50;
/// Timeout handed to the `ConfirmationHook` (300 seconds).
// NOTE(review): presumably how long the hook waits for the human's answer — confirm
// against `ConfirmationHook`.
const CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(300);
/// Callback the confirmation bridge invokes, with the session id and a sender
/// for the human's response, each time a tool call asks for confirmation.
pub type RegisterConfirmation = Arc<dyn Fn(&str, UnboundedSender<HumanResponse>) + Send + Sync>;
/// Callback invoked with the session id once the confirmation bridge of a turn
/// has finished.
pub type ClearConfirmation = Arc<dyn Fn(&str) + Send + Sync>;
/// Settings for human confirmation of tool calls during a turn.
pub struct ConfirmationConfig {
/// Patterns handed to the `ConfirmationHook`. An empty list disables the
/// confirmation bridge entirely.
// NOTE(review): presumably matched against tool names — confirm against `ConfirmationHook`.
pub tool_patterns: Vec<String>,
/// Session id passed to both the `register` and `clear` callbacks.
pub session_id: String,
/// Called by the bridge for every confirmation request, with `session_id` and
/// a sender on which the human's response is delivered.
pub register: RegisterConfirmation,
/// Called with `session_id` by `run_streaming_turn` after the confirmation
/// bridge task has finished.
pub clear: ClearConfirmation,
}
/// Workflow state supplied for a single turn.
pub struct WorkflowTurn {
/// The workflow rendered into the system prompt and evaluated by the judge.
pub workflow: ConversationWorkflow,
/// Id of the step the conversation is currently on, if known.
// NOTE(review): how `None` is resolved is decided by `resolve_current_step` — confirm there.
pub current_step_id: Option<String>,
}
/// Outcome of one call to `run_streaming_turn`.
pub struct TurnResult {
/// Last assistant content of the conversation; empty when there is none.
pub reply: String,
/// Id of the persisted outbound message, or a freshly generated UUID when the
/// reply is empty (in which case nothing is persisted).
pub message_id: String,
/// `true` if a tool-call start event named `knowledge_search` was observed.
pub invoked_knowledge_search: bool,
/// Citations built from the up-front knowledge results followed by the results
/// collected by the search tool, de-duplicated by document id and capped at
/// `MAX_CITATIONS`.
pub citations: Vec<Citation>,
/// Cost and token counts taken from the agent's `Completed` event; `None` if
/// no such event was seen.
pub usage: Option<crate::protocol::TurnUsage>,
/// Workflow step id after this turn; `Some` only when a workflow was supplied.
pub next_step_id: Option<String>,
}
/// Everything `run_streaming_turn` needs to execute one chat turn.
pub struct TurnRequest<'a> {
/// Storage backend: source of the knowledge base, message history, message
/// persistence and the agent's checkpoint store.
pub storage: Arc<dyn StorageAdapter>,
/// LLM configuration handed to the agent config.
pub llm: LlmConfig,
/// Iteration cap handed to the agent config.
pub max_iterations: u32,
/// Conversation whose history is loaded and to which messages are appended.
pub conversation_id: &'a str,
/// Request id stamped on every frame streamed to the sink.
pub request_id: &'a str,
/// The user's message for this turn.
pub user_message: &'a str,
/// Access context used to obtain the knowledge base and passed to the tool
/// provider.
pub access: AccessContext,
/// Optional LLM provider installed on the agent.
pub llm_provider: Option<Arc<dyn LlmProvider>>,
/// Optional reranker installed on the `knowledge_search` tool.
pub reranker: Option<Arc<dyn Reranker>>,
/// Optional human-confirmation settings for tool calls.
pub confirmation: Option<ConfirmationConfig>,
/// Optional source of additional tools registered next to `knowledge_search`.
pub tool_provider: Option<Arc<dyn ToolProvider>>,
/// Replaces the built-in system prompt when set.
pub system_prompt: Option<String>,
/// Organization id forwarded to the tool provider context.
pub org_id: Option<String>,
/// Gateway key forwarded to the tool provider context when set.
pub gateway_key: Option<String>,
/// Optional workflow; adds a workflow section to the prompt and enables
/// next-step judging.
pub workflow: Option<WorkflowTurn>,
/// LLM used to judge workflow progress; without it the workflow stays on the
/// current step.
pub judge: Option<Arc<dyn LlmProvider>>,
/// Extra prompt section appended only when the conversation has no prior
/// messages.
pub greeting_section: Option<String>,
/// When set, only tools whose name appears in this list are kept in the
/// registry.
pub enabled_tools: Option<Vec<String>>,
/// Optional hook added to the tool registry.
pub auth_gate: Option<AuthGateHook>,
/// Per-tool configuration forwarded to the tool provider context when set.
pub tool_configs: Option<std::collections::HashMap<String, serde_json::Value>>,
}
/// Runs one chat turn and streams its progress to `sink`.
///
/// In order, the function:
/// 1. queries the access-scoped knowledge base with the user message (a failed
///    query yields no up-front sources),
/// 2. loads the conversation history and then persists the inbound message,
/// 3. assembles the system prompt (base prompt, optional greeting on the first
///    turn, optional workflow section),
/// 4. builds the tool registry (`knowledge_search`, provider tools, optional
///    allow-list, auth gate, confirmation hook),
/// 5. runs the agent while a background task translates agent events into
///    protocol frames sent to `sink`,
/// 6. persists a non-empty reply, collects citations and, when a workflow is
///    present, asks the judge for the next step.
///
/// Sends to `sink` are best effort: a closed sink does not fail the turn.
///
/// # Errors
/// Returns an error if loading the history, persisting a message, or the agent
/// run itself fails. The confirmation bridge is joined and the session's
/// confirmation registration is cleared before an agent error is returned.
pub async fn run_streaming_turn(
    req: TurnRequest<'_>,
    sink: &UnboundedSender<serde_json::Value>,
) -> Result<TurnResult> {
    let TurnRequest {
        storage,
        llm,
        max_iterations,
        conversation_id,
        request_id,
        user_message,
        access,
        llm_provider,
        reranker,
        confirmation,
        tool_provider,
        system_prompt,
        org_id,
        gateway_key,
        workflow,
        judge,
        greeting_section,
        enabled_tools,
        auth_gate,
        tool_configs,
    } = req;

    let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
    // Best effort: a failing query simply contributes no up-front sources.
    let auto_sources: Vec<KnowledgeResult> = knowledge
        .query(user_message, AUTO_CONTEXT_LIMIT)
        .unwrap_or_default();
    // Filled by the `knowledge_search` tool while the agent runs.
    let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));

    // History is read before the inbound message is stored, which keeps the
    // current message out of `prior`.
    let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
    persist_message(
        storage.as_ref(),
        conversation_id,
        Direction::Inbound,
        user_message,
    )
    .await?;

    let base_prompt = system_prompt
        .as_deref()
        .unwrap_or(KNOWLEDGE_CHAT_SYSTEM_PROMPT);
    let mut sections: Vec<String> = vec![base_prompt.to_string()];
    // The greeting section only applies to the first turn of a conversation.
    if prior.is_empty() {
        if let Some(greeting) = greeting_section.as_deref() {
            sections.push(greeting.to_string());
        }
    }
    if let Some(wt) = workflow.as_ref() {
        sections.push(render_workflow_prompt_section(
            &wt.workflow,
            wt.current_step_id.as_deref(),
        ));
    }
    let resolved_prompt = sections.join("\n\n");

    let config = AgentConfig::new("smooth-agent-chat", &resolved_prompt, llm)
        .with_max_iterations(max_iterations)
        .with_knowledge(Arc::clone(&knowledge))
        .with_prior_messages(prior);

    let mut tools = ToolRegistry::new();
    let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
        .with_result_sink(Arc::clone(&tool_sources));
    if let Some(reranker) = reranker {
        knowledge_search = knowledge_search.with_reranker(reranker);
    }
    tools.register(knowledge_search);

    if let Some(provider) = tool_provider {
        // `access` and `tool_configs` are not needed after this point, so they
        // are moved into the context rather than cloned.
        let mut ctx =
            ToolProviderContext::new(org_id, access).with_conversation_id(conversation_id);
        if let Some(key) = gateway_key {
            ctx = ctx.with_gateway_key(key);
        }
        if let Some(configs) = tool_configs {
            ctx = ctx.with_tool_configs(configs);
        }
        for tool in provider.tools_for(&ctx).await {
            tools.register_arc(tool);
        }
    }

    // The allow-list is applied after every tool is registered, so it also
    // governs `knowledge_search`.
    if let Some(enabled) = enabled_tools {
        tools.retain(|name| enabled.iter().any(|id| id == name));
    }
    if let Some(gate) = auth_gate {
        tools.add_hook(gate);
    }

    // Confirmation is only wired up when at least one tool pattern is given.
    let confirmation_bridge = match &confirmation {
        Some(cfg) if !cfg.tool_patterns.is_empty() => {
            let pair = human_channel();
            tools.add_hook(ConfirmationHook::new(
                cfg.tool_patterns.clone(),
                pair.request_tx,
                pair.response_rx,
                CONFIRMATION_TIMEOUT,
            ));
            Some(spawn_confirmation_bridge(
                pair.request_rx,
                pair.response_tx,
                sink.clone(),
                request_id.to_string(),
                cfg.session_id.clone(),
                Arc::clone(&cfg.register),
            ))
        }
        _ => None,
    };

    let agent = {
        let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
        match llm_provider {
            Some(provider) => agent.with_llm_provider(provider),
            None => agent,
        }
    };

    // Background task: turns agent events into protocol frames on the sink and
    // records whether `knowledge_search` ran plus the final usage numbers.
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
    let request_id_owned = request_id.to_string();
    let sink_clone = sink.clone();
    let translator = tokio::spawn(async move {
        let mut invoked_knowledge_search = false;
        let mut usage: Option<crate::protocol::TurnUsage> = None;
        while let Some(event) = rx.recv().await {
            match event {
                AgentEvent::TokenDelta { content } => {
                    if !content.is_empty() {
                        let _ = sink_clone
                            .send(crate::protocol::stream_token(&request_id_owned, &content));
                    }
                }
                AgentEvent::ReasoningDelta { content } => {
                    if !content.is_empty() {
                        let _ = sink_clone.send(crate::protocol::stream_reasoning(
                            &request_id_owned,
                            &content,
                        ));
                    }
                }
                AgentEvent::ToolCallStart {
                    tool_name,
                    arguments,
                    ..
                } => {
                    if tool_name == "knowledge_search" {
                        invoked_knowledge_search = true;
                    }
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &tool_name,
                        json!({
                            "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
                        }),
                    ));
                }
                AgentEvent::ToolCallComplete {
                    tool_name,
                    result,
                    is_error,
                    ..
                } => {
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &tool_name,
                        json!({
                            "rawResponse": json!({
                                "toolResult": { "name": tool_name, "isError": is_error, "result": result }
                            }),
                        }),
                    ));
                }
                AgentEvent::PhaseStart { phase, .. } => {
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &phase,
                        json!({}),
                    ));
                }
                AgentEvent::Completed {
                    cost_usd,
                    prompt_tokens,
                    completion_tokens,
                    ..
                } => {
                    usage = Some(crate::protocol::TurnUsage {
                        cost_usd,
                        prompt_tokens,
                        completion_tokens,
                    });
                }
                _ => {}
            }
        }
        (invoked_knowledge_search, usage)
    });

    // Keep the outcome instead of propagating it right away: the confirmation
    // cleanup below must also run when the agent fails, otherwise the sender
    // registered for the session would never be cleared.
    let run_outcome = agent.run_with_channel(user_message, tx).await;
    // The agent owns the tool registry and thereby the confirmation hook's
    // request sender; dropping it is what allows the bridge loop to end.
    drop(agent);
    if let (Some(handle), Some(cfg)) = (confirmation_bridge, confirmation.as_ref()) {
        let _ = handle.await;
        (cfg.clear)(&cfg.session_id);
    }
    let conversation = run_outcome?;

    // A failed translator task degrades to "no search, no usage" rather than
    // failing the turn.
    let (invoked_knowledge_search, usage) = translator.await.unwrap_or((false, None));

    let reply = conversation
        .last_assistant_content()
        .unwrap_or_default()
        .to_string();
    // Empty replies are not persisted; the caller still receives a unique id.
    let message_id = if reply.is_empty() {
        uuid::Uuid::new_v4().to_string()
    } else {
        persist_message(
            storage.as_ref(),
            conversation_id,
            Direction::Outbound,
            &reply,
        )
        .await?
        .id
    };

    // Take the collected results without cloning when this is the last handle;
    // a poisoned lock still yields whatever was collected.
    let tool_sources = match Arc::try_unwrap(tool_sources) {
        Ok(mutex) => mutex
            .into_inner()
            .unwrap_or_else(std::sync::PoisonError::into_inner),
        Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
    };
    let citations = collect_citations(&auto_sources, &tool_sources);

    let next_step_id = match workflow.as_ref() {
        Some(wt) => Some(
            judge_next_step(
                judge.as_deref(),
                &wt.workflow,
                wt.current_step_id.as_deref(),
                user_message,
                &reply,
            )
            .await,
        ),
        None => None,
    };

    Ok(TurnResult {
        reply,
        message_id,
        invoked_knowledge_search,
        citations,
        usage,
        next_step_id,
    })
}
/// Decides which workflow step the conversation is on after this turn.
///
/// The current step id is returned unchanged ("stay") when the reply is blank,
/// when no judge is configured, when the judge call fails (treated as a
/// `Maybe` verdict and logged), or when `advance_after_verdict` yields nothing.
/// If the current step cannot be resolved at all, `current_step_id` is echoed
/// back, or an empty string when it is `None`.
async fn judge_next_step(
    judge: Option<&dyn LlmProvider>,
    workflow: &ConversationWorkflow,
    current_step_id: Option<&str>,
    user_message: &str,
    reply: &str,
) -> String {
    let current = match resolve_current_step(workflow, current_step_id) {
        Some(step) => step.clone(),
        None => return current_step_id.unwrap_or_default().to_string(),
    };
    let stay = || current.id.clone();

    // Nothing to judge without a reply or without a judge.
    if reply.trim().is_empty() {
        return stay();
    }
    let Some(judge) = judge else {
        return stay();
    };

    let system = EngineMessage::system(JUDGE_SYSTEM_PROMPT);
    let user = EngineMessage::user(judge_user_prompt(workflow, &current, user_message, reply));
    let verdict = match judge.chat(&[&system, &user], &[]).await {
        Ok(resp) => WorkflowJudgeVerdict::parse(&resp.content),
        Err(e) => {
            tracing::warn!(error = %e, step = %current.id, "workflow judge failed; staying on current step");
            WorkflowJudgeVerdict::Maybe
        }
    };
    advance_after_verdict(workflow, Some(&current.id), verdict).unwrap_or_else(stay)
}
/// Spawns the task that relays human-interaction requests to the client.
///
/// For every confirmation request the task registers a clone of `response_tx`
/// under `session_id` and sends a confirmation-required frame to `sink`.
/// Free-form input requests are answered straight away with a denial. The task
/// ends once `request_rx` yields no more requests.
fn spawn_confirmation_bridge(
    mut request_rx: UnboundedReceiver<HumanRequest>,
    response_tx: UnboundedSender<HumanResponse>,
    sink: UnboundedSender<serde_json::Value>,
    request_id: String,
    session_id: String,
    register: RegisterConfirmation,
) -> tokio::task::JoinHandle<()> {
    let relay = async move {
        loop {
            let Some(incoming) = request_rx.recv().await else {
                break;
            };
            match incoming {
                HumanRequest::Input { .. } => {
                    // This channel has no way to collect free text, so refuse.
                    let denial = HumanResponse::Denied {
                        reason: "free-form human input is not supported on this channel".into(),
                    };
                    let _ = response_tx.send(denial);
                }
                HumanRequest::Confirm {
                    tool_name, prompt, ..
                } => {
                    // Register first so the answer has a destination before the
                    // client is asked.
                    register(&session_id, response_tx.clone());
                    let frame = crate::protocol::write_confirmation_required(
                        &request_id,
                        &tool_name,
                        &prompt,
                    );
                    let _ = sink.send(frame);
                }
            }
        }
    };
    tokio::spawn(relay)
}
/// Builds the citation list for a turn.
///
/// Walks `auto` first and `tool` second, keeps only the first result for each
/// document id, and stops once `MAX_CITATIONS` citations have been produced.
fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
    let mut known_ids = std::collections::HashSet::new();
    let mut citations = Vec::new();
    for source in auto.iter().chain(tool.iter()) {
        // Checked before looking at the next result, so nothing past the cap
        // is examined.
        if citations.len() >= MAX_CITATIONS {
            break;
        }
        let first_occurrence = known_ids.insert(source.document_id.clone());
        if first_occurrence {
            citations.push(Citation::from_knowledge_result(source));
        }
    }
    citations
}
/// Loads stored messages of a conversation and converts them to engine messages.
///
/// The query is limited by `MAX_PRIOR_MESSAGES`. A message's text is its
/// top-level `text` or, when that is absent, the first item text found.
/// Messages whose text ends up empty are skipped. Inbound messages become user
/// messages, outbound ones assistant messages.
///
/// # Errors
/// Returns the storage error if listing the messages fails.
async fn load_prior_messages(
    storage: &dyn StorageAdapter,
    conversation_id: &str,
) -> Result<Vec<EngineMessage>> {
    let query = MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES);
    let page = storage.list_messages_by_conversation(query).await?;

    let history: Vec<EngineMessage> = page
        .messages
        .into_iter()
        .filter_map(|stored| {
            // The item fallback applies only when the top-level text is absent,
            // not when it is present but empty.
            let body = match stored.content.text.clone() {
                Some(text) => text,
                None => stored
                    .content
                    .items
                    .iter()
                    .find_map(|item| item.text.clone())
                    .unwrap_or_default(),
            };
            if body.is_empty() {
                return None;
            }
            let role = match stored.direction {
                Direction::Outbound => Role::Assistant,
                Direction::Inbound => Role::User,
            };
            Some(EngineMessage {
                id: stored.id,
                role,
                content: body,
                tool_call_id: None,
                tool_name: None,
                tool_calls: vec![],
                reasoning_content: None,
                timestamp: stored.created_at,
            })
        })
        .collect();

    Ok(history)
}
/// Appends a plain-text message to a conversation and returns what storage
/// hands back.
///
/// The message gets a fresh UUID and the current UTC time as `created_at`;
/// every other optional field is left unset.
///
/// # Errors
/// Returns the storage error if the append fails.
async fn persist_message(
    storage: &dyn StorageAdapter,
    conversation_id: &str,
    direction: Direction,
    text: &str,
) -> Result<DomainMessage> {
    let created_at = chrono::Utc::now();
    let id = uuid::Uuid::new_v4().to_string();
    let content = MessageContent::from_text(text);
    let record = DomainMessage {
        id,
        external_id: None,
        organization_id: None,
        conversation_id: Some(conversation_id.to_string()),
        direction,
        content,
        from: None,
        to: None,
        metadata_json: None,
        analytics_json: None,
        created_at,
        updated_at: None,
    };
    storage.append_message(record).await
}
/// Wraps a reply in the general-agent response envelope.
///
/// The reply becomes the single entry of `responseParts`. Both scores are fixed
/// at 0.5, the summary is empty, the status is `in_progress`, and no next
/// actions are suggested.
#[must_use]
pub fn general_agent_response(reply: &str) -> serde_json::Value {
    const NEUTRAL_SCORE: f64 = 0.5;
    let response_parts = [reply];
    json!({
        "responseParts": response_parts,
        "customerHappinessScore": NEUTRAL_SCORE,
        "needsSatisfactionScore": NEUTRAL_SCORE,
        "requestSummary": "",
        "resolutionStatus": "in_progress",
        "suggestedNextActions": [],
    })
}