Skip to main content

smooth_operator_server/
runner.rs

1//! The streaming, memory-carrying agent runner used by the WS service.
2//!
3//! `smooth-operator`'s [`KnowledgeChatRuntime`] proves the engine ↔ gateway
4//! path but (a) is non-streaming (`run_turn` returns only after the turn
5//! completes) and (b) has no cross-turn memory — each `run_turn` builds a fresh
6//! [`Agent`] with a random id and no prior messages, so turn 2 forgets turn 1
7//! (documented in `core/tests/e2e_llm_smoo_ai.rs`).
8//!
9//! The service needs both, so this module builds the agent itself, wiring the
10//! same knowledge-grounding as core PLUS:
11//!
12//! 1. **Streaming** via [`Agent::run_with_channel`], translating the engine's
13//!    [`AgentEvent`] stream into protocol events (`stream_token`,
14//!    `stream_chunk`, `eventual_response`).
15//! 2. **Per-session memory** via [`AgentConfig::with_prior_messages`]: before
16//!    each turn the session's persisted message log is loaded from the storage
17//!    adapter and replayed into the conversation, so the model sees turn 1 when
18//!    answering turn 2. (`Agent::new` randomizes the agent id every time, so the
19//!    checkpoint-resume path can't be keyed stably — replaying the persisted log
20//!    is the robust, backend-agnostic way to carry memory. The log is the source
21//!    of truth the adapter already persists.)
22
23use std::sync::{Arc, Mutex};
24use std::time::Duration;
25
26use anyhow::Result;
27use serde_json::json;
28use smooth_operator_core::llm_provider::LlmProvider;
29use smooth_operator_core::{
30    human_channel, Agent, AgentConfig, AgentEvent, ConfirmationHook, HumanRequest, HumanResponse,
31    KnowledgeBase, KnowledgeResult, LlmConfig, Message as EngineMessage, Role, ToolRegistry,
32};
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
34
35use smooth_operator::access_control::AccessContext;
36use smooth_operator::adapter::{MessageQuery, StorageAdapter};
37use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
38use smooth_operator::rerank::Reranker;
39use smooth_operator::tool_provider::{ToolProvider, ToolProviderContext};
40use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
41use smooth_operator::MAX_CITATIONS;
42
/// Result limit for the runner's mirror of the engine's auto-injected
/// `[Relevant knowledge]` query. smooth-operator-core's `Agent` auto-injection
/// is a top-3 query, so using the same limit keeps the collected citations
/// aligned with the sources that grounded the first LLM call.
const AUTO_CONTEXT_LIMIT: usize = 3;

/// Built-in system prompt for the knowledge-chat agent (mirrors core's
/// prompt): answer from the knowledge base, search it before answering anything
/// organization-specific, and remember what the user says during the
/// conversation. Used whenever the turn carries no per-org prompt override.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
    "You are a helpful customer-support agent for the organization. \
    Answer the user's question accurately and concisely. When a question depends on \
    organization-specific facts (policies, products, documentation), call the \
    `knowledge_search` tool to retrieve them before answering, and ground your answer \
    in what you retrieve. If the knowledge base has no relevant information, say so. \
    Remember facts the user tells you within the conversation and use them when asked.";

/// Upper bound on how many persisted messages are replayed into the
/// conversation as memory. Keeps the context from growing without limit on
/// long sessions; the in-memory log is small, but a real backend could hold a
/// large one.
const MAX_PRIOR_MESSAGES: usize = 50;

/// Time a parked write-tool confirmation waits for a `confirm_tool_action`
/// before the core `ConfirmationHook` gives up and treats the tool as denied.
/// This bounds a stuck turn, so a client that never confirms cannot pin a task
/// forever. Five minutes is deliberately generous: a human is in the loop.
const CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(5 * 60);
70
71/// Registers a parked turn's [`HumanResponse`] sender under a session id (so a
72/// later `confirm_tool_action` can take it). Typically `AppState::register_confirmation`.
73pub type RegisterConfirmation = Arc<dyn Fn(&str, UnboundedSender<HumanResponse>) + Send + Sync>;
74
/// Callback that removes whatever confirmation sender is registered for a
/// session id; invoked once the session's turn has ended. Called as
/// `clear(session_id)`; typically backed by `AppState::clear_confirmation`.
pub type ClearConfirmation = Arc<dyn Fn(&str) + Send + Sync>;
78
79/// Hooks the runner needs to wire **write-confirmation HITL** into a turn
80/// without depending on `AppState` directly (keeps the runner unit-testable).
81///
82/// When `Some`, the runner installs a core [`ConfirmationHook`] over every tool
83/// whose name matches one of [`tool_patterns`](Self::tool_patterns). When such a
84/// tool is about to run, the agent loop **parks** inside the hook's `pre_call`
85/// and emits a [`HumanRequest::Confirm`]; the runner's bridge:
86///   1. calls [`register`](Self::register) with the session's
87///      [`HumanResponse`] sender, so a later `confirm_tool_action` can resume,
88///   2. emits a `confirm_tool_action_required` event through the turn sink.
89///
90/// On `confirm_tool_action`, the handler feeds the sender [`HumanResponse`] and
91/// the parked tool either executes (approved) or is skipped with a rejection
92/// result (denied). `None` (the default) installs no hook → no tool ever parks →
93/// behavior is byte-for-byte identical to before HITL.
94pub struct ConfirmationConfig {
95    /// Tool-name substrings that require human approval (matched by core's
96    /// `ConfirmationHook`, which uses `contains` matching). Empty disables HITL.
97    pub tool_patterns: Vec<String>,
98    /// The session this turn belongs to — carried on the
99    /// `confirm_tool_action_required` event and the registration key so the
100    /// inbound `confirm_tool_action` (keyed by `sessionId`) routes back here.
101    pub session_id: String,
102    /// Registers the parked turn's [`HumanResponse`] sender under
103    /// [`session_id`](Self::session_id) (typically `AppState::register_confirmation`).
104    pub register: RegisterConfirmation,
105    /// Clears any registered sender for [`session_id`](Self::session_id) when the
106    /// turn ends (typically `AppState::clear_confirmation`), so a stale sender
107    /// can't mis-route a later confirmation.
108    pub clear: ClearConfirmation,
109}
110
111/// The terminal outcome of a streamed turn.
112pub struct TurnResult {
113    /// The agent's final natural-language reply.
114    pub reply: String,
115    /// The id of the persisted outbound (agent) message, for `eventual_response`.
116    pub message_id: String,
117    /// True if any `knowledge_search` tool call ran this turn (diagnostics).
118    pub invoked_knowledge_search: bool,
119    /// The sources that grounded this turn (the auto-injected context + every
120    /// `knowledge_search` result), deduped by id and capped. Carried onto the
121    /// `eventual_response`'s `citations`. Empty when nothing was retrieved.
122    pub citations: Vec<Citation>,
123}
124
125/// Everything one streaming turn needs. Bundled into a struct so the call sites
126/// (the reference server's `handle_send_message` and the lambda's
127/// `send_message`) stay readable and the security-critical [`access`](Self::access)
128/// field can never be silently dropped from a positional argument list.
129pub struct TurnRequest<'a> {
130    /// The storage seam (conversations / messages / sessions / knowledge).
131    pub storage: Arc<dyn StorageAdapter>,
132    /// The resolved LLM config for this turn.
133    pub llm: LlmConfig,
134    /// Agent-loop iteration cap.
135    pub max_iterations: u32,
136    /// The conversation this turn belongs to.
137    pub conversation_id: &'a str,
138    /// The protocol request id (streaming correlation).
139    pub request_id: &'a str,
140    /// The inbound user message.
141    pub user_message: &'a str,
142    /// **The requester's document-level entitlements.** Retrieval (the
143    /// auto-injected `[Relevant knowledge]` context AND the `knowledge_search`
144    /// tool) reads through `storage.knowledge_for_access(&access)`, so a
145    /// restricted document is never surfaced to a requester who lacks the
146    /// entitlement. An [`AccessContext::anonymous`] sees only org-public docs
147    /// (fail closed for ACL'd content).
148    pub access: AccessContext,
149    /// Optional test-injected LLM surface (a `MockLlmClient`) so the turn runs
150    /// deterministically offline. `None` in production (a live client is built
151    /// from `llm`).
152    pub llm_provider: Option<Arc<dyn LlmProvider>>,
153    /// Optional post-retrieval reranker (feature gap G8). When `Some`, the
154    /// `knowledge_search` tool overfetches candidates and reorders the top-K with
155    /// this reranker before they reach the model. `None` (the default) keeps the
156    /// retrieval order unchanged, so default behavior is byte-for-byte the same.
157    /// Selected by [`build_reranker`](crate::reranker::build_reranker).
158    pub reranker: Option<Arc<dyn Reranker>>,
159    /// Optional **write-confirmation HITL** wiring. `None` (the default) installs
160    /// no confirmation hook, so no tool ever parks the turn and behavior is
161    /// identical to before HITL. `Some` installs a core [`ConfirmationHook`] over
162    /// the configured tool patterns and bridges its [`HumanRequest`]s to a
163    /// `confirm_tool_action_required` event + a registered resumable sender. See
164    /// [`ConfirmationConfig`].
165    pub confirmation: Option<ConfirmationConfig>,
166    /// **SEAM 1 — host tool injection.** When `Some`, the runner asks this
167    /// provider for EXTRA tools and merges them into the turn's
168    /// [`ToolRegistry`] alongside the built-ins. `None` (the default) leaves the
169    /// registry as exactly the built-ins, so default behavior is byte-for-byte
170    /// unchanged. A host installs one via [`AppState::with_tools`](crate::state::AppState::with_tools).
171    pub tool_provider: Option<Arc<dyn ToolProvider>>,
172    /// **SEAM 2 — per-org agent persona.** The resolved system prompt for this
173    /// turn. When `Some`, it REPLACES the built-in [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`]
174    /// as the agent's system prompt (the host resolves it from per-org settings,
175    /// e.g. [`AgentSettings::persona`](smooth_operator::settings::AgentSettings::persona)).
176    /// `None` (the default) keeps the const prompt, so default behavior is
177    /// byte-for-byte unchanged.
178    pub system_prompt: Option<String>,
179    /// The owning org for this turn, threaded into the
180    /// [`ToolProviderContext`](smooth_operator::tool_provider::ToolProviderContext)
181    /// so a [`ToolProvider`] can return per-org tools. `None` when no org is
182    /// resolved (e.g. an anonymous reference-server connection).
183    pub org_id: Option<String>,
184}
185
186/// Runs one knowledge-grounded, streaming turn for a session's conversation and
187/// emits protocol-shaped events through `sink` as they happen.
188///
189/// `sink` receives ready-to-send `serde_json::Value` event envelopes (built by
190/// [`crate::protocol`]). The caller forwards them over the WebSocket.
191///
192/// ## Access control (security-critical)
193///
194/// Both retrieval paths — the engine's auto-injected `[Relevant knowledge]`
195/// context and the agent's `knowledge_search` tool — read through
196/// [`StorageAdapter::knowledge_for_access`] bound to [`TurnRequest::access`],
197/// so a document the requester is not entitled to (e.g. a private-repo doc
198/// scoped to a group the requester is not in) is dropped before it can reach the
199/// model or a citation. See `docs/ACCESS-CONTROL.md`.
200///
201/// # Errors
202/// Returns an error if message persistence or the agent loop fails fatally. The
203/// caller converts this into a protocol `error` event.
204pub async fn run_streaming_turn(
205    req: TurnRequest<'_>,
206    sink: &UnboundedSender<serde_json::Value>,
207) -> Result<TurnResult> {
208    let TurnRequest {
209        storage,
210        llm,
211        max_iterations,
212        conversation_id,
213        request_id,
214        user_message,
215        access,
216        llm_provider,
217        reranker,
218        confirmation,
219        tool_provider,
220        system_prompt,
221        org_id,
222    } = req;
223
224    // The ONE ACL-enforcing knowledge handle both retrieval paths read through.
225    // Built once from the requester's `AccessContext` so the auto-injected
226    // context query, the agent's `knowledge_search` tool, and the citation
227    // mirror all hit the SAME filtered view — a restricted doc can't leak in
228    // through one path while being dropped on another.
229    let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
230
231    // 0. Mirror the engine's auto-injected `[Relevant knowledge]` query so the
232    //    citations include the sources the FIRST LLM call was grounded with.
233    //    Same query smooth-operator-core's `Agent` runs (`query(msg, 3)`),
234    //    against the same ACL-filtered handle. Best-effort: a KB error yields no
235    //    auto-context citations.
236    let auto_sources: Vec<KnowledgeResult> = knowledge
237        .query(user_message, AUTO_CONTEXT_LIMIT)
238        .unwrap_or_default();
239    // Sink the knowledge_search tool records its structured results into, for
240    // citations built from the sources the agent's searches surfaced.
241    let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
242
243    // 1. Load prior turns for memory BEFORE persisting the new inbound message,
244    //    so prior_messages is exactly the history-up-to-now.
245    let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
246
247    // 2. Persist the inbound user message.
248    persist_message(
249        storage.as_ref(),
250        conversation_id,
251        Direction::Inbound,
252        user_message,
253    )
254    .await?;
255
256    // 3. Build the agent: ACL-grounded config + knowledge_search tool (over the
257    //    SAME ACL-filtered handle) + replayed prior messages for memory.
258    //
259    //    SEAM 2 — resolve the system prompt: a host-supplied per-org persona
260    //    (`system_prompt`) overrides the built-in const; absent ⇒ the const, so
261    //    default behavior is byte-for-byte unchanged.
262    let resolved_prompt = system_prompt
263        .as_deref()
264        .unwrap_or(KNOWLEDGE_CHAT_SYSTEM_PROMPT);
265    let config = AgentConfig::new("smooth-agent-chat", resolved_prompt, llm)
266        .with_max_iterations(max_iterations)
267        .with_knowledge(Arc::clone(&knowledge))
268        .with_prior_messages(prior);
269
270    let mut tools = ToolRegistry::new();
271    // Build the knowledge_search tool over the SAME ACL-filtered handle, with the
272    // citation sink and — when a reranker was selected (opt-in, G8) — the rerank
273    // stage. With `None` (the default) the tool fetches exactly `limit` and
274    // returns the retrieval order unchanged.
275    let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
276        .with_result_sink(Arc::clone(&tool_sources));
277    if let Some(reranker) = reranker {
278        knowledge_search = knowledge_search.with_reranker(reranker);
279    }
280    tools.register(knowledge_search);
281
282    // SEAM 1 — merge host-contributed tools. When a provider is installed, ask
283    // it (with the turn's org + access context) for extra tools and register
284    // each alongside the built-ins. Built-ins are registered FIRST, so a host
285    // tool that intentionally reuses a built-in name replaces it; a distinct
286    // name simply adds. With no provider this block is a no-op, leaving the
287    // registry as exactly today's built-ins.
288    if let Some(provider) = tool_provider {
289        let ctx = ToolProviderContext::new(org_id, access.clone());
290        for tool in provider.tools_for(&ctx).await {
291            tools.register_arc(tool);
292        }
293    }
294
295    // 3a. Write-confirmation HITL: when configured with tool patterns, install a
296    //     core `ConfirmationHook` over those tools and spawn a bridge that turns
297    //     each `HumanRequest::Confirm` into a `confirm_tool_action_required`
298    //     event + a registered resumable `HumanResponse` sender. With no
299    //     `confirmation` (the default) or empty patterns, no hook is installed —
300    //     no tool parks the turn, byte-for-byte unchanged from before HITL.
301    let confirmation_bridge = match &confirmation {
302        Some(cfg) if !cfg.tool_patterns.is_empty() => {
303            let pair = human_channel();
304            // The hook owns the request *sender* (emits Confirm) and the response
305            // *receiver* (awaits the human's verdict). The runner keeps the
306            // request *receiver* and the response *sender* for the bridge.
307            tools.add_hook(ConfirmationHook::new(
308                cfg.tool_patterns.clone(),
309                pair.request_tx,
310                pair.response_rx,
311                CONFIRMATION_TIMEOUT,
312            ));
313            Some(spawn_confirmation_bridge(
314                pair.request_rx,
315                pair.response_tx,
316                sink.clone(),
317                request_id.to_string(),
318                cfg.session_id.clone(),
319                Arc::clone(&cfg.register),
320            ))
321        }
322        _ => None,
323    };
324
325    let agent = {
326        let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
327        // Inject the mock LLM provider for offline/deterministic tests; in
328        // production a live client is built from `llm`.
329        match llm_provider {
330            Some(provider) => agent.with_llm_provider(provider),
331            None => agent,
332        }
333    };
334
335    // 4. Run with the streaming channel and translate events as they arrive.
336    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
337    let request_id_owned = request_id.to_string();
338    let sink_clone = sink.clone();
339
340    // Spawn the event translator so we forward tokens to the client in real
341    // time while the agent loop runs concurrently.
342    let translator = tokio::spawn(async move {
343        let mut invoked_knowledge_search = false;
344        while let Some(event) = rx.recv().await {
345            match event {
346                AgentEvent::TokenDelta { content } => {
347                    if !content.is_empty() {
348                        let _ = sink_clone
349                            .send(crate::protocol::stream_token(&request_id_owned, &content));
350                    }
351                }
352                AgentEvent::ToolCallStart {
353                    tool_name,
354                    arguments,
355                    ..
356                } => {
357                    if tool_name == "knowledge_search" {
358                        invoked_knowledge_search = true;
359                    }
360                    let _ = sink_clone.send(crate::protocol::stream_chunk(
361                        &request_id_owned,
362                        &tool_name,
363                        json!({
364                            "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
365                        }),
366                    ));
367                }
368                AgentEvent::ToolCallComplete {
369                    tool_name,
370                    result,
371                    is_error,
372                    ..
373                } => {
374                    let _ = sink_clone.send(crate::protocol::stream_chunk(
375                        &request_id_owned,
376                        &tool_name,
377                        json!({
378                            "rawResponse": json!({
379                                "toolResult": { "name": tool_name, "isError": is_error, "result": result }
380                            }),
381                        }),
382                    ));
383                }
384                AgentEvent::PhaseStart { phase, .. } => {
385                    let _ = sink_clone.send(crate::protocol::stream_chunk(
386                        &request_id_owned,
387                        &phase,
388                        json!({}),
389                    ));
390                }
391                // Started / Completed / token-accounting events are terminal or
392                // structural; the protocol carries those via immediate/eventual
393                // responses, so they're intentionally not re-emitted here.
394                _ => {}
395            }
396        }
397        invoked_knowledge_search
398    });
399
400    // Drive the agent loop. `run_with_channel` consumes `tx`; when it returns,
401    // the channel closes and the translator task drains and finishes.
402    let conversation = agent.run_with_channel(user_message, tx).await?;
403
404    // The turn is over: tear down the confirmation bridge. `run_with_channel`
405    // borrows `&self`, so the agent (and the `ConfirmationHook` it owns via the
406    // tool registry) is STILL alive here — and the hook holds the bridge's
407    // request *sender*. Dropping the agent closes that sender, which is what
408    // lets the bridge's `request_rx.recv()` return `None` and the task finish.
409    // Without this explicit drop, awaiting the bridge below would hang forever.
410    drop(agent);
411    if let (Some(handle), Some(cfg)) = (confirmation_bridge, confirmation.as_ref()) {
412        let _ = handle.await;
413        (cfg.clear)(&cfg.session_id);
414    }
415
416    let invoked_knowledge_search = translator.await.unwrap_or(false);
417
418    let reply = conversation
419        .last_assistant_content()
420        .unwrap_or_default()
421        .to_string();
422
423    // 5. Persist the outbound reply and capture its id for eventual_response.
424    let message_id = if reply.is_empty() {
425        uuid::Uuid::new_v4().to_string()
426    } else {
427        persist_message(
428            storage.as_ref(),
429            conversation_id,
430            Direction::Outbound,
431            &reply,
432        )
433        .await?
434        .id
435    };
436
437    // Build citations from the sources that grounded this turn: auto-injected
438    // context first (it grounded the first LLM call), then the agent's
439    // knowledge_search results. Dedup by document id, cap at MAX_CITATIONS.
440    let tool_sources = match Arc::try_unwrap(tool_sources) {
441        Ok(mutex) => mutex
442            .into_inner()
443            .unwrap_or_else(std::sync::PoisonError::into_inner),
444        Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
445    };
446    let citations = collect_citations(&auto_sources, &tool_sources);
447
448    Ok(TurnResult {
449        reply,
450        message_id,
451        invoked_knowledge_search,
452        citations,
453    })
454}
455
456/// Spawn the **confirmation bridge** for a turn that has a `ConfirmationHook`
457/// installed. The bridge owns the request *receiver* (each item is a
458/// [`HumanRequest::Confirm`] the hook emitted when a write tool is about to run)
459/// and the response *sender* (the hook awaits the verdict on its paired
460/// receiver). For every confirm request it:
461///   1. registers `response_tx` under `session_id` via `register`, so an inbound
462///      `confirm_tool_action` can take it and feed the verdict back, and
463///   2. emits a `write_confirmation_required` event through the turn `sink`,
464///      parking the turn until the client confirms.
465///
466/// The `tool_name` is used as the event's opaque `toolId`: core's
467/// `HumanRequest::Confirm` doesn't carry the LLM's tool-call id, but a turn only
468/// parks one write tool at a time (the loop blocks inside `pre_call`), so the
469/// tool name is a stable, sufficient correlation key for the resume. The bridge
470/// loops until the request channel closes (the hook/agent dropped at turn end),
471/// then returns — letting the caller clear the registration.
472fn spawn_confirmation_bridge(
473    mut request_rx: UnboundedReceiver<HumanRequest>,
474    response_tx: UnboundedSender<HumanResponse>,
475    sink: UnboundedSender<serde_json::Value>,
476    request_id: String,
477    session_id: String,
478    register: RegisterConfirmation,
479) -> tokio::task::JoinHandle<()> {
480    tokio::spawn(async move {
481        while let Some(req) = request_rx.recv().await {
482            match req {
483                HumanRequest::Confirm {
484                    tool_name, prompt, ..
485                } => {
486                    // Register THIS turn's response sender so the next
487                    // `confirm_tool_action` for this session resumes it. Re-clone
488                    // per request: the hook takes one verdict per parked tool.
489                    register(&session_id, response_tx.clone());
490                    // Per spec the event carries a `requestId` (correlation), an
491                    // opaque `toolId` (the tool name — one tool parks at a time),
492                    // and the human-readable `actionDescription` (the hook prompt).
493                    let _ = sink.send(crate::protocol::write_confirmation_required(
494                        &request_id,
495                        &tool_name,
496                        &prompt,
497                    ));
498                }
499                // The chat HITL path only emits `Confirm`; a free-form `Input`
500                // request has no chat affordance, so auto-decline it rather than
501                // hang the turn (keeps the loop live for the next confirm).
502                HumanRequest::Input { .. } => {
503                    let _ = response_tx.send(HumanResponse::Denied {
504                        reason: "free-form human input is not supported on this channel".into(),
505                    });
506                }
507            }
508        }
509    })
510}
511
512/// Build the turn's [`Citation`]s from the knowledge sources that grounded it:
513/// the engine's auto-injected `[Relevant knowledge]` context (`auto`, mirrored
514/// by the runner) followed by everything the agent's `knowledge_search` calls
515/// surfaced (`tool`). Concatenated auto-first, deduplicated by document id
516/// (first occurrence wins), mapped to [`Citation`], and capped at
517/// [`MAX_CITATIONS`]. Empty when nothing was retrieved.
518fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
519    let mut seen = std::collections::HashSet::new();
520    auto.iter()
521        .chain(tool.iter())
522        .filter(|r| seen.insert(r.document_id.clone()))
523        .take(MAX_CITATIONS)
524        .map(Citation::from_knowledge_result)
525        .collect()
526}
527
528/// Load the conversation's persisted messages (oldest-first, capped) and convert
529/// them to engine `Message`s for replay: inbound → User, outbound → Assistant.
530async fn load_prior_messages(
531    storage: &dyn StorageAdapter,
532    conversation_id: &str,
533) -> Result<Vec<EngineMessage>> {
534    let page = storage
535        .list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
536        .await?;
537
538    let mut out = Vec::with_capacity(page.messages.len());
539    for m in page.messages {
540        let text = m
541            .content
542            .text
543            .clone()
544            .or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
545            .unwrap_or_default();
546        if text.is_empty() {
547            continue;
548        }
549        let role = match m.direction {
550            Direction::Inbound => Role::User,
551            Direction::Outbound => Role::Assistant,
552        };
553        out.push(EngineMessage {
554            id: m.id,
555            role,
556            content: text,
557            tool_call_id: None,
558            tool_name: None,
559            tool_calls: vec![],
560            reasoning_content: None,
561            timestamp: m.created_at,
562        });
563    }
564    Ok(out)
565}
566
567/// Append a single message to the conversation's log via the adapter.
568async fn persist_message(
569    storage: &dyn StorageAdapter,
570    conversation_id: &str,
571    direction: Direction,
572    text: &str,
573) -> Result<DomainMessage> {
574    let now = chrono::Utc::now();
575    let message = DomainMessage {
576        id: uuid::Uuid::new_v4().to_string(),
577        external_id: None,
578        organization_id: None,
579        conversation_id: Some(conversation_id.to_string()),
580        direction,
581        content: MessageContent::from_text(text),
582        from: None,
583        to: None,
584        metadata_json: None,
585        analytics_json: None,
586        created_at: now,
587        updated_at: None,
588    };
589    storage.append_message(message).await
590}
591
592/// Build the structured `GeneralAgentResponse`-shaped payload the protocol's
593/// `eventual_response` carries. The reference runtime doesn't produce the full
594/// structured analytics, so we surface the reply text in `responseParts` and
595/// supply neutral defaults for the analytic fields (clients render
596/// `responseParts`).
597#[must_use]
598pub fn general_agent_response(reply: &str) -> serde_json::Value {
599    json!({
600        "responseParts": [reply],
601        "customerHappinessScore": 0.5,
602        "needsSatisfactionScore": 0.5,
603        "requestSummary": "",
604        "resolutionStatus": "in_progress",
605        "suggestedNextActions": [],
606    })
607}