Skip to main content

smooth_operator_server/
runner.rs

1//! The streaming, memory-carrying agent runner used by the WS service.
2//!
3//! `smooth-operator`'s [`KnowledgeChatRuntime`] proves the engine ↔ gateway
4//! path but (a) is non-streaming (`run_turn` returns only after the turn
5//! completes) and (b) has no cross-turn memory — each `run_turn` builds a fresh
6//! [`Agent`] with a random id and no prior messages, so turn 2 forgets turn 1
7//! (documented in `core/tests/e2e_llm_smoo_ai.rs`).
8//!
9//! The service needs both, so this module builds the agent itself, wiring the
10//! same knowledge-grounding as core PLUS:
11//!
12//! 1. **Streaming** via [`Agent::run_with_channel`], translating the engine's
13//!    [`AgentEvent`] stream into protocol events (`stream_token`,
14//!    `stream_chunk`, `eventual_response`).
15//! 2. **Per-session memory** via [`AgentConfig::with_prior_messages`]: before
16//!    each turn the session's persisted message log is loaded from the storage
17//!    adapter and replayed into the conversation, so the model sees turn 1 when
18//!    answering turn 2. (`Agent::new` randomizes the agent id every time, so the
19//!    checkpoint-resume path can't be keyed stably — replaying the persisted log
20//!    is the robust, backend-agnostic way to carry memory. The log is the source
21//!    of truth the adapter already persists.)
22
23use std::sync::{Arc, Mutex};
24use std::time::Duration;
25
26use anyhow::Result;
27use serde_json::json;
28use smooth_operator_core::llm_provider::LlmProvider;
29use smooth_operator_core::{
30    human_channel, Agent, AgentConfig, AgentEvent, ConfirmationHook, HumanRequest, HumanResponse,
31    KnowledgeBase, KnowledgeResult, LlmConfig, Message as EngineMessage, Role, ToolRegistry,
32};
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
34
35use smooth_operator::access_control::AccessContext;
36use smooth_operator::adapter::{MessageQuery, StorageAdapter};
37use smooth_operator::agent_config::{
38    advance_after_verdict, judge_user_prompt, render_workflow_prompt_section, resolve_current_step,
39    AuthGateHook, ConversationWorkflow, WorkflowJudgeVerdict, JUDGE_SYSTEM_PROMPT,
40};
41use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
42use smooth_operator::rerank::Reranker;
43use smooth_operator::tool_provider::{ToolProvider, ToolProviderContext};
44use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
45use smooth_operator::MAX_CITATIONS;
46
47/// How many auto-injected knowledge results the engine prepends as
48/// `[Relevant knowledge]` context. Mirrors smooth-operator-core's `Agent`
49/// auto-injection (a top-3 query) so the citations we collect match the sources
50/// that grounded the first LLM call.
///
/// In `run_streaming_turn` this value only sizes the runner's own mirror query
/// (`knowledge.query(user_message, AUTO_CONTEXT_LIMIT)`); it is not passed to the
/// engine there, so it has to be kept in step with core's limit by hand.
51const AUTO_CONTEXT_LIMIT: usize = 3;
52
53/// System prompt for the knowledge-chat agent. Mirrors core's prompt: ground
54/// answers in the knowledge base and search it before answering anything
55/// organization-specific.
///
/// This is only the default: `run_streaming_turn` uses it when
/// `TurnRequest::system_prompt` is `None`, and may append a first-turn greeting
/// and/or a workflow section after it. Each trailing `\` continues the literal
/// without inserting a newline, so the prompt is one line of text.
56const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
57    "You are a helpful customer-support agent for the organization. \
58    Answer the user's question accurately and concisely. When a question depends on \
59    organization-specific facts (policies, products, documentation), call the \
60    `knowledge_search` tool to retrieve them before answering, and ground your answer \
61    in what you retrieve. If the knowledge base has no relevant information, say so. \
62    Remember facts the user tells you within the conversation and use them when asked.";
63
64/// Max prior messages to replay into the conversation for memory. Bounds context
65/// growth on long sessions; the in-memory log is small but a real backend could
66/// be large.
///
/// NOTE(review): the name suggests the unit is individual messages rather than
/// user/agent turn pairs. The cap is presumably applied by `load_prior_messages`,
/// which is outside this view — confirm there.
67const MAX_PRIOR_MESSAGES: usize = 50;
68
69/// How long a parked write-tool confirmation waits for a `confirm_tool_action`
70/// before the core `ConfirmationHook` gives up and treats the tool as denied
71/// (a timeout). Bounds a stuck turn so a client that never confirms can't pin a
72/// task forever. Generous (5 min) because a human is in the loop.
///
/// Handed to `ConfirmationHook::new` when `run_streaming_turn` installs the hook.
73const CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(300);
74
75/// Callback `(session_id, sender)` that registers a parked turn's [`HumanResponse`]
76/// sender under a session id (so a later `confirm_tool_action` can take it and resume
/// the turn). Typically `AppState::register_confirmation`.
///
/// Shared as an `Arc` because `run_streaming_turn` clones it into the confirmation bridge.
77pub type RegisterConfirmation = Arc<dyn Fn(&str, UnboundedSender<HumanResponse>) + Send + Sync>;
78
79/// Callback `(session_id)` that clears any registered confirmation sender for a
/// session id when its turn ends. `run_streaming_turn` invokes it after the turn's
/// confirmation bridge task has finished.
80/// Typically `AppState::clear_confirmation`.
81pub type ClearConfirmation = Arc<dyn Fn(&str) + Send + Sync>;
82
83/// Hooks the runner needs to wire **write-confirmation HITL** into a turn
84/// without depending on `AppState` directly (keeps the runner unit-testable).
85///
86/// When [`TurnRequest::confirmation`] is `Some` and [`tool_patterns`](Self::tool_patterns)
87/// is non-empty, the runner installs a core [`ConfirmationHook`] over every tool whose
/// name matches one of those patterns. When such a
88/// tool is about to run, the agent loop **parks** inside the hook's `pre_call`
89/// and emits a [`HumanRequest::Confirm`]; the runner's bridge:
90///   1. calls [`register`](Self::register) with the session's
91///      [`HumanResponse`] sender, so a later `confirm_tool_action` can resume,
92///   2. emits a `confirm_tool_action_required` event through the turn sink.
93///
94/// On `confirm_tool_action`, the handler sends a [`HumanResponse`] through that sender and
95/// the parked tool either executes (approved) or is skipped with a rejection
96/// result (denied). `None` (the default) — or an empty pattern list — installs no hook →
97/// no tool ever parks → behavior is byte-for-byte identical to before HITL.
///
/// The hook is created with `CONFIRMATION_TIMEOUT`, so a confirmation that never
/// arrives does not park the turn indefinitely.
98pub struct ConfirmationConfig {
99    /// Tool-name substrings that require human approval (matched by core's
100    /// `ConfirmationHook`, which uses `contains` matching). Empty disables HITL.
101    pub tool_patterns: Vec<String>,
102    /// The session this turn belongs to — carried on the
103    /// `confirm_tool_action_required` event and the registration key so the
104    /// inbound `confirm_tool_action` (keyed by `sessionId`) routes back here.
105    pub session_id: String,
106    /// Registers the parked turn's [`HumanResponse`] sender under
107    /// [`session_id`](Self::session_id) (typically `AppState::register_confirmation`).
108    pub register: RegisterConfirmation,
109    /// Clears any registered sender for [`session_id`](Self::session_id) when the
110    /// turn ends (typically `AppState::clear_confirmation`), so a stale sender
111    /// can't mis-route a later confirmation.
112    pub clear: ClearConfirmation,
113}
114
115/// A turn's **conversation-workflow** context: the agent's configured workflow
116/// plus the step the conversation is currently on. When present on a
117/// [`TurnRequest`], the runner injects the current step's intent/criteria into
118/// the system prompt and, after the turn, runs the judge to decide whether to
119/// advance. `None` (the default) means the agent runs freeform — no workflow
120/// section, no judge, byte-for-byte unchanged.
///
/// The judge is only consulted when [`TurnRequest::judge`] is `Some` and the
/// agent's reply is non-empty; otherwise the conversation stays on the resolved
/// current step.
121pub struct WorkflowTurn {
122    /// The agent's structured workflow (goal + ordered steps).
123    pub workflow: ConversationWorkflow,
124    /// The step id the conversation is on, or `None` for a fresh start (the
125    /// runner then resolves to the workflow's first step).
126    pub current_step_id: Option<String>,
127}
128
129/// The terminal outcome of a streamed turn.
130pub struct TurnResult {
131    /// The agent's final natural-language reply.
    /// Empty when the conversation ended without assistant content.
132    pub reply: String,
133    /// The id of the persisted outbound (agent) message, for `eventual_response`.
    /// When [`reply`](Self::reply) is empty nothing is persisted and this is a
    /// freshly generated UUID that does not refer to any stored message.
134    pub message_id: String,
135    /// True if any `knowledge_search` tool call ran this turn (diagnostics).
    /// Set when a tool-call-start event named `knowledge_search` is observed;
    /// falls back to `false` if the event-translator task could not be joined.
136    pub invoked_knowledge_search: bool,
137    /// The sources that grounded this turn (the auto-injected context + every
138    /// `knowledge_search` result), deduped by id and capped. Carried onto the
139    /// `eventual_response`'s `citations`. Empty when nothing was retrieved.
140    pub citations: Vec<Citation>,
141    /// The turn's token-accounting + cost, captured from the engine's terminal
142    /// [`AgentEvent::Completed`]. Carried onto the `eventual_response`'s `usage`
143    /// object so clients accumulate live session cost. `None` when the engine
144    /// reported no `Completed` event (e.g. an offline mock turn).
    /// Also `None` if the event-translator task could not be joined.
145    pub usage: Option<crate::protocol::TurnUsage>,
146    /// The conversation-workflow step id **after** this turn's judge ran. `Some`
147    /// only when the turn had a [`WorkflowTurn`]; the caller persists it onto the
148    /// session so the next turn resumes on the right step. Equals the incoming
149    /// step when the judge did not advance (criteria not met, terminal step, or a
150    /// judge failure — never freezes, never crashes the turn).
151    pub next_step_id: Option<String>,
152}
153
154/// Everything one streaming turn needs. Bundled into a struct so the call sites
155/// (the reference server's `handle_send_message` and the lambda's
156/// `send_message`) stay readable and the security-critical [`access`](Self::access)
157/// field can never be silently dropped from a positional argument list.
///
/// `run_streaming_turn` takes the request by value and destructures it, so every
/// field is consumed by the turn.
158pub struct TurnRequest<'a> {
159    /// The storage seam (conversations / messages / sessions / knowledge).
160    pub storage: Arc<dyn StorageAdapter>,
161    /// The resolved LLM config for this turn.
162    pub llm: LlmConfig,
163    /// Agent-loop iteration cap.
164    pub max_iterations: u32,
165    /// The conversation this turn belongs to.
166    pub conversation_id: &'a str,
167    /// The protocol request id (streaming correlation).
168    pub request_id: &'a str,
169    /// The inbound user message.
170    pub user_message: &'a str,
171    /// **The requester's document-level entitlements.** Retrieval (the
172    /// auto-injected `[Relevant knowledge]` context AND the `knowledge_search`
173    /// tool) reads through `storage.knowledge_for_access(&access)`, so a
174    /// restricted document is never surfaced to a requester who lacks the
175    /// entitlement. An [`AccessContext::anonymous`] sees only org-public docs
176    /// (fail closed for ACL'd content).
177    pub access: AccessContext,
178    /// Optional test-injected LLM surface (a `MockLlmClient`) so the turn runs
179    /// deterministically offline. `None` in production (a live client is built
180    /// from `llm`).
181    pub llm_provider: Option<Arc<dyn LlmProvider>>,
182    /// Optional post-retrieval reranker (feature gap G8). When `Some`, the
183    /// `knowledge_search` tool overfetches candidates and reorders the top-K with
184    /// this reranker before they reach the model. `None` (the default) keeps the
185    /// retrieval order unchanged, so default behavior is byte-for-byte the same.
186    /// Selected by [`build_reranker`](crate::reranker::build_reranker).
187    pub reranker: Option<Arc<dyn Reranker>>,
188    /// Optional **write-confirmation HITL** wiring. `None` (the default) installs
189    /// no confirmation hook, so no tool ever parks the turn and behavior is
190    /// identical to before HITL. `Some` installs a core [`ConfirmationHook`] over
191    /// the configured tool patterns and bridges its [`HumanRequest`]s to a
192    /// `confirm_tool_action_required` event + a registered resumable sender. See
193    /// [`ConfirmationConfig`].
    /// A `Some` whose `tool_patterns` is empty behaves like `None`: no hook, no bridge.
194    pub confirmation: Option<ConfirmationConfig>,
195    /// **SEAM 1 — host tool injection.** When `Some`, the runner asks this
196    /// provider for EXTRA tools and merges them into the turn's
197    /// [`ToolRegistry`] alongside the built-ins. `None` (the default) leaves the
198    /// registry as exactly the built-ins, so default behavior is byte-for-byte
199    /// unchanged. A host installs one via [`AppState::with_tools`](crate::state::AppState::with_tools).
200    pub tool_provider: Option<Arc<dyn ToolProvider>>,
201    /// **SEAM 2 — per-org agent persona.** The resolved system prompt for this
202    /// turn. When `Some`, it REPLACES the built-in [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`]
203    /// as the agent's system prompt (the host resolves it from per-org settings,
204    /// e.g. [`AgentSettings::persona`](smooth_operator::settings::AgentSettings::persona)).
205    /// `None` (the default) keeps the const prompt, so default behavior is
206    /// byte-for-byte unchanged.
    /// Either way, the first-turn greeting and the workflow section (when
    /// present) are appended after it, separated by blank lines.
207    pub system_prompt: Option<String>,
208    /// The owning org for this turn, threaded into the
209    /// [`ToolProviderContext`](smooth_operator::tool_provider::ToolProviderContext)
210    /// so a [`ToolProvider`] can return per-org tools. `None` when no org is
211    /// resolved (e.g. an anonymous reference-server connection).
212    pub org_id: Option<String>,
213    /// The resolved per-org LLM-gateway key for this turn, threaded into the
214    /// [`ToolProviderContext`](smooth_operator::tool_provider::ToolProviderContext)
215    /// so a retrieval-style host tool (e.g. agent-brain's `knowledge_search`)
216    /// can call the same gateway this turn was billed/scoped to. `None` when no
217    /// key resolved (e.g. a mock-driven offline turn). The runner does not use
218    /// it to talk to the gateway itself — that comes from [`llm`](Self::llm); it
219    /// only carries it through to the provider context.
220    pub gateway_key: Option<String>,
221    /// **Per-agent conversation workflow.** When `Some`, the runner injects the
222    /// current step's intent/criteria into the system prompt and runs the judge
223    /// after the turn to decide advancement. `None` (the default) ⇒ no workflow
224    /// section, no judge — freeform behavior, byte-for-byte unchanged.
225    pub workflow: Option<WorkflowTurn>,
226    /// The **judge** LLM surface: a cheap model that decides whether the current
227    /// workflow step's criteria were met this turn. Only consulted when
228    /// [`workflow`](Self::workflow) is `Some`. Production wires a client built
229    /// from the server's default (cheap) model; tests inject a mock. `None` ⇒ the
230    /// workflow stays on its current step (never advances) — a safe degrade.
231    pub judge: Option<Arc<dyn LlmProvider>>,
232    /// Per-agent first-turn greeting section (already rendered). Injected into the
233    /// system prompt ONLY when this conversation has no prior messages, so the
234    /// agent opens with it once. `None` ⇒ no greeting.
    /// "No prior messages" means the history loaded before the inbound message
    /// is persisted came back empty.
235    pub greeting_section: Option<String>,
236    /// Per-agent tool allow-list (snake_case ids). `Some` restricts the turn's
237    /// registry to those tools (built-ins + host tools alike); `None` ⇒ the full
238    /// tool set (unchanged). Unknown ids simply match nothing.
    /// Ids are compared to registered tool names by exact equality, so an empty
    /// list removes every tool, `knowledge_search` included.
239    pub enabled_tools: Option<Vec<String>>,
240    /// Per-agent auth-level gate (SMOODEV-590). When `Some`, installed as a
241    /// `ToolHook` that blocks a call whose configured `authLevel` isn't satisfied
242    /// (admin on public, or unverified end_user on public). `None` ⇒ no gate.
243    pub auth_gate: Option<AuthGateHook>,
244    /// Per-tool config (`tool_id` → config), delivered to host tools via the
245    /// `ToolProviderContext`. `None`/empty ⇒ no per-tool config. Built-ins ignore it.
    /// Only read when [`tool_provider`](Self::tool_provider) is `Some`.
246    pub tool_configs: Option<std::collections::HashMap<String, serde_json::Value>>,
247}
248
249/// Runs one knowledge-grounded, streaming turn for a session's conversation and
250/// emits protocol-shaped events through `sink` as they happen.
251///
252/// `sink` receives ready-to-send `serde_json::Value` event envelopes (built by
253/// [`crate::protocol`]). The caller forwards them over the WebSocket.
254///
255/// ## Access control (security-critical)
256///
257/// Both retrieval paths — the engine's auto-injected `[Relevant knowledge]`
258/// context and the agent's `knowledge_search` tool — read through
259/// [`StorageAdapter::knowledge_for_access`] bound to [`TurnRequest::access`],
260/// so a document the requester is not entitled to (e.g. a private-repo doc
261/// scoped to a group the requester is not in) is dropped before it can reach the
262/// model or a citation. See `docs/ACCESS-CONTROL.md`.
263///
264/// # Errors
265/// Returns an error if message persistence or the agent loop fails fatally. The
266/// caller converts this into a protocol `error` event.
267pub async fn run_streaming_turn(
268    req: TurnRequest<'_>,
269    sink: &UnboundedSender<serde_json::Value>,
270) -> Result<TurnResult> {
271    let TurnRequest {
272        storage,
273        llm,
274        max_iterations,
275        conversation_id,
276        request_id,
277        user_message,
278        access,
279        llm_provider,
280        reranker,
281        confirmation,
282        tool_provider,
283        system_prompt,
284        org_id,
285        gateway_key,
286        workflow,
287        judge,
288        greeting_section,
289        enabled_tools,
290        auth_gate,
291        tool_configs,
292    } = req;
293
294    // The ONE ACL-enforcing knowledge handle both retrieval paths read through.
295    // Built once from the requester's `AccessContext` so the auto-injected
296    // context query, the agent's `knowledge_search` tool, and the citation
297    // mirror all hit the SAME filtered view — a restricted doc can't leak in
298    // through one path while being dropped on another.
299    let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
300
301    // 0. Mirror the engine's auto-injected `[Relevant knowledge]` query so the
302    //    citations include the sources the FIRST LLM call was grounded with.
303    //    Same query smooth-operator-core's `Agent` runs (`query(msg, 3)`),
304    //    against the same ACL-filtered handle. Best-effort: a KB error yields no
305    //    auto-context citations.
306    let auto_sources: Vec<KnowledgeResult> = knowledge
307        .query(user_message, AUTO_CONTEXT_LIMIT)
308        .unwrap_or_default();
309    // Sink the knowledge_search tool records its structured results into, for
310    // citations built from the sources the agent's searches surfaced.
311    let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
312
313    // 1. Load prior turns for memory BEFORE persisting the new inbound message,
314    //    so prior_messages is exactly the history-up-to-now.
315    let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
316
317    // 2. Persist the inbound user message.
318    persist_message(
319        storage.as_ref(),
320        conversation_id,
321        Direction::Inbound,
322        user_message,
323    )
324    .await?;
325
326    // 3. Build the agent: ACL-grounded config + knowledge_search tool (over the
327    //    SAME ACL-filtered handle) + replayed prior messages for memory.
328    //
329    //    SEAM 2 — resolve the system prompt: a host-supplied persona
330    //    (`system_prompt`, resolved per-agent then per-org) overrides the
331    //    built-in const; absent ⇒ the const, so default behavior is byte-for-byte
332    //    unchanged. When the agent has a conversation workflow, the current
333    //    step's intent/criteria are appended so the model drives that step.
334    let base_prompt = system_prompt
335        .as_deref()
336        .unwrap_or(KNOWLEDGE_CHAT_SYSTEM_PROMPT);
337    // Compose base → first-turn greeting → current workflow step. The greeting is
338    // injected only when this conversation has no prior messages (first turn).
339    let mut sections: Vec<String> = vec![base_prompt.to_string()];
340    if prior.is_empty() {
341        if let Some(greeting) = greeting_section.as_deref() {
342            sections.push(greeting.to_string());
343        }
344    }
345    if let Some(wt) = workflow.as_ref() {
346        sections.push(render_workflow_prompt_section(
347            &wt.workflow,
348            wt.current_step_id.as_deref(),
349        ));
350    }
351    let resolved_prompt = sections.join("\n\n");
352    let config = AgentConfig::new("smooth-agent-chat", &resolved_prompt, llm)
353        .with_max_iterations(max_iterations)
354        .with_knowledge(Arc::clone(&knowledge))
355        .with_prior_messages(prior);
356
357    let mut tools = ToolRegistry::new();
358    // Build the knowledge_search tool over the SAME ACL-filtered handle, with the
359    // citation sink and — when a reranker was selected (opt-in, G8) — the rerank
360    // stage. With `None` (the default) the tool fetches exactly `limit` and
361    // returns the retrieval order unchanged.
362    let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
363        .with_result_sink(Arc::clone(&tool_sources));
364    if let Some(reranker) = reranker {
365        knowledge_search = knowledge_search.with_reranker(reranker);
366    }
367    tools.register(knowledge_search);
368
369    // SEAM 1 — merge host-contributed tools. When a provider is installed, ask
370    // it (with the turn's org + access context) for extra tools and register
371    // each alongside the built-ins. Built-ins are registered FIRST, so a host
372    // tool that intentionally reuses a built-in name replaces it; a distinct
373    // name simply adds. With no provider this block is a no-op, leaving the
374    // registry as exactly today's built-ins.
375    if let Some(provider) = tool_provider {
376        // Thread the per-turn handles the runner already has — the conversation
377        // this turn runs in and the resolved per-org gateway key — so a host's
378        // conversation-persisting / retrieval tools aren't degraded to no-ops.
379        let mut ctx =
380            ToolProviderContext::new(org_id, access.clone()).with_conversation_id(conversation_id);
381        if let Some(key) = gateway_key {
382            ctx = ctx.with_gateway_key(key);
383        }
384        // SEAM 3 — deliver per-tool config to host tools (registry.ts parity).
385        if let Some(configs) = tool_configs.clone() {
386            ctx = ctx.with_tool_configs(configs);
387        }
388        for tool in provider.tools_for(&ctx).await {
389            tools.register_arc(tool);
390        }
391    }
392
393    // SEAM 3 — per-agent tool allow-list. When the agent's `tool_config` restricts
394    // the tool set, drop every registered tool (built-in or host) whose snake_case
395    // name isn't enabled. `None` (empty/absent tool_config) leaves the full set.
396    if let Some(enabled) = enabled_tools {
397        tools.retain(|name| enabled.iter().any(|id| id == name));
398    }
399
400    // SEAM 3 — per-agent authLevel gate. When installed, a tool call whose
401    // configured `authLevel` isn't satisfied is blocked at execution with the
402    // reference refusal (the engine surfaces the `pre_call` error to the model).
403    // `None` ⇒ no gate.
404    if let Some(gate) = auth_gate {
405        tools.add_hook(gate);
406    }
407
408    // 3a. Write-confirmation HITL: when configured with tool patterns, install a
409    //     core `ConfirmationHook` over those tools and spawn a bridge that turns
410    //     each `HumanRequest::Confirm` into a `confirm_tool_action_required`
411    //     event + a registered resumable `HumanResponse` sender. With no
412    //     `confirmation` (the default) or empty patterns, no hook is installed —
413    //     no tool parks the turn, byte-for-byte unchanged from before HITL.
414    let confirmation_bridge = match &confirmation {
415        Some(cfg) if !cfg.tool_patterns.is_empty() => {
416            let pair = human_channel();
417            // The hook owns the request *sender* (emits Confirm) and the response
418            // *receiver* (awaits the human's verdict). The runner keeps the
419            // request *receiver* and the response *sender* for the bridge.
420            tools.add_hook(ConfirmationHook::new(
421                cfg.tool_patterns.clone(),
422                pair.request_tx,
423                pair.response_rx,
424                CONFIRMATION_TIMEOUT,
425            ));
426            Some(spawn_confirmation_bridge(
427                pair.request_rx,
428                pair.response_tx,
429                sink.clone(),
430                request_id.to_string(),
431                cfg.session_id.clone(),
432                Arc::clone(&cfg.register),
433            ))
434        }
435        _ => None,
436    };
437
438    let agent = {
439        let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
440        // Inject the mock LLM provider for offline/deterministic tests; in
441        // production a live client is built from `llm`.
442        match llm_provider {
443            Some(provider) => agent.with_llm_provider(provider),
444            None => agent,
445        }
446    };
447
448    // 4. Run with the streaming channel and translate events as they arrive.
449    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
450    let request_id_owned = request_id.to_string();
451    let sink_clone = sink.clone();
452
453    // Spawn the event translator so we forward tokens to the client in real
454    // time while the agent loop runs concurrently.
455    let translator = tokio::spawn(async move {
456        let mut invoked_knowledge_search = false;
457        // The terminal `Completed` event carries the turn's accumulated cost +
458        // token counts; capture them to surface on the `eventual_response`.
459        let mut usage: Option<crate::protocol::TurnUsage> = None;
460        while let Some(event) = rx.recv().await {
461            match event {
462                AgentEvent::TokenDelta { content } => {
463                    if !content.is_empty() {
464                        let _ = sink_clone
465                            .send(crate::protocol::stream_token(&request_id_owned, &content));
466                    }
467                }
468                AgentEvent::ReasoningDelta { content } => {
469                    // Reasoning rides its own protocol message so the client shows
470                    // it as "thinking", never as the answer (th-4d8682).
471                    if !content.is_empty() {
472                        let _ = sink_clone.send(crate::protocol::stream_reasoning(
473                            &request_id_owned,
474                            &content,
475                        ));
476                    }
477                }
478                AgentEvent::ToolCallStart {
479                    tool_name,
480                    arguments,
481                    ..
482                } => {
483                    if tool_name == "knowledge_search" {
484                        invoked_knowledge_search = true;
485                    }
486                    let _ = sink_clone.send(crate::protocol::stream_chunk(
487                        &request_id_owned,
488                        &tool_name,
489                        json!({
490                            "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
491                        }),
492                    ));
493                }
494                AgentEvent::ToolCallComplete {
495                    tool_name,
496                    result,
497                    is_error,
498                    ..
499                } => {
500                    let _ = sink_clone.send(crate::protocol::stream_chunk(
501                        &request_id_owned,
502                        &tool_name,
503                        json!({
504                            "rawResponse": json!({
505                                "toolResult": { "name": tool_name, "isError": is_error, "result": result }
506                            }),
507                        }),
508                    ));
509                }
510                AgentEvent::PhaseStart { phase, .. } => {
511                    let _ = sink_clone.send(crate::protocol::stream_chunk(
512                        &request_id_owned,
513                        &phase,
514                        json!({}),
515                    ));
516                }
517                // The terminal `Completed` event is NOT re-emitted as a stream
518                // event (the protocol carries the turn outcome on the
519                // `eventual_response`), but we capture its accumulated cost +
520                // token counts to attach to that terminal event's `usage`.
521                AgentEvent::Completed {
522                    cost_usd,
523                    prompt_tokens,
524                    completion_tokens,
525                    ..
526                } => {
527                    usage = Some(crate::protocol::TurnUsage {
528                        cost_usd,
529                        prompt_tokens,
530                        completion_tokens,
531                    });
532                }
533                // Other Started / token-accounting events are terminal or
534                // structural; the protocol carries those via immediate/eventual
535                // responses, so they're intentionally not re-emitted here.
536                _ => {}
537            }
538        }
539        (invoked_knowledge_search, usage)
540    });
541
542    // Drive the agent loop. `run_with_channel` consumes `tx`; when it returns,
543    // the channel closes and the translator task drains and finishes.
544    let conversation = agent.run_with_channel(user_message, tx).await?;
545
546    // The turn is over: tear down the confirmation bridge. `run_with_channel`
547    // borrows `&self`, so the agent (and the `ConfirmationHook` it owns via the
548    // tool registry) is STILL alive here — and the hook holds the bridge's
549    // request *sender*. Dropping the agent closes that sender, which is what
550    // lets the bridge's `request_rx.recv()` return `None` and the task finish.
551    // Without this explicit drop, awaiting the bridge below would hang forever.
552    drop(agent);
553    if let (Some(handle), Some(cfg)) = (confirmation_bridge, confirmation.as_ref()) {
554        let _ = handle.await;
555        (cfg.clear)(&cfg.session_id);
556    }
557
558    let (invoked_knowledge_search, usage) = translator.await.unwrap_or((false, None));
559
560    let reply = conversation
561        .last_assistant_content()
562        .unwrap_or_default()
563        .to_string();
564
565    // 5. Persist the outbound reply and capture its id for eventual_response.
566    let message_id = if reply.is_empty() {
567        uuid::Uuid::new_v4().to_string()
568    } else {
569        persist_message(
570            storage.as_ref(),
571            conversation_id,
572            Direction::Outbound,
573            &reply,
574        )
575        .await?
576        .id
577    };
578
579    // Build citations from the sources that grounded this turn: auto-injected
580    // context first (it grounded the first LLM call), then the agent's
581    // knowledge_search results. Dedup by document id, cap at MAX_CITATIONS.
582    let tool_sources = match Arc::try_unwrap(tool_sources) {
583        Ok(mutex) => mutex
584            .into_inner()
585            .unwrap_or_else(std::sync::PoisonError::into_inner),
586        Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
587    };
588    let citations = collect_citations(&auto_sources, &tool_sources);
589
590    // 6. Conversation-workflow advancement (SMOODEV-590 parity). When the agent
591    //    has a workflow, ask the cheap judge whether THIS turn satisfied the
592    //    current step's criteria and compute the next step id. Failure-tolerant:
593    //    no judge, an empty reply, or a judge error all keep the current step —
594    //    the conversation never freezes and the turn never fails on the judge.
595    let next_step_id = match workflow.as_ref() {
596        Some(wt) => Some(
597            judge_next_step(
598                judge.as_deref(),
599                &wt.workflow,
600                wt.current_step_id.as_deref(),
601                user_message,
602                &reply,
603            )
604            .await,
605        ),
606        None => None,
607    };
608
609    Ok(TurnResult {
610        reply,
611        message_id,
612        invoked_knowledge_search,
613        citations,
614        usage,
615        next_step_id,
616    })
617}
618
619/// Run the workflow judge for one turn and return the step id to resume on.
620///
621/// Mirrors `nodes/workflow-judge.ts`: a cheap yes/no/maybe verdict on whether the
622/// current step's criteria were met, advancing only on `yes`. Every failure mode
623/// keeps the current step (never freezes, never advances on ambiguity):
624///   - no judge provider, an empty agent reply, or a judge LLM error → stay put,
625///   - an unrecognized verdict → `Maybe` → stay put.
626async fn judge_next_step(
627    judge: Option<&dyn LlmProvider>,
628    workflow: &ConversationWorkflow,
629    current_step_id: Option<&str>,
630    user_message: &str,
631    reply: &str,
632) -> String {
633    let current = match resolve_current_step(workflow, current_step_id) {
634        Some(step) => step.clone(),
635        // Empty workflow (shouldn't happen — the provider drops empty-steps
636        // workflows) → echo the incoming pointer or empty.
637        None => return current_step_id.unwrap_or_default().to_string(),
638    };
639
640    // Nothing to judge without a reply or a judge surface → stay on the step.
641    let stay = || current.id.clone();
642    if reply.trim().is_empty() {
643        return stay();
644    }
645    let Some(judge) = judge else {
646        return stay();
647    };
648
649    let system = EngineMessage::system(JUDGE_SYSTEM_PROMPT);
650    let user = EngineMessage::user(judge_user_prompt(workflow, &current, user_message, reply));
651    let verdict = match judge.chat(&[&system, &user], &[]).await {
652        Ok(resp) => WorkflowJudgeVerdict::parse(&resp.content),
653        Err(e) => {
654            tracing::warn!(error = %e, step = %current.id, "workflow judge failed; staying on current step");
655            WorkflowJudgeVerdict::Maybe
656        }
657    };
658
659    advance_after_verdict(workflow, Some(&current.id), verdict).unwrap_or_else(stay)
660}
661
662/// Spawn the **confirmation bridge** for a turn that has a `ConfirmationHook`
663/// installed. The bridge owns the request *receiver* (each item is a
664/// [`HumanRequest::Confirm`] the hook emitted when a write tool is about to run)
665/// and the response *sender* (the hook awaits the verdict on its paired
666/// receiver). For every confirm request it:
667///   1. registers `response_tx` under `session_id` via `register`, so an inbound
668///      `confirm_tool_action` can take it and feed the verdict back, and
669///   2. emits a `write_confirmation_required` event through the turn `sink`,
670///      parking the turn until the client confirms.
671///
672/// The `tool_name` is used as the event's opaque `toolId`: core's
673/// `HumanRequest::Confirm` doesn't carry the LLM's tool-call id, but a turn only
674/// parks one write tool at a time (the loop blocks inside `pre_call`), so the
675/// tool name is a stable, sufficient correlation key for the resume. The bridge
676/// loops until the request channel closes (the hook/agent dropped at turn end),
677/// then returns — letting the caller clear the registration.
678fn spawn_confirmation_bridge(
679    mut request_rx: UnboundedReceiver<HumanRequest>,
680    response_tx: UnboundedSender<HumanResponse>,
681    sink: UnboundedSender<serde_json::Value>,
682    request_id: String,
683    session_id: String,
684    register: RegisterConfirmation,
685) -> tokio::task::JoinHandle<()> {
686    tokio::spawn(async move {
687        while let Some(req) = request_rx.recv().await {
688            match req {
689                HumanRequest::Confirm {
690                    tool_name, prompt, ..
691                } => {
692                    // Register THIS turn's response sender so the next
693                    // `confirm_tool_action` for this session resumes it. Re-clone
694                    // per request: the hook takes one verdict per parked tool.
695                    register(&session_id, response_tx.clone());
696                    // Per spec the event carries a `requestId` (correlation), an
697                    // opaque `toolId` (the tool name — one tool parks at a time),
698                    // and the human-readable `actionDescription` (the hook prompt).
699                    let _ = sink.send(crate::protocol::write_confirmation_required(
700                        &request_id,
701                        &tool_name,
702                        &prompt,
703                    ));
704                }
705                // The chat HITL path only emits `Confirm`; a free-form `Input`
706                // request has no chat affordance, so auto-decline it rather than
707                // hang the turn (keeps the loop live for the next confirm).
708                HumanRequest::Input { .. } => {
709                    let _ = response_tx.send(HumanResponse::Denied {
710                        reason: "free-form human input is not supported on this channel".into(),
711                    });
712                }
713            }
714        }
715    })
716}
717
718/// Build the turn's [`Citation`]s from the knowledge sources that grounded it:
719/// the engine's auto-injected `[Relevant knowledge]` context (`auto`, mirrored
720/// by the runner) followed by everything the agent's `knowledge_search` calls
721/// surfaced (`tool`). Concatenated auto-first, deduplicated by document id
722/// (first occurrence wins), mapped to [`Citation`], and capped at
723/// [`MAX_CITATIONS`]. Empty when nothing was retrieved.
724fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
725    let mut seen = std::collections::HashSet::new();
726    auto.iter()
727        .chain(tool.iter())
728        .filter(|r| seen.insert(r.document_id.clone()))
729        .take(MAX_CITATIONS)
730        .map(Citation::from_knowledge_result)
731        .collect()
732}
733
734/// Load the conversation's persisted messages (oldest-first, capped) and convert
735/// them to engine `Message`s for replay: inbound → User, outbound → Assistant.
736async fn load_prior_messages(
737    storage: &dyn StorageAdapter,
738    conversation_id: &str,
739) -> Result<Vec<EngineMessage>> {
740    let page = storage
741        .list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
742        .await?;
743
744    let mut out = Vec::with_capacity(page.messages.len());
745    for m in page.messages {
746        let text = m
747            .content
748            .text
749            .clone()
750            .or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
751            .unwrap_or_default();
752        if text.is_empty() {
753            continue;
754        }
755        let role = match m.direction {
756            Direction::Inbound => Role::User,
757            Direction::Outbound => Role::Assistant,
758        };
759        out.push(EngineMessage {
760            id: m.id,
761            role,
762            content: text,
763            tool_call_id: None,
764            tool_name: None,
765            tool_calls: vec![],
766            reasoning_content: None,
767            timestamp: m.created_at,
768        });
769    }
770    Ok(out)
771}
772
773/// Append a single message to the conversation's log via the adapter.
774async fn persist_message(
775    storage: &dyn StorageAdapter,
776    conversation_id: &str,
777    direction: Direction,
778    text: &str,
779) -> Result<DomainMessage> {
780    let now = chrono::Utc::now();
781    let message = DomainMessage {
782        id: uuid::Uuid::new_v4().to_string(),
783        external_id: None,
784        organization_id: None,
785        conversation_id: Some(conversation_id.to_string()),
786        direction,
787        content: MessageContent::from_text(text),
788        from: None,
789        to: None,
790        metadata_json: None,
791        analytics_json: None,
792        created_at: now,
793        updated_at: None,
794    };
795    storage.append_message(message).await
796}
797
798/// Build the structured `GeneralAgentResponse`-shaped payload the protocol's
799/// `eventual_response` carries. The reference runtime doesn't produce the full
800/// structured analytics, so we surface the reply text in `responseParts` and
801/// supply neutral defaults for the analytic fields (clients render
802/// `responseParts`).
803#[must_use]
804pub fn general_agent_response(reply: &str) -> serde_json::Value {
805    json!({
806        "responseParts": [reply],
807        "customerHappinessScore": 0.5,
808        "needsSatisfactionScore": 0.5,
809        "requestSummary": "",
810        "resolutionStatus": "in_progress",
811        "suggestedNextActions": [],
812    })
813}