smooth_operator_server/runner.rs
1//! The streaming, memory-carrying agent runner used by the WS service.
2//!
3//! `smooth-operator`'s [`KnowledgeChatRuntime`] proves the engine ↔ gateway
4//! path but (a) is non-streaming (`run_turn` returns only after the turn
5//! completes) and (b) has no cross-turn memory — each `run_turn` builds a fresh
6//! [`Agent`] with a random id and no prior messages, so turn 2 forgets turn 1
7//! (documented in `core/tests/e2e_llm_smoo_ai.rs`).
8//!
9//! The service needs both, so this module builds the agent itself, wiring the
10//! same knowledge-grounding as core PLUS:
11//!
12//! 1. **Streaming** via [`Agent::run_with_channel`], translating the engine's
13//! [`AgentEvent`] stream into protocol events (`stream_token`,
14//! `stream_chunk`, `eventual_response`).
15//! 2. **Per-session memory** via [`AgentConfig::with_prior_messages`]: before
16//! each turn the session's persisted message log is loaded from the storage
17//! adapter and replayed into the conversation, so the model sees turn 1 when
18//! answering turn 2. (`Agent::new` randomizes the agent id every time, so the
19//! checkpoint-resume path can't be keyed stably — replaying the persisted log
20//! is the robust, backend-agnostic way to carry memory. The log is the source
21//! of truth the adapter already persists.)
22
23use std::sync::{Arc, Mutex};
24
25use anyhow::Result;
26use serde_json::json;
27use smooth_operator_core::llm_provider::LlmProvider;
28use smooth_operator_core::{
29 Agent, AgentConfig, AgentEvent, KnowledgeBase, KnowledgeResult, LlmConfig,
30 Message as EngineMessage, Role, ToolRegistry,
31};
32use tokio::sync::mpsc::UnboundedSender;
33
34use smooth_operator::access_control::AccessContext;
35use smooth_operator::adapter::{MessageQuery, StorageAdapter};
36use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
37use smooth_operator::rerank::Reranker;
38use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
39use smooth_operator::MAX_CITATIONS;
40
/// Number of knowledge results requested for the auto-injected
/// `[Relevant knowledge]` context. Kept equal to the top-3 query that
/// smooth-operator-core's `Agent` runs for its own auto-injection, so the
/// citations gathered here line up with the sources that grounded the first
/// LLM call.
const AUTO_CONTEXT_LIMIT: usize = 3;
46
/// System prompt for the knowledge-chat agent.
///
/// Mirrors core's prompt: answers must be grounded in the knowledge base, and
/// anything organization-specific must be looked up via `knowledge_search`
/// before answering. Each piece below ends with the single separating space,
/// so the concatenation reads as one paragraph.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str = concat!(
    "You are a helpful customer-support agent for the organization. ",
    "Answer the user's question accurately and concisely. ",
    "When a question depends on organization-specific facts ",
    "(policies, products, documentation), call the `knowledge_search` tool ",
    "to retrieve them before answering, and ground your answer in what you retrieve. ",
    "If the knowledge base has no relevant information, say so. ",
    "Remember facts the user tells you within the conversation and use them when asked."
);
57
/// Max prior *messages* (not turns) requested from storage and replayed into
/// the conversation as memory. A completed turn normally stores two messages
/// (the inbound user message and the outbound reply), so this covers roughly
/// half as many turns. Bounds context growth on long sessions; the in-memory
/// log is small but a real backend could be large.
const MAX_PRIOR_MESSAGES: usize = 50;
62
63/// The terminal outcome of a streamed turn.
64pub struct TurnResult {
65 /// The agent's final natural-language reply.
66 pub reply: String,
67 /// The id of the persisted outbound (agent) message, for `eventual_response`.
68 pub message_id: String,
69 /// True if any `knowledge_search` tool call ran this turn (diagnostics).
70 pub invoked_knowledge_search: bool,
71 /// The sources that grounded this turn (the auto-injected context + every
72 /// `knowledge_search` result), deduped by id and capped. Carried onto the
73 /// `eventual_response`'s `citations`. Empty when nothing was retrieved.
74 pub citations: Vec<Citation>,
75}
76
77/// Everything one streaming turn needs. Bundled into a struct so the call sites
78/// (the reference server's `handle_send_message` and the lambda's
79/// `send_message`) stay readable and the security-critical [`access`](Self::access)
80/// field can never be silently dropped from a positional argument list.
81pub struct TurnRequest<'a> {
82 /// The storage seam (conversations / messages / sessions / knowledge).
83 pub storage: Arc<dyn StorageAdapter>,
84 /// The resolved LLM config for this turn.
85 pub llm: LlmConfig,
86 /// Agent-loop iteration cap.
87 pub max_iterations: u32,
88 /// The conversation this turn belongs to.
89 pub conversation_id: &'a str,
90 /// The protocol request id (streaming correlation).
91 pub request_id: &'a str,
92 /// The inbound user message.
93 pub user_message: &'a str,
94 /// **The requester's document-level entitlements.** Retrieval (the
95 /// auto-injected `[Relevant knowledge]` context AND the `knowledge_search`
96 /// tool) reads through `storage.knowledge_for_access(&access)`, so a
97 /// restricted document is never surfaced to a requester who lacks the
98 /// entitlement. An [`AccessContext::anonymous`] sees only org-public docs
99 /// (fail closed for ACL'd content).
100 pub access: AccessContext,
101 /// Optional test-injected LLM surface (a `MockLlmClient`) so the turn runs
102 /// deterministically offline. `None` in production (a live client is built
103 /// from `llm`).
104 pub llm_provider: Option<Arc<dyn LlmProvider>>,
105 /// Optional post-retrieval reranker (feature gap G8). When `Some`, the
106 /// `knowledge_search` tool overfetches candidates and reorders the top-K with
107 /// this reranker before they reach the model. `None` (the default) keeps the
108 /// retrieval order unchanged, so default behavior is byte-for-byte the same.
109 /// Selected by [`build_reranker`](crate::reranker::build_reranker).
110 pub reranker: Option<Arc<dyn Reranker>>,
111}
112
113/// Runs one knowledge-grounded, streaming turn for a session's conversation and
114/// emits protocol-shaped events through `sink` as they happen.
115///
116/// `sink` receives ready-to-send `serde_json::Value` event envelopes (built by
117/// [`crate::protocol`]). The caller forwards them over the WebSocket.
118///
119/// ## Access control (security-critical)
120///
121/// Both retrieval paths — the engine's auto-injected `[Relevant knowledge]`
122/// context and the agent's `knowledge_search` tool — read through
123/// [`StorageAdapter::knowledge_for_access`] bound to [`TurnRequest::access`],
124/// so a document the requester is not entitled to (e.g. a private-repo doc
125/// scoped to a group the requester is not in) is dropped before it can reach the
126/// model or a citation. See `docs/ACCESS-CONTROL.md`.
127///
128/// # Errors
129/// Returns an error if message persistence or the agent loop fails fatally. The
130/// caller converts this into a protocol `error` event.
131pub async fn run_streaming_turn(
132 req: TurnRequest<'_>,
133 sink: &UnboundedSender<serde_json::Value>,
134) -> Result<TurnResult> {
135 let TurnRequest {
136 storage,
137 llm,
138 max_iterations,
139 conversation_id,
140 request_id,
141 user_message,
142 access,
143 llm_provider,
144 reranker,
145 } = req;
146
147 // The ONE ACL-enforcing knowledge handle both retrieval paths read through.
148 // Built once from the requester's `AccessContext` so the auto-injected
149 // context query, the agent's `knowledge_search` tool, and the citation
150 // mirror all hit the SAME filtered view — a restricted doc can't leak in
151 // through one path while being dropped on another.
152 let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
153
154 // 0. Mirror the engine's auto-injected `[Relevant knowledge]` query so the
155 // citations include the sources the FIRST LLM call was grounded with.
156 // Same query smooth-operator-core's `Agent` runs (`query(msg, 3)`),
157 // against the same ACL-filtered handle. Best-effort: a KB error yields no
158 // auto-context citations.
159 let auto_sources: Vec<KnowledgeResult> = knowledge
160 .query(user_message, AUTO_CONTEXT_LIMIT)
161 .unwrap_or_default();
162 // Sink the knowledge_search tool records its structured results into, for
163 // citations built from the sources the agent's searches surfaced.
164 let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
165
166 // 1. Load prior turns for memory BEFORE persisting the new inbound message,
167 // so prior_messages is exactly the history-up-to-now.
168 let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
169
170 // 2. Persist the inbound user message.
171 persist_message(
172 storage.as_ref(),
173 conversation_id,
174 Direction::Inbound,
175 user_message,
176 )
177 .await?;
178
179 // 3. Build the agent: ACL-grounded config + knowledge_search tool (over the
180 // SAME ACL-filtered handle) + replayed prior messages for memory.
181 let config = AgentConfig::new("smooth-agent-chat", KNOWLEDGE_CHAT_SYSTEM_PROMPT, llm)
182 .with_max_iterations(max_iterations)
183 .with_knowledge(Arc::clone(&knowledge))
184 .with_prior_messages(prior);
185
186 let mut tools = ToolRegistry::new();
187 // Build the knowledge_search tool over the SAME ACL-filtered handle, with the
188 // citation sink and — when a reranker was selected (opt-in, G8) — the rerank
189 // stage. With `None` (the default) the tool fetches exactly `limit` and
190 // returns the retrieval order unchanged.
191 let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
192 .with_result_sink(Arc::clone(&tool_sources));
193 if let Some(reranker) = reranker {
194 knowledge_search = knowledge_search.with_reranker(reranker);
195 }
196 tools.register(knowledge_search);
197
198 let agent = {
199 let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
200 // Inject the mock LLM provider for offline/deterministic tests; in
201 // production a live client is built from `llm`.
202 match llm_provider {
203 Some(provider) => agent.with_llm_provider(provider),
204 None => agent,
205 }
206 };
207
208 // 4. Run with the streaming channel and translate events as they arrive.
209 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
210 let request_id_owned = request_id.to_string();
211 let sink_clone = sink.clone();
212
213 // Spawn the event translator so we forward tokens to the client in real
214 // time while the agent loop runs concurrently.
215 let translator = tokio::spawn(async move {
216 let mut invoked_knowledge_search = false;
217 while let Some(event) = rx.recv().await {
218 match event {
219 AgentEvent::TokenDelta { content } => {
220 if !content.is_empty() {
221 let _ = sink_clone
222 .send(crate::protocol::stream_token(&request_id_owned, &content));
223 }
224 }
225 AgentEvent::ToolCallStart {
226 tool_name,
227 arguments,
228 ..
229 } => {
230 if tool_name == "knowledge_search" {
231 invoked_knowledge_search = true;
232 }
233 let _ = sink_clone.send(crate::protocol::stream_chunk(
234 &request_id_owned,
235 &tool_name,
236 json!({
237 "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
238 }),
239 ));
240 }
241 AgentEvent::ToolCallComplete {
242 tool_name,
243 result,
244 is_error,
245 ..
246 } => {
247 let _ = sink_clone.send(crate::protocol::stream_chunk(
248 &request_id_owned,
249 &tool_name,
250 json!({
251 "rawResponse": json!({
252 "toolResult": { "name": tool_name, "isError": is_error, "result": result }
253 }),
254 }),
255 ));
256 }
257 AgentEvent::PhaseStart { phase, .. } => {
258 let _ = sink_clone.send(crate::protocol::stream_chunk(
259 &request_id_owned,
260 &phase,
261 json!({}),
262 ));
263 }
264 // Started / Completed / token-accounting events are terminal or
265 // structural; the protocol carries those via immediate/eventual
266 // responses, so they're intentionally not re-emitted here.
267 _ => {}
268 }
269 }
270 invoked_knowledge_search
271 });
272
273 // Drive the agent loop. `run_with_channel` consumes `tx`; when it returns,
274 // the channel closes and the translator task drains and finishes.
275 let conversation = agent.run_with_channel(user_message, tx).await?;
276
277 let invoked_knowledge_search = translator.await.unwrap_or(false);
278
279 let reply = conversation
280 .last_assistant_content()
281 .unwrap_or_default()
282 .to_string();
283
284 // 5. Persist the outbound reply and capture its id for eventual_response.
285 let message_id = if reply.is_empty() {
286 uuid::Uuid::new_v4().to_string()
287 } else {
288 persist_message(
289 storage.as_ref(),
290 conversation_id,
291 Direction::Outbound,
292 &reply,
293 )
294 .await?
295 .id
296 };
297
298 // Build citations from the sources that grounded this turn: auto-injected
299 // context first (it grounded the first LLM call), then the agent's
300 // knowledge_search results. Dedup by document id, cap at MAX_CITATIONS.
301 let tool_sources = match Arc::try_unwrap(tool_sources) {
302 Ok(mutex) => mutex
303 .into_inner()
304 .unwrap_or_else(std::sync::PoisonError::into_inner),
305 Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
306 };
307 let citations = collect_citations(&auto_sources, &tool_sources);
308
309 Ok(TurnResult {
310 reply,
311 message_id,
312 invoked_knowledge_search,
313 citations,
314 })
315}
316
317/// Build the turn's [`Citation`]s from the knowledge sources that grounded it:
318/// the engine's auto-injected `[Relevant knowledge]` context (`auto`, mirrored
319/// by the runner) followed by everything the agent's `knowledge_search` calls
320/// surfaced (`tool`). Concatenated auto-first, deduplicated by document id
321/// (first occurrence wins), mapped to [`Citation`], and capped at
322/// [`MAX_CITATIONS`]. Empty when nothing was retrieved.
323fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
324 let mut seen = std::collections::HashSet::new();
325 auto.iter()
326 .chain(tool.iter())
327 .filter(|r| seen.insert(r.document_id.clone()))
328 .take(MAX_CITATIONS)
329 .map(Citation::from_knowledge_result)
330 .collect()
331}
332
333/// Load the conversation's persisted messages (oldest-first, capped) and convert
334/// them to engine `Message`s for replay: inbound → User, outbound → Assistant.
335async fn load_prior_messages(
336 storage: &dyn StorageAdapter,
337 conversation_id: &str,
338) -> Result<Vec<EngineMessage>> {
339 let page = storage
340 .list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
341 .await?;
342
343 let mut out = Vec::with_capacity(page.messages.len());
344 for m in page.messages {
345 let text = m
346 .content
347 .text
348 .clone()
349 .or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
350 .unwrap_or_default();
351 if text.is_empty() {
352 continue;
353 }
354 let role = match m.direction {
355 Direction::Inbound => Role::User,
356 Direction::Outbound => Role::Assistant,
357 };
358 out.push(EngineMessage {
359 id: m.id,
360 role,
361 content: text,
362 tool_call_id: None,
363 tool_name: None,
364 tool_calls: vec![],
365 reasoning_content: None,
366 timestamp: m.created_at,
367 });
368 }
369 Ok(out)
370}
371
372/// Append a single message to the conversation's log via the adapter.
373async fn persist_message(
374 storage: &dyn StorageAdapter,
375 conversation_id: &str,
376 direction: Direction,
377 text: &str,
378) -> Result<DomainMessage> {
379 let now = chrono::Utc::now();
380 let message = DomainMessage {
381 id: uuid::Uuid::new_v4().to_string(),
382 external_id: None,
383 organization_id: None,
384 conversation_id: Some(conversation_id.to_string()),
385 direction,
386 content: MessageContent::from_text(text),
387 from: None,
388 to: None,
389 metadata_json: None,
390 analytics_json: None,
391 created_at: now,
392 updated_at: None,
393 };
394 storage.append_message(message).await
395}
396
397/// Build the structured `GeneralAgentResponse`-shaped payload the protocol's
398/// `eventual_response` carries. The reference runtime doesn't produce the full
399/// structured analytics, so we surface the reply text in `responseParts` and
400/// supply neutral defaults for the analytic fields (clients render
401/// `responseParts`).
402#[must_use]
403pub fn general_agent_response(reply: &str) -> serde_json::Value {
404 json!({
405 "responseParts": [reply],
406 "customerHappinessScore": 0.5,
407 "needsSatisfactionScore": 0.5,
408 "requestSummary": "",
409 "resolutionStatus": "in_progress",
410 "suggestedNextActions": [],
411 })
412}