smooth_operator_server/runner.rs
1//! The streaming, memory-carrying agent runner used by the WS service.
2//!
3//! `smooth-operator`'s [`KnowledgeChatRuntime`] proves the engine ↔ gateway
4//! path but (a) is non-streaming (`run_turn` returns only after the turn
5//! completes) and (b) has no cross-turn memory — each `run_turn` builds a fresh
6//! [`Agent`] with a random id and no prior messages, so turn 2 forgets turn 1
7//! (documented in `core/tests/e2e_llm_smoo_ai.rs`).
8//!
9//! The service needs both, so this module builds the agent itself, wiring the
10//! same knowledge-grounding as core PLUS:
11//!
12//! 1. **Streaming** via [`Agent::run_with_channel`], translating the engine's
13//! [`AgentEvent`] stream into protocol events (`stream_token`,
14//! `stream_chunk`, `eventual_response`).
15//! 2. **Per-session memory** via [`AgentConfig::with_prior_messages`]: before
16//! each turn the session's persisted message log is loaded from the storage
17//! adapter and replayed into the conversation, so the model sees turn 1 when
18//! answering turn 2. (`Agent::new` randomizes the agent id every time, so the
19//! checkpoint-resume path can't be keyed stably — replaying the persisted log
20//! is the robust, backend-agnostic way to carry memory. The log is the source
21//! of truth the adapter already persists.)
22
23use std::sync::{Arc, Mutex};
24use std::time::Duration;
25
26use anyhow::Result;
27use serde_json::json;
28use smooth_operator_core::llm_provider::LlmProvider;
29use smooth_operator_core::{
30 human_channel, Agent, AgentConfig, AgentEvent, ConfirmationHook, HumanRequest, HumanResponse,
31 KnowledgeBase, KnowledgeResult, LlmConfig, Message as EngineMessage, Role, ToolRegistry,
32};
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
34
35use smooth_operator::access_control::AccessContext;
36use smooth_operator::adapter::{MessageQuery, StorageAdapter};
37use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
38use smooth_operator::rerank::Reranker;
39use smooth_operator::tool_provider::{ToolProvider, ToolProviderContext};
40use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
41use smooth_operator::MAX_CITATIONS;
42
/// How many auto-injected knowledge results the engine prepends as
/// `[Relevant knowledge]` context. Mirrors smooth-operator-core's `Agent`
/// auto-injection (a top-3 query) so the citations we collect match the sources
/// that grounded the first LLM call. Used as the limit of the runner's own
/// mirror query in `run_streaming_turn`.
const AUTO_CONTEXT_LIMIT: usize = 3;

/// Default system prompt for the knowledge-chat agent. Mirrors core's prompt:
/// ground answers in the knowledge base and search it before answering anything
/// organization-specific. Used only when the turn carries no host-supplied
/// `system_prompt` override.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
    "You are a helpful customer-support agent for the organization. \
     Answer the user's question accurately and concisely. When a question depends on \
     organization-specific facts (policies, products, documentation), call the \
     `knowledge_search` tool to retrieve them before answering, and ground your answer \
     in what you retrieve. If the knowledge base has no relevant information, say so. \
     Remember facts the user tells you within the conversation and use them when asked.";

/// Max prior *messages* (not turns — a completed turn persists an inbound and
/// an outbound message) to replay into the conversation for memory. Passed as
/// the limit of the message query in `load_prior_messages`. Bounds context
/// growth on long sessions; the in-memory log is small but a real backend could
/// be large.
const MAX_PRIOR_MESSAGES: usize = 50;

/// How long a parked write-tool confirmation waits for a `confirm_tool_action`
/// before the core `ConfirmationHook` gives up and treats the tool as denied
/// (a timeout). Bounds a stuck turn so a client that never confirms can't pin a
/// task forever. Generous (5 min) because a human is in the loop.
const CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(300);
70
/// Callback that registers a parked turn's [`HumanResponse`] sender under a
/// session id (first argument), so a later `confirm_tool_action` can take it and
/// deliver the verdict. Typically `AppState::register_confirmation`. Shared
/// (`Arc`) and `Send + Sync` because it is invoked from the spawned
/// confirmation-bridge task.
pub type RegisterConfirmation = Arc<dyn Fn(&str, UnboundedSender<HumanResponse>) + Send + Sync>;

/// Callback that clears any registered confirmation sender for a session id when
/// its turn ends. Typically `AppState::clear_confirmation`.
pub type ClearConfirmation = Arc<dyn Fn(&str) + Send + Sync>;
78
/// Hooks the runner needs to wire **write-confirmation HITL** into a turn
/// without depending on `AppState` directly (keeps the runner unit-testable).
///
/// When [`TurnRequest::confirmation`] is `Some` (and
/// [`tool_patterns`](Self::tool_patterns) is non-empty), the runner installs a
/// core [`ConfirmationHook`] over every tool whose name matches one of the
/// patterns. When such a tool is about to run, the agent loop **parks** inside
/// the hook's `pre_call` and emits a [`HumanRequest::Confirm`]; the runner's
/// bridge:
/// 1. calls [`register`](Self::register) with the session's
///    [`HumanResponse`] sender, so a later `confirm_tool_action` can resume,
/// 2. emits a `confirm_tool_action_required` event through the turn sink.
///
/// NOTE(review): the bridge builds that event with
/// `crate::protocol::write_confirmation_required`; confirm the wire event name
/// matches the `confirm_tool_action_required` name used in these docs.
///
/// On `confirm_tool_action`, the handler sends a [`HumanResponse`] through the
/// registered sender and the parked tool either executes (approved) or is
/// skipped with a rejection result (denied). `None` (the default) installs no
/// hook → no tool ever parks → behavior is byte-for-byte identical to before
/// HITL.
pub struct ConfirmationConfig {
    /// Tool-name substrings that require human approval (matched by core's
    /// `ConfirmationHook`, which uses `contains` matching). Empty disables HITL:
    /// the runner installs no hook and spawns no bridge.
    pub tool_patterns: Vec<String>,
    /// The session this turn belongs to — carried on the
    /// `confirm_tool_action_required` event and the registration key so the
    /// inbound `confirm_tool_action` (keyed by `sessionId`) routes back here.
    pub session_id: String,
    /// Registers the parked turn's [`HumanResponse`] sender under
    /// [`session_id`](Self::session_id) (typically `AppState::register_confirmation`).
    /// Called once per confirm request, each time with a fresh clone of the sender.
    pub register: RegisterConfirmation,
    /// Clears any registered sender for [`session_id`](Self::session_id) when the
    /// turn ends (typically `AppState::clear_confirmation`), so a stale sender
    /// can't mis-route a later confirmation.
    pub clear: ClearConfirmation,
}
110
/// The terminal outcome of a streamed turn, returned by `run_streaming_turn`.
pub struct TurnResult {
    /// The agent's final natural-language reply. Empty when the conversation
    /// ended without any assistant content.
    pub reply: String,
    /// The id of the persisted outbound (agent) message, for `eventual_response`.
    /// When [`reply`](Self::reply) is empty nothing is persisted and this is a
    /// freshly generated UUID that does not correspond to a stored message.
    pub message_id: String,
    /// True if any `knowledge_search` tool call was started this turn
    /// (diagnostics). Falls back to `false` if the event-translator task failed.
    pub invoked_knowledge_search: bool,
    /// The sources that grounded this turn (the auto-injected context + every
    /// `knowledge_search` result), deduped by document id and capped at
    /// `MAX_CITATIONS`. Carried onto the `eventual_response`'s `citations`.
    /// Empty when nothing was retrieved.
    pub citations: Vec<Citation>,
}
124
/// Everything one streaming turn needs. Bundled into a struct so the call sites
/// (the reference server's `handle_send_message` and the lambda's
/// `send_message`) stay readable and the security-critical [`access`](Self::access)
/// field can never be silently dropped from a positional argument list.
pub struct TurnRequest<'a> {
    /// The storage seam (conversations / messages / sessions / knowledge).
    /// Also supplies the checkpoint store handed to the agent.
    pub storage: Arc<dyn StorageAdapter>,
    /// The resolved LLM config for this turn.
    pub llm: LlmConfig,
    /// Agent-loop iteration cap.
    pub max_iterations: u32,
    /// The conversation this turn belongs to. Prior messages are loaded from it
    /// and the inbound/outbound messages of this turn are appended to it.
    pub conversation_id: &'a str,
    /// The protocol request id (streaming correlation). Stamped on every
    /// streamed event emitted for this turn.
    pub request_id: &'a str,
    /// The inbound user message.
    pub user_message: &'a str,
    /// **The requester's document-level entitlements.** Retrieval (the
    /// auto-injected `[Relevant knowledge]` context AND the `knowledge_search`
    /// tool) reads through `storage.knowledge_for_access(&access)`, so a
    /// restricted document is never surfaced to a requester who lacks the
    /// entitlement. An [`AccessContext::anonymous`] sees only org-public docs
    /// (fail closed for ACL'd content). Also passed to the
    /// [`ToolProvider`] (via its context) when one is installed.
    pub access: AccessContext,
    /// Optional test-injected LLM surface (a `MockLlmClient`) so the turn runs
    /// deterministically offline. `None` in production (a live client is built
    /// from `llm`).
    pub llm_provider: Option<Arc<dyn LlmProvider>>,
    /// Optional post-retrieval reranker (feature gap G8). When `Some`, the
    /// `knowledge_search` tool overfetches candidates and reorders the top-K with
    /// this reranker before they reach the model. `None` (the default) keeps the
    /// retrieval order unchanged, so default behavior is byte-for-byte the same.
    /// Selected by [`build_reranker`](crate::reranker::build_reranker).
    pub reranker: Option<Arc<dyn Reranker>>,
    /// Optional **write-confirmation HITL** wiring. `None` (the default) installs
    /// no confirmation hook, so no tool ever parks the turn and behavior is
    /// identical to before HITL. `Some` installs a core [`ConfirmationHook`] over
    /// the configured tool patterns and bridges its [`HumanRequest`]s to a
    /// `confirm_tool_action_required` event + a registered resumable sender. See
    /// [`ConfirmationConfig`].
    pub confirmation: Option<ConfirmationConfig>,
    /// **SEAM 1 — host tool injection.** When `Some`, the runner asks this
    /// provider for EXTRA tools and merges them into the turn's
    /// [`ToolRegistry`] alongside the built-ins. `None` (the default) leaves the
    /// registry as exactly the built-ins, so default behavior is byte-for-byte
    /// unchanged. A host installs one via [`AppState::with_tools`](crate::state::AppState::with_tools).
    pub tool_provider: Option<Arc<dyn ToolProvider>>,
    /// **SEAM 2 — per-org agent persona.** The resolved system prompt for this
    /// turn. When `Some`, it REPLACES the built-in [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`]
    /// as the agent's system prompt (the host resolves it from per-org settings,
    /// e.g. [`AgentSettings::persona`](smooth_operator::settings::AgentSettings::persona)).
    /// `None` (the default) keeps the const prompt, so default behavior is
    /// byte-for-byte unchanged.
    pub system_prompt: Option<String>,
    /// The owning org for this turn, threaded into the
    /// [`ToolProviderContext`](smooth_operator::tool_provider::ToolProviderContext)
    /// so a [`ToolProvider`] can return per-org tools. `None` when no org is
    /// resolved (e.g. an anonymous reference-server connection).
    pub org_id: Option<String>,
}
185
186/// Runs one knowledge-grounded, streaming turn for a session's conversation and
187/// emits protocol-shaped events through `sink` as they happen.
188///
189/// `sink` receives ready-to-send `serde_json::Value` event envelopes (built by
190/// [`crate::protocol`]). The caller forwards them over the WebSocket.
191///
192/// ## Access control (security-critical)
193///
194/// Both retrieval paths — the engine's auto-injected `[Relevant knowledge]`
195/// context and the agent's `knowledge_search` tool — read through
196/// [`StorageAdapter::knowledge_for_access`] bound to [`TurnRequest::access`],
197/// so a document the requester is not entitled to (e.g. a private-repo doc
198/// scoped to a group the requester is not in) is dropped before it can reach the
199/// model or a citation. See `docs/ACCESS-CONTROL.md`.
200///
201/// # Errors
202/// Returns an error if message persistence or the agent loop fails fatally. The
203/// caller converts this into a protocol `error` event.
204pub async fn run_streaming_turn(
205 req: TurnRequest<'_>,
206 sink: &UnboundedSender<serde_json::Value>,
207) -> Result<TurnResult> {
208 let TurnRequest {
209 storage,
210 llm,
211 max_iterations,
212 conversation_id,
213 request_id,
214 user_message,
215 access,
216 llm_provider,
217 reranker,
218 confirmation,
219 tool_provider,
220 system_prompt,
221 org_id,
222 } = req;
223
224 // The ONE ACL-enforcing knowledge handle both retrieval paths read through.
225 // Built once from the requester's `AccessContext` so the auto-injected
226 // context query, the agent's `knowledge_search` tool, and the citation
227 // mirror all hit the SAME filtered view — a restricted doc can't leak in
228 // through one path while being dropped on another.
229 let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);
230
231 // 0. Mirror the engine's auto-injected `[Relevant knowledge]` query so the
232 // citations include the sources the FIRST LLM call was grounded with.
233 // Same query smooth-operator-core's `Agent` runs (`query(msg, 3)`),
234 // against the same ACL-filtered handle. Best-effort: a KB error yields no
235 // auto-context citations.
236 let auto_sources: Vec<KnowledgeResult> = knowledge
237 .query(user_message, AUTO_CONTEXT_LIMIT)
238 .unwrap_or_default();
239 // Sink the knowledge_search tool records its structured results into, for
240 // citations built from the sources the agent's searches surfaced.
241 let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
242
243 // 1. Load prior turns for memory BEFORE persisting the new inbound message,
244 // so prior_messages is exactly the history-up-to-now.
245 let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;
246
247 // 2. Persist the inbound user message.
248 persist_message(
249 storage.as_ref(),
250 conversation_id,
251 Direction::Inbound,
252 user_message,
253 )
254 .await?;
255
256 // 3. Build the agent: ACL-grounded config + knowledge_search tool (over the
257 // SAME ACL-filtered handle) + replayed prior messages for memory.
258 //
259 // SEAM 2 — resolve the system prompt: a host-supplied per-org persona
260 // (`system_prompt`) overrides the built-in const; absent ⇒ the const, so
261 // default behavior is byte-for-byte unchanged.
262 let resolved_prompt = system_prompt
263 .as_deref()
264 .unwrap_or(KNOWLEDGE_CHAT_SYSTEM_PROMPT);
265 let config = AgentConfig::new("smooth-agent-chat", resolved_prompt, llm)
266 .with_max_iterations(max_iterations)
267 .with_knowledge(Arc::clone(&knowledge))
268 .with_prior_messages(prior);
269
270 let mut tools = ToolRegistry::new();
271 // Build the knowledge_search tool over the SAME ACL-filtered handle, with the
272 // citation sink and — when a reranker was selected (opt-in, G8) — the rerank
273 // stage. With `None` (the default) the tool fetches exactly `limit` and
274 // returns the retrieval order unchanged.
275 let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
276 .with_result_sink(Arc::clone(&tool_sources));
277 if let Some(reranker) = reranker {
278 knowledge_search = knowledge_search.with_reranker(reranker);
279 }
280 tools.register(knowledge_search);
281
282 // SEAM 1 — merge host-contributed tools. When a provider is installed, ask
283 // it (with the turn's org + access context) for extra tools and register
284 // each alongside the built-ins. Built-ins are registered FIRST, so a host
285 // tool that intentionally reuses a built-in name replaces it; a distinct
286 // name simply adds. With no provider this block is a no-op, leaving the
287 // registry as exactly today's built-ins.
288 if let Some(provider) = tool_provider {
289 let ctx = ToolProviderContext::new(org_id, access.clone());
290 for tool in provider.tools_for(&ctx).await {
291 tools.register_arc(tool);
292 }
293 }
294
295 // 3a. Write-confirmation HITL: when configured with tool patterns, install a
296 // core `ConfirmationHook` over those tools and spawn a bridge that turns
297 // each `HumanRequest::Confirm` into a `confirm_tool_action_required`
298 // event + a registered resumable `HumanResponse` sender. With no
299 // `confirmation` (the default) or empty patterns, no hook is installed —
300 // no tool parks the turn, byte-for-byte unchanged from before HITL.
301 let confirmation_bridge = match &confirmation {
302 Some(cfg) if !cfg.tool_patterns.is_empty() => {
303 let pair = human_channel();
304 // The hook owns the request *sender* (emits Confirm) and the response
305 // *receiver* (awaits the human's verdict). The runner keeps the
306 // request *receiver* and the response *sender* for the bridge.
307 tools.add_hook(ConfirmationHook::new(
308 cfg.tool_patterns.clone(),
309 pair.request_tx,
310 pair.response_rx,
311 CONFIRMATION_TIMEOUT,
312 ));
313 Some(spawn_confirmation_bridge(
314 pair.request_rx,
315 pair.response_tx,
316 sink.clone(),
317 request_id.to_string(),
318 cfg.session_id.clone(),
319 Arc::clone(&cfg.register),
320 ))
321 }
322 _ => None,
323 };
324
325 let agent = {
326 let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
327 // Inject the mock LLM provider for offline/deterministic tests; in
328 // production a live client is built from `llm`.
329 match llm_provider {
330 Some(provider) => agent.with_llm_provider(provider),
331 None => agent,
332 }
333 };
334
335 // 4. Run with the streaming channel and translate events as they arrive.
336 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
337 let request_id_owned = request_id.to_string();
338 let sink_clone = sink.clone();
339
340 // Spawn the event translator so we forward tokens to the client in real
341 // time while the agent loop runs concurrently.
342 let translator = tokio::spawn(async move {
343 let mut invoked_knowledge_search = false;
344 while let Some(event) = rx.recv().await {
345 match event {
346 AgentEvent::TokenDelta { content } => {
347 if !content.is_empty() {
348 let _ = sink_clone
349 .send(crate::protocol::stream_token(&request_id_owned, &content));
350 }
351 }
352 AgentEvent::ToolCallStart {
353 tool_name,
354 arguments,
355 ..
356 } => {
357 if tool_name == "knowledge_search" {
358 invoked_knowledge_search = true;
359 }
360 let _ = sink_clone.send(crate::protocol::stream_chunk(
361 &request_id_owned,
362 &tool_name,
363 json!({
364 "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
365 }),
366 ));
367 }
368 AgentEvent::ToolCallComplete {
369 tool_name,
370 result,
371 is_error,
372 ..
373 } => {
374 let _ = sink_clone.send(crate::protocol::stream_chunk(
375 &request_id_owned,
376 &tool_name,
377 json!({
378 "rawResponse": json!({
379 "toolResult": { "name": tool_name, "isError": is_error, "result": result }
380 }),
381 }),
382 ));
383 }
384 AgentEvent::PhaseStart { phase, .. } => {
385 let _ = sink_clone.send(crate::protocol::stream_chunk(
386 &request_id_owned,
387 &phase,
388 json!({}),
389 ));
390 }
391 // Started / Completed / token-accounting events are terminal or
392 // structural; the protocol carries those via immediate/eventual
393 // responses, so they're intentionally not re-emitted here.
394 _ => {}
395 }
396 }
397 invoked_knowledge_search
398 });
399
400 // Drive the agent loop. `run_with_channel` consumes `tx`; when it returns,
401 // the channel closes and the translator task drains and finishes.
402 let conversation = agent.run_with_channel(user_message, tx).await?;
403
404 // The turn is over: tear down the confirmation bridge. `run_with_channel`
405 // borrows `&self`, so the agent (and the `ConfirmationHook` it owns via the
406 // tool registry) is STILL alive here — and the hook holds the bridge's
407 // request *sender*. Dropping the agent closes that sender, which is what
408 // lets the bridge's `request_rx.recv()` return `None` and the task finish.
409 // Without this explicit drop, awaiting the bridge below would hang forever.
410 drop(agent);
411 if let (Some(handle), Some(cfg)) = (confirmation_bridge, confirmation.as_ref()) {
412 let _ = handle.await;
413 (cfg.clear)(&cfg.session_id);
414 }
415
416 let invoked_knowledge_search = translator.await.unwrap_or(false);
417
418 let reply = conversation
419 .last_assistant_content()
420 .unwrap_or_default()
421 .to_string();
422
423 // 5. Persist the outbound reply and capture its id for eventual_response.
424 let message_id = if reply.is_empty() {
425 uuid::Uuid::new_v4().to_string()
426 } else {
427 persist_message(
428 storage.as_ref(),
429 conversation_id,
430 Direction::Outbound,
431 &reply,
432 )
433 .await?
434 .id
435 };
436
437 // Build citations from the sources that grounded this turn: auto-injected
438 // context first (it grounded the first LLM call), then the agent's
439 // knowledge_search results. Dedup by document id, cap at MAX_CITATIONS.
440 let tool_sources = match Arc::try_unwrap(tool_sources) {
441 Ok(mutex) => mutex
442 .into_inner()
443 .unwrap_or_else(std::sync::PoisonError::into_inner),
444 Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
445 };
446 let citations = collect_citations(&auto_sources, &tool_sources);
447
448 Ok(TurnResult {
449 reply,
450 message_id,
451 invoked_knowledge_search,
452 citations,
453 })
454}
455
456/// Spawn the **confirmation bridge** for a turn that has a `ConfirmationHook`
457/// installed. The bridge owns the request *receiver* (each item is a
458/// [`HumanRequest::Confirm`] the hook emitted when a write tool is about to run)
459/// and the response *sender* (the hook awaits the verdict on its paired
460/// receiver). For every confirm request it:
461/// 1. registers `response_tx` under `session_id` via `register`, so an inbound
462/// `confirm_tool_action` can take it and feed the verdict back, and
463/// 2. emits a `write_confirmation_required` event through the turn `sink`,
464/// parking the turn until the client confirms.
465///
466/// The `tool_name` is used as the event's opaque `toolId`: core's
467/// `HumanRequest::Confirm` doesn't carry the LLM's tool-call id, but a turn only
468/// parks one write tool at a time (the loop blocks inside `pre_call`), so the
469/// tool name is a stable, sufficient correlation key for the resume. The bridge
470/// loops until the request channel closes (the hook/agent dropped at turn end),
471/// then returns — letting the caller clear the registration.
472fn spawn_confirmation_bridge(
473 mut request_rx: UnboundedReceiver<HumanRequest>,
474 response_tx: UnboundedSender<HumanResponse>,
475 sink: UnboundedSender<serde_json::Value>,
476 request_id: String,
477 session_id: String,
478 register: RegisterConfirmation,
479) -> tokio::task::JoinHandle<()> {
480 tokio::spawn(async move {
481 while let Some(req) = request_rx.recv().await {
482 match req {
483 HumanRequest::Confirm {
484 tool_name, prompt, ..
485 } => {
486 // Register THIS turn's response sender so the next
487 // `confirm_tool_action` for this session resumes it. Re-clone
488 // per request: the hook takes one verdict per parked tool.
489 register(&session_id, response_tx.clone());
490 // Per spec the event carries a `requestId` (correlation), an
491 // opaque `toolId` (the tool name — one tool parks at a time),
492 // and the human-readable `actionDescription` (the hook prompt).
493 let _ = sink.send(crate::protocol::write_confirmation_required(
494 &request_id,
495 &tool_name,
496 &prompt,
497 ));
498 }
499 // The chat HITL path only emits `Confirm`; a free-form `Input`
500 // request has no chat affordance, so auto-decline it rather than
501 // hang the turn (keeps the loop live for the next confirm).
502 HumanRequest::Input { .. } => {
503 let _ = response_tx.send(HumanResponse::Denied {
504 reason: "free-form human input is not supported on this channel".into(),
505 });
506 }
507 }
508 }
509 })
510}
511
512/// Build the turn's [`Citation`]s from the knowledge sources that grounded it:
513/// the engine's auto-injected `[Relevant knowledge]` context (`auto`, mirrored
514/// by the runner) followed by everything the agent's `knowledge_search` calls
515/// surfaced (`tool`). Concatenated auto-first, deduplicated by document id
516/// (first occurrence wins), mapped to [`Citation`], and capped at
517/// [`MAX_CITATIONS`]. Empty when nothing was retrieved.
518fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
519 let mut seen = std::collections::HashSet::new();
520 auto.iter()
521 .chain(tool.iter())
522 .filter(|r| seen.insert(r.document_id.clone()))
523 .take(MAX_CITATIONS)
524 .map(Citation::from_knowledge_result)
525 .collect()
526}
527
528/// Load the conversation's persisted messages (oldest-first, capped) and convert
529/// them to engine `Message`s for replay: inbound → User, outbound → Assistant.
530async fn load_prior_messages(
531 storage: &dyn StorageAdapter,
532 conversation_id: &str,
533) -> Result<Vec<EngineMessage>> {
534 let page = storage
535 .list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
536 .await?;
537
538 let mut out = Vec::with_capacity(page.messages.len());
539 for m in page.messages {
540 let text = m
541 .content
542 .text
543 .clone()
544 .or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
545 .unwrap_or_default();
546 if text.is_empty() {
547 continue;
548 }
549 let role = match m.direction {
550 Direction::Inbound => Role::User,
551 Direction::Outbound => Role::Assistant,
552 };
553 out.push(EngineMessage {
554 id: m.id,
555 role,
556 content: text,
557 tool_call_id: None,
558 tool_name: None,
559 tool_calls: vec![],
560 reasoning_content: None,
561 timestamp: m.created_at,
562 });
563 }
564 Ok(out)
565}
566
567/// Append a single message to the conversation's log via the adapter.
568async fn persist_message(
569 storage: &dyn StorageAdapter,
570 conversation_id: &str,
571 direction: Direction,
572 text: &str,
573) -> Result<DomainMessage> {
574 let now = chrono::Utc::now();
575 let message = DomainMessage {
576 id: uuid::Uuid::new_v4().to_string(),
577 external_id: None,
578 organization_id: None,
579 conversation_id: Some(conversation_id.to_string()),
580 direction,
581 content: MessageContent::from_text(text),
582 from: None,
583 to: None,
584 metadata_json: None,
585 analytics_json: None,
586 created_at: now,
587 updated_at: None,
588 };
589 storage.append_message(message).await
590}
591
592/// Build the structured `GeneralAgentResponse`-shaped payload the protocol's
593/// `eventual_response` carries. The reference runtime doesn't produce the full
594/// structured analytics, so we surface the reply text in `responseParts` and
595/// supply neutral defaults for the analytic fields (clients render
596/// `responseParts`).
597#[must_use]
598pub fn general_agent_response(reply: &str) -> serde_json::Value {
599 json!({
600 "responseParts": [reply],
601 "customerHappinessScore": 0.5,
602 "needsSatisfactionScore": 0.5,
603 "requestSummary": "",
604 "resolutionStatus": "in_progress",
605 "suggestedNextActions": [],
606 })
607}