Skip to main content

smooth_operator_server/
runner.rs

1//! The streaming, memory-carrying agent runner used by the WS service.
2//!
3//! `smooth-operator`'s [`KnowledgeChatRuntime`] proves the engine ↔ gateway
4//! path but (a) is non-streaming (`run_turn` returns only after the turn
5//! completes) and (b) has no cross-turn memory — each `run_turn` builds a fresh
6//! [`Agent`] with a random id and no prior messages, so turn 2 forgets turn 1
7//! (documented in `core/tests/e2e_llm_smoo_ai.rs`).
8//!
9//! The service needs both, so this module builds the agent itself, wiring the
10//! same knowledge-grounding as core PLUS:
11//!
12//! 1. **Streaming** via [`Agent::run_with_channel`], translating the engine's
13//!    [`AgentEvent`] stream into protocol events (`stream_token`,
14//!    `stream_chunk`, `eventual_response`).
15//! 2. **Per-session memory** via [`AgentConfig::with_prior_messages`]: before
16//!    each turn the session's persisted message log is loaded from the storage
17//!    adapter and replayed into the conversation, so the model sees turn 1 when
18//!    answering turn 2. (`Agent::new` randomizes the agent id every time, so the
19//!    checkpoint-resume path can't be keyed stably — replaying the persisted log
20//!    is the robust, backend-agnostic way to carry memory. The log is the source
21//!    of truth the adapter already persists.)
22
23use std::sync::{Arc, Mutex};
24
25use anyhow::Result;
26use serde_json::json;
27use smooth_operator_core::llm_provider::LlmProvider;
28use smooth_operator_core::{
29    Agent, AgentConfig, AgentEvent, KnowledgeBase, KnowledgeResult, LlmConfig,
30    Message as EngineMessage, Role, ToolRegistry,
31};
32use tokio::sync::mpsc::UnboundedSender;
33
34use smooth_operator::access_control::AccessContext;
35use smooth_operator::adapter::{MessageQuery, StorageAdapter};
36use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
37use smooth_operator::rerank::Reranker;
38use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
39use smooth_operator::MAX_CITATIONS;
40
41/// How many auto-injected knowledge results the engine prepends as
42/// `[Relevant knowledge]` context. Mirrors smooth-operator-core's `Agent`
43/// auto-injection (a top-3 query) so the citations we collect match the sources
44/// that grounded the first LLM call.
45const AUTO_CONTEXT_LIMIT: usize = 3;
46
47/// System prompt for the knowledge-chat agent. Mirrors core's prompt: ground
48/// answers in the knowledge base and search it before answering anything
49/// organization-specific.
50const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
51    "You are a helpful customer-support agent for the organization. \
52    Answer the user's question accurately and concisely. When a question depends on \
53    organization-specific facts (policies, products, documentation), call the \
54    `knowledge_search` tool to retrieve them before answering, and ground your answer \
55    in what you retrieve. If the knowledge base has no relevant information, say so. \
56    Remember facts the user tells you within the conversation and use them when asked.";
57
58/// Max prior turns to replay into the conversation for memory. Bounds context
59/// growth on long sessions; the in-memory log is small but a real backend could
60/// be large.
61const MAX_PRIOR_MESSAGES: usize = 50;
62
63/// The terminal outcome of a streamed turn.
64pub struct TurnResult {
65    /// The agent's final natural-language reply.
66    pub reply: String,
67    /// The id of the persisted outbound (agent) message, for `eventual_response`.
68    pub message_id: String,
69    /// True if any `knowledge_search` tool call ran this turn (diagnostics).
70    pub invoked_knowledge_search: bool,
71    /// The sources that grounded this turn (the auto-injected context + every
72    /// `knowledge_search` result), deduped by id and capped. Carried onto the
73    /// `eventual_response`'s `citations`. Empty when nothing was retrieved.
74    pub citations: Vec<Citation>,
75}
76
77/// Everything one streaming turn needs. Bundled into a struct so the call sites
78/// (the reference server's `handle_send_message` and the lambda's
79/// `send_message`) stay readable and the security-critical [`access`](Self::access)
80/// field can never be silently dropped from a positional argument list.
81pub struct TurnRequest<'a> {
82    /// The storage seam (conversations / messages / sessions / knowledge).
83    pub storage: Arc<dyn StorageAdapter>,
84    /// The resolved LLM config for this turn.
85    pub llm: LlmConfig,
86    /// Agent-loop iteration cap.
87    pub max_iterations: u32,
88    /// The conversation this turn belongs to.
89    pub conversation_id: &'a str,
90    /// The protocol request id (streaming correlation).
91    pub request_id: &'a str,
92    /// The inbound user message.
93    pub user_message: &'a str,
94    /// **The requester's document-level entitlements.** Retrieval (the
95    /// auto-injected `[Relevant knowledge]` context AND the `knowledge_search`
96    /// tool) reads through `storage.knowledge_for_access(&access)`, so a
97    /// restricted document is never surfaced to a requester who lacks the
98    /// entitlement. An [`AccessContext::anonymous`] sees only org-public docs
99    /// (fail closed for ACL'd content).
100    pub access: AccessContext,
101    /// Optional test-injected LLM surface (a `MockLlmClient`) so the turn runs
102    /// deterministically offline. `None` in production (a live client is built
103    /// from `llm`).
104    pub llm_provider: Option<Arc<dyn LlmProvider>>,
105    /// Optional post-retrieval reranker (feature gap G8). When `Some`, the
106    /// `knowledge_search` tool overfetches candidates and reorders the top-K with
107    /// this reranker before they reach the model. `None` (the default) keeps the
108    /// retrieval order unchanged, so default behavior is byte-for-byte the same.
109    /// Selected by [`build_reranker`](crate::reranker::build_reranker).
110    pub reranker: Option<Arc<dyn Reranker>>,
111}
112
113/// Runs one knowledge-grounded, streaming turn for a session's conversation and
114/// emits protocol-shaped events through `sink` as they happen.
115///
116/// `sink` receives ready-to-send `serde_json::Value` event envelopes (built by
117/// [`crate::protocol`]). The caller forwards them over the WebSocket.
118///
119/// ## Access control (security-critical)
120///
121/// Both retrieval paths — the engine's auto-injected `[Relevant knowledge]`
122/// context and the agent's `knowledge_search` tool — read through
123/// [`StorageAdapter::knowledge_for_access`] bound to [`TurnRequest::access`],
124/// so a document the requester is not entitled to (e.g. a private-repo doc
125/// scoped to a group the requester is not in) is dropped before it can reach the
126/// model or a citation. See `docs/ACCESS-CONTROL.md`.
127///
128/// # Errors
129/// Returns an error if message persistence or the agent loop fails fatally. The
130/// caller converts this into a protocol `error` event.
131pub async fn run_streaming_turn(
132    req: TurnRequest<'_>,
133    sink: &UnboundedSender<serde_json::Value>,
134) -> Result<TurnResult> {
135    let TurnRequest {
136        storage,
137        llm,
138        max_iterations,
139        conversation_id,
140        request_id,
141        user_message,
142        access,
143        llm_provider,
144        reranker,
145    } = req;
146
147    // The ONE ACL-enforcing knowledge handle both retrieval paths read through.
148    // Built once from the requester's `AccessContext` so the auto-injected
149    // context query, the agent's `knowledge_search` tool, and the citation
150    // mirror all hit the SAME filtered view — a restricted doc can't leak in
151    // through one path while being dropped on another.
152    let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
153
154    // 0. Mirror the engine's auto-injected `[Relevant knowledge]` query so the
155    //    citations include the sources the FIRST LLM call was grounded with.
156    //    Same query smooth-operator-core's `Agent` runs (`query(msg, 3)`),
157    //    against the same ACL-filtered handle. Best-effort: a KB error yields no
158    //    auto-context citations.
159    let auto_sources: Vec<KnowledgeResult> = knowledge
160        .query(user_message, AUTO_CONTEXT_LIMIT)
161        .unwrap_or_default();
162    // Sink the knowledge_search tool records its structured results into, for
163    // citations built from the sources the agent's searches surfaced.
164    let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
165
166    // 1. Load prior turns for memory BEFORE persisting the new inbound message,
167    //    so prior_messages is exactly the history-up-to-now.
168    let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
169
170    // 2. Persist the inbound user message.
171    persist_message(
172        storage.as_ref(),
173        conversation_id,
174        Direction::Inbound,
175        user_message,
176    )
177    .await?;
178
179    // 3. Build the agent: ACL-grounded config + knowledge_search tool (over the
180    //    SAME ACL-filtered handle) + replayed prior messages for memory.
181    let config = AgentConfig::new("smooth-agent-chat", KNOWLEDGE_CHAT_SYSTEM_PROMPT, llm)
182        .with_max_iterations(max_iterations)
183        .with_knowledge(Arc::clone(&knowledge))
184        .with_prior_messages(prior);
185
186    let mut tools = ToolRegistry::new();
187    // Build the knowledge_search tool over the SAME ACL-filtered handle, with the
188    // citation sink and — when a reranker was selected (opt-in, G8) — the rerank
189    // stage. With `None` (the default) the tool fetches exactly `limit` and
190    // returns the retrieval order unchanged.
191    let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
192        .with_result_sink(Arc::clone(&tool_sources));
193    if let Some(reranker) = reranker {
194        knowledge_search = knowledge_search.with_reranker(reranker);
195    }
196    tools.register(knowledge_search);
197
198    let agent = {
199        let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
200        // Inject the mock LLM provider for offline/deterministic tests; in
201        // production a live client is built from `llm`.
202        match llm_provider {
203            Some(provider) => agent.with_llm_provider(provider),
204            None => agent,
205        }
206    };
207
208    // 4. Run with the streaming channel and translate events as they arrive.
209    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
210    let request_id_owned = request_id.to_string();
211    let sink_clone = sink.clone();
212
213    // Spawn the event translator so we forward tokens to the client in real
214    // time while the agent loop runs concurrently.
215    let translator = tokio::spawn(async move {
216        let mut invoked_knowledge_search = false;
217        while let Some(event) = rx.recv().await {
218            match event {
219                AgentEvent::TokenDelta { content } => {
220                    if !content.is_empty() {
221                        let _ = sink_clone
222                            .send(crate::protocol::stream_token(&request_id_owned, &content));
223                    }
224                }
225                AgentEvent::ToolCallStart {
226                    tool_name,
227                    arguments,
228                    ..
229                } => {
230                    if tool_name == "knowledge_search" {
231                        invoked_knowledge_search = true;
232                    }
233                    let _ = sink_clone.send(crate::protocol::stream_chunk(
234                        &request_id_owned,
235                        &tool_name,
236                        json!({
237                            "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
238                        }),
239                    ));
240                }
241                AgentEvent::ToolCallComplete {
242                    tool_name,
243                    result,
244                    is_error,
245                    ..
246                } => {
247                    let _ = sink_clone.send(crate::protocol::stream_chunk(
248                        &request_id_owned,
249                        &tool_name,
250                        json!({
251                            "rawResponse": json!({
252                                "toolResult": { "name": tool_name, "isError": is_error, "result": result }
253                            }),
254                        }),
255                    ));
256                }
257                AgentEvent::PhaseStart { phase, .. } => {
258                    let _ = sink_clone.send(crate::protocol::stream_chunk(
259                        &request_id_owned,
260                        &phase,
261                        json!({}),
262                    ));
263                }
264                // Started / Completed / token-accounting events are terminal or
265                // structural; the protocol carries those via immediate/eventual
266                // responses, so they're intentionally not re-emitted here.
267                _ => {}
268            }
269        }
270        invoked_knowledge_search
271    });
272
273    // Drive the agent loop. `run_with_channel` consumes `tx`; when it returns,
274    // the channel closes and the translator task drains and finishes.
275    let conversation = agent.run_with_channel(user_message, tx).await?;
276
277    let invoked_knowledge_search = translator.await.unwrap_or(false);
278
279    let reply = conversation
280        .last_assistant_content()
281        .unwrap_or_default()
282        .to_string();
283
284    // 5. Persist the outbound reply and capture its id for eventual_response.
285    let message_id = if reply.is_empty() {
286        uuid::Uuid::new_v4().to_string()
287    } else {
288        persist_message(
289            storage.as_ref(),
290            conversation_id,
291            Direction::Outbound,
292            &reply,
293        )
294        .await?
295        .id
296    };
297
298    // Build citations from the sources that grounded this turn: auto-injected
299    // context first (it grounded the first LLM call), then the agent's
300    // knowledge_search results. Dedup by document id, cap at MAX_CITATIONS.
301    let tool_sources = match Arc::try_unwrap(tool_sources) {
302        Ok(mutex) => mutex
303            .into_inner()
304            .unwrap_or_else(std::sync::PoisonError::into_inner),
305        Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
306    };
307    let citations = collect_citations(&auto_sources, &tool_sources);
308
309    Ok(TurnResult {
310        reply,
311        message_id,
312        invoked_knowledge_search,
313        citations,
314    })
315}
316
317/// Build the turn's [`Citation`]s from the knowledge sources that grounded it:
318/// the engine's auto-injected `[Relevant knowledge]` context (`auto`, mirrored
319/// by the runner) followed by everything the agent's `knowledge_search` calls
320/// surfaced (`tool`). Concatenated auto-first, deduplicated by document id
321/// (first occurrence wins), mapped to [`Citation`], and capped at
322/// [`MAX_CITATIONS`]. Empty when nothing was retrieved.
323fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
324    let mut seen = std::collections::HashSet::new();
325    auto.iter()
326        .chain(tool.iter())
327        .filter(|r| seen.insert(r.document_id.clone()))
328        .take(MAX_CITATIONS)
329        .map(Citation::from_knowledge_result)
330        .collect()
331}
332
333/// Load the conversation's persisted messages (oldest-first, capped) and convert
334/// them to engine `Message`s for replay: inbound → User, outbound → Assistant.
335async fn load_prior_messages(
336    storage: &dyn StorageAdapter,
337    conversation_id: &str,
338) -> Result<Vec<EngineMessage>> {
339    let page = storage
340        .list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
341        .await?;
342
343    let mut out = Vec::with_capacity(page.messages.len());
344    for m in page.messages {
345        let text = m
346            .content
347            .text
348            .clone()
349            .or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
350            .unwrap_or_default();
351        if text.is_empty() {
352            continue;
353        }
354        let role = match m.direction {
355            Direction::Inbound => Role::User,
356            Direction::Outbound => Role::Assistant,
357        };
358        out.push(EngineMessage {
359            id: m.id,
360            role,
361            content: text,
362            tool_call_id: None,
363            tool_name: None,
364            tool_calls: vec![],
365            reasoning_content: None,
366            timestamp: m.created_at,
367        });
368    }
369    Ok(out)
370}
371
372/// Append a single message to the conversation's log via the adapter.
373async fn persist_message(
374    storage: &dyn StorageAdapter,
375    conversation_id: &str,
376    direction: Direction,
377    text: &str,
378) -> Result<DomainMessage> {
379    let now = chrono::Utc::now();
380    let message = DomainMessage {
381        id: uuid::Uuid::new_v4().to_string(),
382        external_id: None,
383        organization_id: None,
384        conversation_id: Some(conversation_id.to_string()),
385        direction,
386        content: MessageContent::from_text(text),
387        from: None,
388        to: None,
389        metadata_json: None,
390        analytics_json: None,
391        created_at: now,
392        updated_at: None,
393    };
394    storage.append_message(message).await
395}
396
397/// Build the structured `GeneralAgentResponse`-shaped payload the protocol's
398/// `eventual_response` carries. The reference runtime doesn't produce the full
399/// structured analytics, so we surface the reply text in `responseParts` and
400/// supply neutral defaults for the analytic fields (clients render
401/// `responseParts`).
402#[must_use]
403pub fn general_agent_response(reply: &str) -> serde_json::Value {
404    json!({
405        "responseParts": [reply],
406        "customerHappinessScore": 0.5,
407        "needsSatisfactionScore": 0.5,
408        "requestSummary": "",
409        "resolutionStatus": "in_progress",
410        "suggestedNextActions": [],
411    })
412}