smooth_operator_server/runner.rs
1//! The streaming, memory-carrying agent runner used by the WS service.
2//!
3//! `smooth-operator`'s [`KnowledgeChatRuntime`] proves the engine ↔ gateway
4//! path but (a) is non-streaming (`run_turn` returns only after the turn
5//! completes) and (b) has no cross-turn memory — each `run_turn` builds a fresh
6//! [`Agent`] with a random id and no prior messages, so turn 2 forgets turn 1
7//! (documented in `core/tests/e2e_llm_smoo_ai.rs`).
8//!
9//! The service needs both, so this module builds the agent itself, wiring the
10//! same knowledge-grounding as core PLUS:
11//!
//! 1. **Streaming** via [`Agent::run_with_channel`], translating the engine's
//!    [`AgentEvent`] stream into protocol stream events (`stream_token`,
//!    `stream_reasoning`, `stream_chunk`) and returning the turn outcome the
//!    caller sends as `eventual_response`.
15//! 2. **Per-session memory** via [`AgentConfig::with_prior_messages`]: before
16//! each turn the session's persisted message log is loaded from the storage
17//! adapter and replayed into the conversation, so the model sees turn 1 when
18//! answering turn 2. (`Agent::new` randomizes the agent id every time, so the
19//! checkpoint-resume path can't be keyed stably — replaying the persisted log
20//! is the robust, backend-agnostic way to carry memory. The log is the source
21//! of truth the adapter already persists.)
22
23use std::sync::{Arc, Mutex};
24use std::time::Duration;
25
26use anyhow::Result;
27use serde_json::json;
28use smooth_operator_core::llm_provider::LlmProvider;
29use smooth_operator_core::{
30 human_channel, Agent, AgentConfig, AgentEvent, ConfirmationHook, HumanRequest, HumanResponse,
31 KnowledgeBase, KnowledgeResult, LlmConfig, Message as EngineMessage, Role, ToolRegistry,
32};
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
34
35use smooth_operator::access_control::AccessContext;
36use smooth_operator::adapter::{MessageQuery, StorageAdapter};
37use smooth_operator::agent_config::{
38 advance_after_verdict, judge_user_prompt, render_workflow_prompt_section, resolve_current_step,
39 AuthGateHook, ConversationWorkflow, WorkflowJudgeVerdict, JUDGE_SYSTEM_PROMPT,
40};
41use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
42use smooth_operator::rerank::Reranker;
43use smooth_operator::tool_provider::{ToolProvider, ToolProviderContext};
44use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
45use smooth_operator::MAX_CITATIONS;
46
/// Result count of the engine's automatic `[Relevant knowledge]` injection.
///
/// smooth-operator-core's `Agent` prepends a top-3 knowledge query to the first
/// LLM call; the runner repeats that exact query so the citations it reports
/// line up with the sources that actually grounded that call.
const AUTO_CONTEXT_LIMIT: usize = 3;
52
/// Default system prompt for the knowledge-chat agent (mirrors core's prompt).
///
/// Tells the model to answer organization-specific questions from the
/// knowledge base, to call `knowledge_search` before answering them, and to
/// remember facts the user shares within the conversation. A host-supplied
/// persona on the turn replaces it.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str = concat!(
    "You are a helpful customer-support agent for the organization. ",
    "Answer the user's question accurately and concisely. ",
    "When a question depends on organization-specific facts ",
    "(policies, products, documentation), call the `knowledge_search` tool ",
    "to retrieve them before answering, and ground your answer in what you retrieve. ",
    "If the knowledge base has no relevant information, say so. ",
    "Remember facts the user tells you within the conversation and use them when asked."
);
63
/// Upper bound on how much persisted history is replayed into a turn for memory.
///
/// Counted in messages (per the constant's name), not user/agent turns. It caps
/// context growth on long sessions: the in-memory log stays small, but a real
/// storage backend could hold a very long conversation.
const MAX_PRIOR_MESSAGES: usize = 50;
68
/// Maximum time a parked write-tool waits for a `confirm_tool_action`.
///
/// Once it elapses, the core `ConfirmationHook` stops waiting and treats the
/// tool as denied (a timeout), so a client that never answers cannot hold a
/// turn's task forever. Five minutes is deliberately generous because a human
/// has to respond.
const CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(5 * 60);
74
75/// Registers a parked turn's [`HumanResponse`] sender under a session id (so a
76/// later `confirm_tool_action` can take it). Typically `AppState::register_confirmation`.
77pub type RegisterConfirmation = Arc<dyn Fn(&str, UnboundedSender<HumanResponse>) + Send + Sync>;
78
/// Callback that drops whatever confirmation sender is registered for a
/// session id once that session's turn is over. In the service this is
/// typically `AppState::clear_confirmation`.
pub type ClearConfirmation = Arc<dyn Fn(&str) + Send + Sync + 'static>;
82
83/// Hooks the runner needs to wire **write-confirmation HITL** into a turn
84/// without depending on `AppState` directly (keeps the runner unit-testable).
85///
86/// When `Some`, the runner installs a core [`ConfirmationHook`] over every tool
87/// whose name matches one of [`tool_patterns`](Self::tool_patterns). When such a
88/// tool is about to run, the agent loop **parks** inside the hook's `pre_call`
89/// and emits a [`HumanRequest::Confirm`]; the runner's bridge:
90/// 1. calls [`register`](Self::register) with the session's
91/// [`HumanResponse`] sender, so a later `confirm_tool_action` can resume,
92/// 2. emits a `confirm_tool_action_required` event through the turn sink.
93///
94/// On `confirm_tool_action`, the handler feeds the sender [`HumanResponse`] and
95/// the parked tool either executes (approved) or is skipped with a rejection
96/// result (denied). `None` (the default) installs no hook → no tool ever parks →
97/// behavior is byte-for-byte identical to before HITL.
98pub struct ConfirmationConfig {
99 /// Tool-name substrings that require human approval (matched by core's
100 /// `ConfirmationHook`, which uses `contains` matching). Empty disables HITL.
101 pub tool_patterns: Vec<String>,
102 /// The session this turn belongs to — carried on the
103 /// `confirm_tool_action_required` event and the registration key so the
104 /// inbound `confirm_tool_action` (keyed by `sessionId`) routes back here.
105 pub session_id: String,
106 /// Registers the parked turn's [`HumanResponse`] sender under
107 /// [`session_id`](Self::session_id) (typically `AppState::register_confirmation`).
108 pub register: RegisterConfirmation,
109 /// Clears any registered sender for [`session_id`](Self::session_id) when the
110 /// turn ends (typically `AppState::clear_confirmation`), so a stale sender
111 /// can't mis-route a later confirmation.
112 pub clear: ClearConfirmation,
113}
114
115/// A turn's **conversation-workflow** context: the agent's configured workflow
116/// plus the step the conversation is currently on. When present on a
117/// [`TurnRequest`], the runner injects the current step's intent/criteria into
118/// the system prompt and, after the turn, runs the judge to decide whether to
119/// advance. `None` (the default) means the agent runs freeform — no workflow
120/// section, no judge, byte-for-byte unchanged.
121pub struct WorkflowTurn {
122 /// The agent's structured workflow (goal + ordered steps).
123 pub workflow: ConversationWorkflow,
124 /// The step id the conversation is on, or `None` for a fresh start (the
125 /// runner then resolves to the workflow's first step).
126 pub current_step_id: Option<String>,
127}
128
129/// The terminal outcome of a streamed turn.
130pub struct TurnResult {
131 /// The agent's final natural-language reply.
132 pub reply: String,
133 /// The id of the persisted outbound (agent) message, for `eventual_response`.
134 pub message_id: String,
135 /// True if any `knowledge_search` tool call ran this turn (diagnostics).
136 pub invoked_knowledge_search: bool,
137 /// The sources that grounded this turn (the auto-injected context + every
138 /// `knowledge_search` result), deduped by id and capped. Carried onto the
139 /// `eventual_response`'s `citations`. Empty when nothing was retrieved.
140 pub citations: Vec<Citation>,
141 /// The turn's token-accounting + cost, captured from the engine's terminal
142 /// [`AgentEvent::Completed`]. Carried onto the `eventual_response`'s `usage`
143 /// object so clients accumulate live session cost. `None` when the engine
144 /// reported no `Completed` event (e.g. an offline mock turn).
145 pub usage: Option<crate::protocol::TurnUsage>,
146 /// The conversation-workflow step id **after** this turn's judge ran. `Some`
147 /// only when the turn had a [`WorkflowTurn`]; the caller persists it onto the
148 /// session so the next turn resumes on the right step. Equals the incoming
149 /// step when the judge did not advance (criteria not met, terminal step, or a
150 /// judge failure — never freezes, never crashes the turn).
151 pub next_step_id: Option<String>,
152}
153
154/// Everything one streaming turn needs. Bundled into a struct so the call sites
155/// (the reference server's `handle_send_message` and the lambda's
156/// `send_message`) stay readable and the security-critical [`access`](Self::access)
157/// field can never be silently dropped from a positional argument list.
158pub struct TurnRequest<'a> {
159 /// The storage seam (conversations / messages / sessions / knowledge).
160 pub storage: Arc<dyn StorageAdapter>,
161 /// The resolved LLM config for this turn.
162 pub llm: LlmConfig,
163 /// Agent-loop iteration cap.
164 pub max_iterations: u32,
165 /// The conversation this turn belongs to.
166 pub conversation_id: &'a str,
167 /// The protocol request id (streaming correlation).
168 pub request_id: &'a str,
169 /// The inbound user message.
170 pub user_message: &'a str,
171 /// **The requester's document-level entitlements.** Retrieval (the
172 /// auto-injected `[Relevant knowledge]` context AND the `knowledge_search`
173 /// tool) reads through `storage.knowledge_for_access(&access)`, so a
174 /// restricted document is never surfaced to a requester who lacks the
175 /// entitlement. An [`AccessContext::anonymous`] sees only org-public docs
176 /// (fail closed for ACL'd content).
177 pub access: AccessContext,
178 /// Optional test-injected LLM surface (a `MockLlmClient`) so the turn runs
179 /// deterministically offline. `None` in production (a live client is built
180 /// from `llm`).
181 pub llm_provider: Option<Arc<dyn LlmProvider>>,
182 /// Optional post-retrieval reranker (feature gap G8). When `Some`, the
183 /// `knowledge_search` tool overfetches candidates and reorders the top-K with
184 /// this reranker before they reach the model. `None` (the default) keeps the
185 /// retrieval order unchanged, so default behavior is byte-for-byte the same.
186 /// Selected by [`build_reranker`](crate::reranker::build_reranker).
187 pub reranker: Option<Arc<dyn Reranker>>,
188 /// Optional **write-confirmation HITL** wiring. `None` (the default) installs
189 /// no confirmation hook, so no tool ever parks the turn and behavior is
190 /// identical to before HITL. `Some` installs a core [`ConfirmationHook`] over
191 /// the configured tool patterns and bridges its [`HumanRequest`]s to a
192 /// `confirm_tool_action_required` event + a registered resumable sender. See
193 /// [`ConfirmationConfig`].
194 pub confirmation: Option<ConfirmationConfig>,
195 /// **SEAM 1 — host tool injection.** When `Some`, the runner asks this
196 /// provider for EXTRA tools and merges them into the turn's
197 /// [`ToolRegistry`] alongside the built-ins. `None` (the default) leaves the
198 /// registry as exactly the built-ins, so default behavior is byte-for-byte
199 /// unchanged. A host installs one via [`AppState::with_tools`](crate::state::AppState::with_tools).
200 pub tool_provider: Option<Arc<dyn ToolProvider>>,
201 /// **SEAM 2 — per-org agent persona.** The resolved system prompt for this
202 /// turn. When `Some`, it REPLACES the built-in [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`]
203 /// as the agent's system prompt (the host resolves it from per-org settings,
204 /// e.g. [`AgentSettings::persona`](smooth_operator::settings::AgentSettings::persona)).
205 /// `None` (the default) keeps the const prompt, so default behavior is
206 /// byte-for-byte unchanged.
207 pub system_prompt: Option<String>,
208 /// The owning org for this turn, threaded into the
209 /// [`ToolProviderContext`](smooth_operator::tool_provider::ToolProviderContext)
210 /// so a [`ToolProvider`] can return per-org tools. `None` when no org is
211 /// resolved (e.g. an anonymous reference-server connection).
212 pub org_id: Option<String>,
213 /// The resolved per-org LLM-gateway key for this turn, threaded into the
214 /// [`ToolProviderContext`](smooth_operator::tool_provider::ToolProviderContext)
215 /// so a retrieval-style host tool (e.g. agent-brain's `knowledge_search`)
216 /// can call the same gateway this turn was billed/scoped to. `None` when no
217 /// key resolved (e.g. a mock-driven offline turn). The runner does not use
218 /// it to talk to the gateway itself — that comes from [`llm`](Self::llm); it
219 /// only carries it through to the provider context.
220 pub gateway_key: Option<String>,
221 /// **Per-agent conversation workflow.** When `Some`, the runner injects the
222 /// current step's intent/criteria into the system prompt and runs the judge
223 /// after the turn to decide advancement. `None` (the default) ⇒ no workflow
224 /// section, no judge — freeform behavior, byte-for-byte unchanged.
225 pub workflow: Option<WorkflowTurn>,
226 /// The **judge** LLM surface: a cheap model that decides whether the current
227 /// workflow step's criteria were met this turn. Only consulted when
228 /// [`workflow`](Self::workflow) is `Some`. Production wires a client built
229 /// from the server's default (cheap) model; tests inject a mock. `None` ⇒ the
230 /// workflow stays on its current step (never advances) — a safe degrade.
231 pub judge: Option<Arc<dyn LlmProvider>>,
232 /// Per-agent first-turn greeting section (already rendered). Injected into the
233 /// system prompt ONLY when this conversation has no prior messages, so the
234 /// agent opens with it once. `None` ⇒ no greeting.
235 pub greeting_section: Option<String>,
236 /// Per-agent tool allow-list (snake_case ids). `Some` restricts the turn's
237 /// registry to those tools (built-ins + host tools alike); `None` ⇒ the full
238 /// tool set (unchanged). Unknown ids simply match nothing.
239 pub enabled_tools: Option<Vec<String>>,
240 /// Per-agent auth-level gate (SMOODEV-590). When `Some`, installed as a
241 /// `ToolHook` that blocks a call whose configured `authLevel` isn't satisfied
242 /// (admin on public, or unverified end_user on public). `None` ⇒ no gate.
243 pub auth_gate: Option<AuthGateHook>,
244 /// Per-tool config (`tool_id` → config), delivered to host tools via the
245 /// `ToolProviderContext`. `None`/empty ⇒ no per-tool config. Built-ins ignore it.
246 pub tool_configs: Option<std::collections::HashMap<String, serde_json::Value>>,
247}
248
249/// Runs one knowledge-grounded, streaming turn for a session's conversation and
250/// emits protocol-shaped events through `sink` as they happen.
251///
252/// `sink` receives ready-to-send `serde_json::Value` event envelopes (built by
253/// [`crate::protocol`]). The caller forwards them over the WebSocket.
254///
255/// ## Access control (security-critical)
256///
257/// Both retrieval paths — the engine's auto-injected `[Relevant knowledge]`
258/// context and the agent's `knowledge_search` tool — read through
259/// [`StorageAdapter::knowledge_for_access`] bound to [`TurnRequest::access`],
260/// so a document the requester is not entitled to (e.g. a private-repo doc
261/// scoped to a group the requester is not in) is dropped before it can reach the
262/// model or a citation. See `docs/ACCESS-CONTROL.md`.
263///
264/// # Errors
265/// Returns an error if message persistence or the agent loop fails fatally. The
266/// caller converts this into a protocol `error` event.
267pub async fn run_streaming_turn(
268 req: TurnRequest<'_>,
269 sink: &UnboundedSender<serde_json::Value>,
270) -> Result<TurnResult> {
271 let TurnRequest {
272 storage,
273 llm,
274 max_iterations,
275 conversation_id,
276 request_id,
277 user_message,
278 access,
279 llm_provider,
280 reranker,
281 confirmation,
282 tool_provider,
283 system_prompt,
284 org_id,
285 gateway_key,
286 workflow,
287 judge,
288 greeting_section,
289 enabled_tools,
290 auth_gate,
291 tool_configs,
292 } = req;
293
294 // The ONE ACL-enforcing knowledge handle both retrieval paths read through.
295 // Built once from the requester's `AccessContext` so the auto-injected
296 // context query, the agent's `knowledge_search` tool, and the citation
297 // mirror all hit the SAME filtered view — a restricted doc can't leak in
298 // through one path while being dropped on another.
299 let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
300
301 // 0. Mirror the engine's auto-injected `[Relevant knowledge]` query so the
302 // citations include the sources the FIRST LLM call was grounded with.
303 // Same query smooth-operator-core's `Agent` runs (`query(msg, 3)`),
304 // against the same ACL-filtered handle. Best-effort: a KB error yields no
305 // auto-context citations.
306 let auto_sources: Vec<KnowledgeResult> = knowledge
307 .query(user_message, AUTO_CONTEXT_LIMIT)
308 .unwrap_or_default();
309 // Sink the knowledge_search tool records its structured results into, for
310 // citations built from the sources the agent's searches surfaced.
311 let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
312
313 // 1. Load prior turns for memory BEFORE persisting the new inbound message,
314 // so prior_messages is exactly the history-up-to-now.
315 let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
316
317 // 2. Persist the inbound user message.
318 persist_message(
319 storage.as_ref(),
320 conversation_id,
321 Direction::Inbound,
322 user_message,
323 )
324 .await?;
325
326 // 3. Build the agent: ACL-grounded config + knowledge_search tool (over the
327 // SAME ACL-filtered handle) + replayed prior messages for memory.
328 //
329 // SEAM 2 — resolve the system prompt: a host-supplied persona
330 // (`system_prompt`, resolved per-agent then per-org) overrides the
331 // built-in const; absent ⇒ the const, so default behavior is byte-for-byte
332 // unchanged. When the agent has a conversation workflow, the current
333 // step's intent/criteria are appended so the model drives that step.
334 let base_prompt = system_prompt
335 .as_deref()
336 .unwrap_or(KNOWLEDGE_CHAT_SYSTEM_PROMPT);
337 // Compose base → first-turn greeting → current workflow step. The greeting is
338 // injected only when this conversation has no prior messages (first turn).
339 let mut sections: Vec<String> = vec![base_prompt.to_string()];
340 if prior.is_empty() {
341 if let Some(greeting) = greeting_section.as_deref() {
342 sections.push(greeting.to_string());
343 }
344 }
345 if let Some(wt) = workflow.as_ref() {
346 sections.push(render_workflow_prompt_section(
347 &wt.workflow,
348 wt.current_step_id.as_deref(),
349 ));
350 }
351 let resolved_prompt = sections.join("\n\n");
352 let config = AgentConfig::new("smooth-agent-chat", &resolved_prompt, llm)
353 .with_max_iterations(max_iterations)
354 .with_knowledge(Arc::clone(&knowledge))
355 .with_prior_messages(prior);
356
357 let mut tools = ToolRegistry::new();
358 // Build the knowledge_search tool over the SAME ACL-filtered handle, with the
359 // citation sink and — when a reranker was selected (opt-in, G8) — the rerank
360 // stage. With `None` (the default) the tool fetches exactly `limit` and
361 // returns the retrieval order unchanged.
362 let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
363 .with_result_sink(Arc::clone(&tool_sources));
364 if let Some(reranker) = reranker {
365 knowledge_search = knowledge_search.with_reranker(reranker);
366 }
367 tools.register(knowledge_search);
368
369 // SEAM 1 — merge host-contributed tools. When a provider is installed, ask
370 // it (with the turn's org + access context) for extra tools and register
371 // each alongside the built-ins. Built-ins are registered FIRST, so a host
372 // tool that intentionally reuses a built-in name replaces it; a distinct
373 // name simply adds. With no provider this block is a no-op, leaving the
374 // registry as exactly today's built-ins.
375 if let Some(provider) = tool_provider {
376 // Thread the per-turn handles the runner already has — the conversation
377 // this turn runs in and the resolved per-org gateway key — so a host's
378 // conversation-persisting / retrieval tools aren't degraded to no-ops.
379 let mut ctx =
380 ToolProviderContext::new(org_id, access.clone()).with_conversation_id(conversation_id);
381 if let Some(key) = gateway_key {
382 ctx = ctx.with_gateway_key(key);
383 }
384 // SEAM 3 — deliver per-tool config to host tools (registry.ts parity).
385 if let Some(configs) = tool_configs.clone() {
386 ctx = ctx.with_tool_configs(configs);
387 }
388 for tool in provider.tools_for(&ctx).await {
389 tools.register_arc(tool);
390 }
391 }
392
393 // SEAM 3 — per-agent tool allow-list. When the agent's `tool_config` restricts
394 // the tool set, drop every registered tool (built-in or host) whose snake_case
395 // name isn't enabled. `None` (empty/absent tool_config) leaves the full set.
396 if let Some(enabled) = enabled_tools {
397 tools.retain(|name| enabled.iter().any(|id| id == name));
398 }
399
400 // SEAM 3 — per-agent authLevel gate. When installed, a tool call whose
401 // configured `authLevel` isn't satisfied is blocked at execution with the
402 // reference refusal (the engine surfaces the `pre_call` error to the model).
403 // `None` ⇒ no gate.
404 if let Some(gate) = auth_gate {
405 tools.add_hook(gate);
406 }
407
408 // 3a. Write-confirmation HITL: when configured with tool patterns, install a
409 // core `ConfirmationHook` over those tools and spawn a bridge that turns
410 // each `HumanRequest::Confirm` into a `confirm_tool_action_required`
411 // event + a registered resumable `HumanResponse` sender. With no
412 // `confirmation` (the default) or empty patterns, no hook is installed —
413 // no tool parks the turn, byte-for-byte unchanged from before HITL.
414 let confirmation_bridge = match &confirmation {
415 Some(cfg) if !cfg.tool_patterns.is_empty() => {
416 let pair = human_channel();
417 // The hook owns the request *sender* (emits Confirm) and the response
418 // *receiver* (awaits the human's verdict). The runner keeps the
419 // request *receiver* and the response *sender* for the bridge.
420 tools.add_hook(ConfirmationHook::new(
421 cfg.tool_patterns.clone(),
422 pair.request_tx,
423 pair.response_rx,
424 CONFIRMATION_TIMEOUT,
425 ));
426 Some(spawn_confirmation_bridge(
427 pair.request_rx,
428 pair.response_tx,
429 sink.clone(),
430 request_id.to_string(),
431 cfg.session_id.clone(),
432 Arc::clone(&cfg.register),
433 ))
434 }
435 _ => None,
436 };
437
438 let agent = {
439 let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
440 // Inject the mock LLM provider for offline/deterministic tests; in
441 // production a live client is built from `llm`.
442 match llm_provider {
443 Some(provider) => agent.with_llm_provider(provider),
444 None => agent,
445 }
446 };
447
448 // 4. Run with the streaming channel and translate events as they arrive.
449 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
450 let request_id_owned = request_id.to_string();
451 let sink_clone = sink.clone();
452
453 // Spawn the event translator so we forward tokens to the client in real
454 // time while the agent loop runs concurrently.
455 let translator = tokio::spawn(async move {
456 let mut invoked_knowledge_search = false;
457 // The terminal `Completed` event carries the turn's accumulated cost +
458 // token counts; capture them to surface on the `eventual_response`.
459 let mut usage: Option<crate::protocol::TurnUsage> = None;
460 while let Some(event) = rx.recv().await {
461 match event {
462 AgentEvent::TokenDelta { content } => {
463 if !content.is_empty() {
464 let _ = sink_clone
465 .send(crate::protocol::stream_token(&request_id_owned, &content));
466 }
467 }
468 AgentEvent::ReasoningDelta { content } => {
469 // Reasoning rides its own protocol message so the client shows
470 // it as "thinking", never as the answer (th-4d8682).
471 if !content.is_empty() {
472 let _ = sink_clone.send(crate::protocol::stream_reasoning(
473 &request_id_owned,
474 &content,
475 ));
476 }
477 }
478 AgentEvent::ToolCallStart {
479 tool_name,
480 arguments,
481 ..
482 } => {
483 if tool_name == "knowledge_search" {
484 invoked_knowledge_search = true;
485 }
486 let _ = sink_clone.send(crate::protocol::stream_chunk(
487 &request_id_owned,
488 &tool_name,
489 json!({
490 "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
491 }),
492 ));
493 }
494 AgentEvent::ToolCallComplete {
495 tool_name,
496 result,
497 is_error,
498 ..
499 } => {
500 let _ = sink_clone.send(crate::protocol::stream_chunk(
501 &request_id_owned,
502 &tool_name,
503 json!({
504 "rawResponse": json!({
505 "toolResult": { "name": tool_name, "isError": is_error, "result": result }
506 }),
507 }),
508 ));
509 }
510 AgentEvent::PhaseStart { phase, .. } => {
511 let _ = sink_clone.send(crate::protocol::stream_chunk(
512 &request_id_owned,
513 &phase,
514 json!({}),
515 ));
516 }
517 // The terminal `Completed` event is NOT re-emitted as a stream
518 // event (the protocol carries the turn outcome on the
519 // `eventual_response`), but we capture its accumulated cost +
520 // token counts to attach to that terminal event's `usage`.
521 AgentEvent::Completed {
522 cost_usd,
523 prompt_tokens,
524 completion_tokens,
525 ..
526 } => {
527 usage = Some(crate::protocol::TurnUsage {
528 cost_usd,
529 prompt_tokens,
530 completion_tokens,
531 });
532 }
533 // Other Started / token-accounting events are terminal or
534 // structural; the protocol carries those via immediate/eventual
535 // responses, so they're intentionally not re-emitted here.
536 _ => {}
537 }
538 }
539 (invoked_knowledge_search, usage)
540 });
541
542 // Drive the agent loop. `run_with_channel` consumes `tx`; when it returns,
543 // the channel closes and the translator task drains and finishes.
544 let conversation = agent.run_with_channel(user_message, tx).await?;
545
546 // The turn is over: tear down the confirmation bridge. `run_with_channel`
547 // borrows `&self`, so the agent (and the `ConfirmationHook` it owns via the
548 // tool registry) is STILL alive here — and the hook holds the bridge's
549 // request *sender*. Dropping the agent closes that sender, which is what
550 // lets the bridge's `request_rx.recv()` return `None` and the task finish.
551 // Without this explicit drop, awaiting the bridge below would hang forever.
552 drop(agent);
553 if let (Some(handle), Some(cfg)) = (confirmation_bridge, confirmation.as_ref()) {
554 let _ = handle.await;
555 (cfg.clear)(&cfg.session_id);
556 }
557
558 let (invoked_knowledge_search, usage) = translator.await.unwrap_or((false, None));
559
560 let reply = conversation
561 .last_assistant_content()
562 .unwrap_or_default()
563 .to_string();
564
565 // 5. Persist the outbound reply and capture its id for eventual_response.
566 let message_id = if reply.is_empty() {
567 uuid::Uuid::new_v4().to_string()
568 } else {
569 persist_message(
570 storage.as_ref(),
571 conversation_id,
572 Direction::Outbound,
573 &reply,
574 )
575 .await?
576 .id
577 };
578
579 // Build citations from the sources that grounded this turn: auto-injected
580 // context first (it grounded the first LLM call), then the agent's
581 // knowledge_search results. Dedup by document id, cap at MAX_CITATIONS.
582 let tool_sources = match Arc::try_unwrap(tool_sources) {
583 Ok(mutex) => mutex
584 .into_inner()
585 .unwrap_or_else(std::sync::PoisonError::into_inner),
586 Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
587 };
588 let citations = collect_citations(&auto_sources, &tool_sources);
589
590 // 6. Conversation-workflow advancement (SMOODEV-590 parity). When the agent
591 // has a workflow, ask the cheap judge whether THIS turn satisfied the
592 // current step's criteria and compute the next step id. Failure-tolerant:
593 // no judge, an empty reply, or a judge error all keep the current step —
594 // the conversation never freezes and the turn never fails on the judge.
595 let next_step_id = match workflow.as_ref() {
596 Some(wt) => Some(
597 judge_next_step(
598 judge.as_deref(),
599 &wt.workflow,
600 wt.current_step_id.as_deref(),
601 user_message,
602 &reply,
603 )
604 .await,
605 ),
606 None => None,
607 };
608
609 Ok(TurnResult {
610 reply,
611 message_id,
612 invoked_knowledge_search,
613 citations,
614 usage,
615 next_step_id,
616 })
617}
618
619/// Run the workflow judge for one turn and return the step id to resume on.
620///
621/// Mirrors `nodes/workflow-judge.ts`: a cheap yes/no/maybe verdict on whether the
622/// current step's criteria were met, advancing only on `yes`. Every failure mode
623/// keeps the current step (never freezes, never advances on ambiguity):
624/// - no judge provider, an empty agent reply, or a judge LLM error → stay put,
625/// - an unrecognized verdict → `Maybe` → stay put.
626async fn judge_next_step(
627 judge: Option<&dyn LlmProvider>,
628 workflow: &ConversationWorkflow,
629 current_step_id: Option<&str>,
630 user_message: &str,
631 reply: &str,
632) -> String {
633 let current = match resolve_current_step(workflow, current_step_id) {
634 Some(step) => step.clone(),
635 // Empty workflow (shouldn't happen — the provider drops empty-steps
636 // workflows) → echo the incoming pointer or empty.
637 None => return current_step_id.unwrap_or_default().to_string(),
638 };
639
640 // Nothing to judge without a reply or a judge surface → stay on the step.
641 let stay = || current.id.clone();
642 if reply.trim().is_empty() {
643 return stay();
644 }
645 let Some(judge) = judge else {
646 return stay();
647 };
648
649 let system = EngineMessage::system(JUDGE_SYSTEM_PROMPT);
650 let user = EngineMessage::user(judge_user_prompt(workflow, ¤t, user_message, reply));
651 let verdict = match judge.chat(&[&system, &user], &[]).await {
652 Ok(resp) => WorkflowJudgeVerdict::parse(&resp.content),
653 Err(e) => {
654 tracing::warn!(error = %e, step = %current.id, "workflow judge failed; staying on current step");
655 WorkflowJudgeVerdict::Maybe
656 }
657 };
658
659 advance_after_verdict(workflow, Some(¤t.id), verdict).unwrap_or_else(stay)
660}
661
662/// Spawn the **confirmation bridge** for a turn that has a `ConfirmationHook`
663/// installed. The bridge owns the request *receiver* (each item is a
664/// [`HumanRequest::Confirm`] the hook emitted when a write tool is about to run)
665/// and the response *sender* (the hook awaits the verdict on its paired
666/// receiver). For every confirm request it:
667/// 1. registers `response_tx` under `session_id` via `register`, so an inbound
668/// `confirm_tool_action` can take it and feed the verdict back, and
669/// 2. emits a `write_confirmation_required` event through the turn `sink`,
670/// parking the turn until the client confirms.
671///
672/// The `tool_name` is used as the event's opaque `toolId`: core's
673/// `HumanRequest::Confirm` doesn't carry the LLM's tool-call id, but a turn only
674/// parks one write tool at a time (the loop blocks inside `pre_call`), so the
675/// tool name is a stable, sufficient correlation key for the resume. The bridge
676/// loops until the request channel closes (the hook/agent dropped at turn end),
677/// then returns — letting the caller clear the registration.
678fn spawn_confirmation_bridge(
679 mut request_rx: UnboundedReceiver<HumanRequest>,
680 response_tx: UnboundedSender<HumanResponse>,
681 sink: UnboundedSender<serde_json::Value>,
682 request_id: String,
683 session_id: String,
684 register: RegisterConfirmation,
685) -> tokio::task::JoinHandle<()> {
686 tokio::spawn(async move {
687 while let Some(req) = request_rx.recv().await {
688 match req {
689 HumanRequest::Confirm {
690 tool_name, prompt, ..
691 } => {
692 // Register THIS turn's response sender so the next
693 // `confirm_tool_action` for this session resumes it. Re-clone
694 // per request: the hook takes one verdict per parked tool.
695 register(&session_id, response_tx.clone());
696 // Per spec the event carries a `requestId` (correlation), an
697 // opaque `toolId` (the tool name — one tool parks at a time),
698 // and the human-readable `actionDescription` (the hook prompt).
699 let _ = sink.send(crate::protocol::write_confirmation_required(
700 &request_id,
701 &tool_name,
702 &prompt,
703 ));
704 }
705 // The chat HITL path only emits `Confirm`; a free-form `Input`
706 // request has no chat affordance, so auto-decline it rather than
707 // hang the turn (keeps the loop live for the next confirm).
708 HumanRequest::Input { .. } => {
709 let _ = response_tx.send(HumanResponse::Denied {
710 reason: "free-form human input is not supported on this channel".into(),
711 });
712 }
713 }
714 }
715 })
716}
717
718/// Build the turn's [`Citation`]s from the knowledge sources that grounded it:
719/// the engine's auto-injected `[Relevant knowledge]` context (`auto`, mirrored
720/// by the runner) followed by everything the agent's `knowledge_search` calls
721/// surfaced (`tool`). Concatenated auto-first, deduplicated by document id
722/// (first occurrence wins), mapped to [`Citation`], and capped at
723/// [`MAX_CITATIONS`]. Empty when nothing was retrieved.
724fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
725 let mut seen = std::collections::HashSet::new();
726 auto.iter()
727 .chain(tool.iter())
728 .filter(|r| seen.insert(r.document_id.clone()))
729 .take(MAX_CITATIONS)
730 .map(Citation::from_knowledge_result)
731 .collect()
732}
733
734/// Load the conversation's persisted messages (oldest-first, capped) and convert
735/// them to engine `Message`s for replay: inbound → User, outbound → Assistant.
736async fn load_prior_messages(
737 storage: &dyn StorageAdapter,
738 conversation_id: &str,
739) -> Result<Vec<EngineMessage>> {
740 let page = storage
741 .list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
742 .await?;
743
744 let mut out = Vec::with_capacity(page.messages.len());
745 for m in page.messages {
746 let text = m
747 .content
748 .text
749 .clone()
750 .or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
751 .unwrap_or_default();
752 if text.is_empty() {
753 continue;
754 }
755 let role = match m.direction {
756 Direction::Inbound => Role::User,
757 Direction::Outbound => Role::Assistant,
758 };
759 out.push(EngineMessage {
760 id: m.id,
761 role,
762 content: text,
763 tool_call_id: None,
764 tool_name: None,
765 tool_calls: vec![],
766 reasoning_content: None,
767 timestamp: m.created_at,
768 });
769 }
770 Ok(out)
771}
772
773/// Append a single message to the conversation's log via the adapter.
774async fn persist_message(
775 storage: &dyn StorageAdapter,
776 conversation_id: &str,
777 direction: Direction,
778 text: &str,
779) -> Result<DomainMessage> {
780 let now = chrono::Utc::now();
781 let message = DomainMessage {
782 id: uuid::Uuid::new_v4().to_string(),
783 external_id: None,
784 organization_id: None,
785 conversation_id: Some(conversation_id.to_string()),
786 direction,
787 content: MessageContent::from_text(text),
788 from: None,
789 to: None,
790 metadata_json: None,
791 analytics_json: None,
792 created_at: now,
793 updated_at: None,
794 };
795 storage.append_message(message).await
796}
797
798/// Build the structured `GeneralAgentResponse`-shaped payload the protocol's
799/// `eventual_response` carries. The reference runtime doesn't produce the full
800/// structured analytics, so we surface the reply text in `responseParts` and
801/// supply neutral defaults for the analytic fields (clients render
802/// `responseParts`).
803#[must_use]
804pub fn general_agent_response(reply: &str) -> serde_json::Value {
805 json!({
806 "responseParts": [reply],
807 "customerHappinessScore": 0.5,
808 "needsSatisfactionScore": 0.5,
809 "requestSummary": "",
810 "resolutionStatus": "in_progress",
811 "suggestedNextActions": [],
812 })
813}