smooai-smooth-operator-server 1.6.0

Reference WebSocket service for smooth-operator — speaks the schema-driven protocol over a smooth-operator KnowledgeChatRuntime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
//! The streaming, memory-carrying agent runner used by the WS service.
//!
//! `smooth-operator`'s [`KnowledgeChatRuntime`] proves the engine ↔ gateway
//! path but (a) is non-streaming (`run_turn` returns only after the turn
//! completes) and (b) has no cross-turn memory — each `run_turn` builds a fresh
//! [`Agent`] with a random id and no prior messages, so turn 2 forgets turn 1
//! (documented in `core/tests/e2e_llm_smoo_ai.rs`).
//!
//! The service needs both, so this module builds the agent itself, wiring the
//! same knowledge-grounding as core PLUS:
//!
//! 1. **Streaming** via [`Agent::run_with_channel`], translating the engine's
//!    [`AgentEvent`] stream into protocol events (`stream_token`,
//!    `stream_chunk`, `eventual_response`).
//! 2. **Per-session memory** via [`AgentConfig::with_prior_messages`]: before
//!    each turn the session's persisted message log is loaded from the storage
//!    adapter and replayed into the conversation, so the model sees turn 1 when
//!    answering turn 2. (`Agent::new` randomizes the agent id every time, so the
//!    checkpoint-resume path can't be keyed stably — replaying the persisted log
//!    is the robust, backend-agnostic way to carry memory. The log is the source
//!    of truth the adapter already persists.)

use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::Result;
use serde_json::json;
use smooth_operator_core::llm_provider::LlmProvider;
use smooth_operator_core::{
    human_channel, Agent, AgentConfig, AgentEvent, ConfirmationHook, HumanRequest, HumanResponse,
    KnowledgeBase, KnowledgeResult, LlmConfig, Message as EngineMessage, Role, ToolRegistry,
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

use smooth_operator::access_control::AccessContext;
use smooth_operator::adapter::{MessageQuery, StorageAdapter};
use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
use smooth_operator::rerank::Reranker;
use smooth_operator::tool_provider::{ToolProvider, ToolProviderContext};
use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
use smooth_operator::MAX_CITATIONS;

/// How many auto-injected knowledge results the engine prepends as
/// `[Relevant knowledge]` context. Mirrors smooth-operator-core's `Agent`
/// auto-injection (a top-3 query) so the citations we collect match the sources
/// that grounded the first LLM call.
///
/// Used by `run_streaming_turn` as the result limit of its own mirror query
/// against the ACL-filtered knowledge handle. It does not configure the engine;
/// if core's auto-injection size changes, this value must be changed to match.
const AUTO_CONTEXT_LIMIT: usize = 3;

/// Default system prompt for the knowledge-chat agent. Mirrors core's prompt:
/// ground answers in the knowledge base and search it before answering anything
/// organization-specific.
///
/// This is only the fallback: when a turn supplies `TurnRequest::system_prompt`,
/// that string is used as the agent's system prompt instead of this constant.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
    "You are a helpful customer-support agent for the organization. \
    Answer the user's question accurately and concisely. When a question depends on \
    organization-specific facts (policies, products, documentation), call the \
    `knowledge_search` tool to retrieve them before answering, and ground your answer \
    in what you retrieve. If the knowledge base has no relevant information, say so. \
    Remember facts the user tells you within the conversation and use them when asked.";

/// Max prior **messages** (not turns — each user or agent message counts as
/// one) requested from storage for replay into the conversation as memory.
/// Passed as the limit of the message query in `load_prior_messages`. Bounds
/// context growth on long sessions; the in-memory log is small but a real
/// backend could be large.
const MAX_PRIOR_MESSAGES: usize = 50;

/// How long a parked write-tool confirmation waits for a `confirm_tool_action`
/// before the core `ConfirmationHook` gives up and treats the tool as denied
/// (a timeout). Bounds a stuck turn so a client that never confirms can't pin a
/// task forever. Generous (5 min) because a human is in the loop.
///
/// Handed to `ConfirmationHook::new` when the runner installs the hook; the
/// timeout behavior itself is implemented by the hook in core, not here.
const CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(300);

/// Callback that registers a parked turn's [`HumanResponse`] sender under a
/// session id (so a later `confirm_tool_action` can take it). Typically
/// `AppState::register_confirmation`.
///
/// Arguments are `(session_id, response_sender)`. The confirmation bridge
/// invokes it once per confirm request, each time with a fresh clone of the
/// turn's response sender. `Send + Sync` because it is called from a spawned
/// task.
pub type RegisterConfirmation = Arc<dyn Fn(&str, UnboundedSender<HumanResponse>) + Send + Sync>;

/// Callback that clears any registered confirmation sender for a session id
/// when its turn ends. Typically `AppState::clear_confirmation`.
///
/// The single argument is the session id. The runner calls it after the turn's
/// confirmation bridge task has finished.
pub type ClearConfirmation = Arc<dyn Fn(&str) + Send + Sync>;

/// Hooks the runner needs to wire **write-confirmation HITL** into a turn
/// without depending on `AppState` directly (keeps the runner unit-testable).
///
/// When a turn carries `Some` config whose
/// [`tool_patterns`](Self::tool_patterns) is non-empty, the runner installs a
/// core [`ConfirmationHook`] built from those patterns. When a matching tool is
/// about to run, the agent loop **parks** inside the hook's `pre_call` and emits
/// a [`HumanRequest::Confirm`]; the runner's bridge then:
///   1. calls [`register`](Self::register) with the session id and a clone of
///      the turn's [`HumanResponse`] sender, so a later `confirm_tool_action`
///      can resume the turn,
///   2. emits a confirmation-required event (built by
///      `crate::protocol::write_confirmation_required`) through the turn sink.
///
/// On `confirm_tool_action`, the handler sends a [`HumanResponse`] through the
/// registered sender and the parked tool either executes (approved) or is
/// skipped with a rejection result (denied). With no config (`None`, the
/// default) or an empty pattern list, no hook is installed → no tool ever parks
/// → behavior is byte-for-byte identical to before HITL.
pub struct ConfirmationConfig {
    /// Tool-name substrings that require human approval (matched by core's
    /// `ConfirmationHook`, which uses `contains` matching). Empty disables HITL.
    pub tool_patterns: Vec<String>,
    /// The session this turn belongs to. Used as the key passed to
    /// [`register`](Self::register) and [`clear`](Self::clear), so the inbound
    /// `confirm_tool_action` (keyed by `sessionId`) routes back to this turn.
    /// Note the runner does not pass it to the confirmation-required event
    /// builder — that event is built from the request id, tool name and prompt.
    pub session_id: String,
    /// Registers the parked turn's [`HumanResponse`] sender under
    /// [`session_id`](Self::session_id) (typically `AppState::register_confirmation`).
    pub register: RegisterConfirmation,
    /// Clears any registered sender for [`session_id`](Self::session_id) when the
    /// turn ends (typically `AppState::clear_confirmation`), so a stale sender
    /// can't mis-route a later confirmation.
    pub clear: ClearConfirmation,
}

/// The terminal outcome of a streamed turn, returned by `run_streaming_turn`
/// once the agent loop has finished and the reply has been persisted.
pub struct TurnResult {
    /// The agent's final natural-language reply (the conversation's last
    /// assistant content). Empty when the agent produced no assistant content.
    pub reply: String,
    /// The id to report on `eventual_response`. For a non-empty reply this is
    /// the id of the persisted outbound (agent) message. For an empty reply
    /// nothing is persisted and this is a freshly generated UUID that does not
    /// correspond to any stored message.
    pub message_id: String,
    /// True if a tool call named `knowledge_search` was started this turn
    /// (diagnostics). Falls back to `false` if the event-translator task
    /// could not be joined.
    pub invoked_knowledge_search: bool,
    /// The sources that grounded this turn (the auto-injected context + every
    /// `knowledge_search` result), deduped by document id and capped at
    /// `MAX_CITATIONS`. Carried onto the `eventual_response`'s `citations`.
    /// Empty when nothing was retrieved.
    pub citations: Vec<Citation>,
}

/// Everything one streaming turn needs. Bundled into a struct so the call sites
/// (the reference server's `handle_send_message` and the lambda's
/// `send_message`) stay readable and the security-critical [`access`](Self::access)
/// field can never be silently dropped from a positional argument list.
///
/// The three `&'a str` fields borrow from the caller for the duration of the
/// turn; everything else is owned and consumed by `run_streaming_turn`.
pub struct TurnRequest<'a> {
    /// The storage seam (conversations / messages / sessions / knowledge).
    /// The runner also takes the agent's checkpoint store from it.
    pub storage: Arc<dyn StorageAdapter>,
    /// The resolved LLM config for this turn.
    pub llm: LlmConfig,
    /// Agent-loop iteration cap.
    pub max_iterations: u32,
    /// The conversation this turn belongs to. Prior messages are loaded from it
    /// and the inbound/outbound messages of this turn are appended to it.
    pub conversation_id: &'a str,
    /// The protocol request id (streaming correlation). Stamped on every
    /// streamed event and on the confirmation-required event.
    pub request_id: &'a str,
    /// The inbound user message. Persisted before the agent runs and also used
    /// as the query for the auto-context citation mirror.
    pub user_message: &'a str,
    /// **The requester's document-level entitlements.** Retrieval (the
    /// auto-injected `[Relevant knowledge]` context AND the `knowledge_search`
    /// tool) reads through `storage.knowledge_for_access(&access)`, so a
    /// restricted document is never surfaced to a requester who lacks the
    /// entitlement. An [`AccessContext::anonymous`] sees only org-public docs
    /// (fail closed for ACL'd content).
    pub access: AccessContext,
    /// Optional test-injected LLM surface (a `MockLlmClient`) so the turn runs
    /// deterministically offline. `None` in production (a live client is built
    /// from `llm`).
    pub llm_provider: Option<Arc<dyn LlmProvider>>,
    /// Optional post-retrieval reranker (feature gap G8). When `Some`, the
    /// `knowledge_search` tool overfetches candidates and reorders the top-K with
    /// this reranker before they reach the model. `None` (the default) keeps the
    /// retrieval order unchanged, so default behavior is byte-for-byte the same.
    /// Selected by [`build_reranker`](crate::reranker::build_reranker).
    pub reranker: Option<Arc<dyn Reranker>>,
    /// Optional **write-confirmation HITL** wiring. `None` (the default) — or a
    /// config whose `tool_patterns` is empty — installs no confirmation hook, so
    /// no tool ever parks the turn and behavior is identical to before HITL.
    /// Otherwise the runner installs a core [`ConfirmationHook`] over the
    /// configured tool patterns and bridges its [`HumanRequest`]s to a
    /// confirmation-required event + a registered resumable sender. See
    /// [`ConfirmationConfig`].
    pub confirmation: Option<ConfirmationConfig>,
    /// **SEAM 1 — host tool injection.** When `Some`, the runner asks this
    /// provider for EXTRA tools and merges them into the turn's
    /// [`ToolRegistry`] alongside the built-ins. `None` (the default) leaves the
    /// registry as exactly the built-ins, so default behavior is byte-for-byte
    /// unchanged. A host installs one via [`AppState::with_tools`](crate::state::AppState::with_tools).
    pub tool_provider: Option<Arc<dyn ToolProvider>>,
    /// **SEAM 2 — per-org agent persona.** The resolved system prompt for this
    /// turn. When `Some`, it REPLACES the built-in [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`]
    /// as the agent's system prompt (the host resolves it from per-org settings,
    /// e.g. [`AgentSettings::persona`](smooth_operator::settings::AgentSettings::persona)).
    /// `None` (the default) keeps the const prompt, so default behavior is
    /// byte-for-byte unchanged.
    pub system_prompt: Option<String>,
    /// The owning org for this turn, threaded into the
    /// [`ToolProviderContext`](smooth_operator::tool_provider::ToolProviderContext)
    /// so a [`ToolProvider`] can return per-org tools. `None` when no org is
    /// resolved (e.g. an anonymous reference-server connection). Only read when
    /// a [`tool_provider`](Self::tool_provider) is installed.
    pub org_id: Option<String>,
    /// The resolved per-org LLM-gateway key for this turn, threaded into the
    /// [`ToolProviderContext`](smooth_operator::tool_provider::ToolProviderContext)
    /// so a retrieval-style host tool (e.g. agent-brain's `knowledge_search`)
    /// can call the same gateway this turn was billed/scoped to. `None` when no
    /// key resolved (e.g. a mock-driven offline turn). The runner does not use
    /// it to talk to the gateway itself — that comes from [`llm`](Self::llm); it
    /// only carries it through to the provider context.
    pub gateway_key: Option<String>,
}

/// Runs one knowledge-grounded, streaming turn for a session's conversation and
/// emits protocol-shaped events through `sink` as they happen.
///
/// `sink` receives ready-to-send `serde_json::Value` event envelopes (built by
/// [`crate::protocol`]). The caller forwards them over the WebSocket. Send
/// failures on `sink` are deliberately ignored: a vanished client must not
/// abort the turn, whose reply is still persisted.
///
/// The turn proceeds in order: mirror the auto-context query (for citations),
/// load prior messages, persist the inbound message, build the agent and its
/// tools, stream the agent loop, tear down the confirmation bridge, persist the
/// reply, and assemble the citations.
///
/// ## Access control (security-critical)
///
/// Both retrieval paths — the engine's auto-injected `[Relevant knowledge]`
/// context and the agent's `knowledge_search` tool — read through
/// [`StorageAdapter::knowledge_for_access`] bound to [`TurnRequest::access`],
/// so a document the requester is not entitled to (e.g. a private-repo doc
/// scoped to a group the requester is not in) is dropped before it can reach the
/// model or a citation. See `docs/ACCESS-CONTROL.md`.
///
/// # Errors
/// Returns an error if message persistence or the agent loop fails fatally. The
/// caller converts this into a protocol `error` event. When the agent loop
/// fails, the confirmation registration is still cleared and the event
/// translator is still drained before the error is returned.
pub async fn run_streaming_turn(
    req: TurnRequest<'_>,
    sink: &UnboundedSender<serde_json::Value>,
) -> Result<TurnResult> {
    let TurnRequest {
        storage,
        llm,
        max_iterations,
        conversation_id,
        request_id,
        user_message,
        access,
        llm_provider,
        reranker,
        confirmation,
        tool_provider,
        system_prompt,
        org_id,
        gateway_key,
    } = req;

    // The ONE ACL-enforcing knowledge handle both retrieval paths read through.
    // Built once from the requester's `AccessContext` so the auto-injected
    // context query, the agent's `knowledge_search` tool, and the citation
    // mirror all hit the SAME filtered view — a restricted doc can't leak in
    // through one path while being dropped on another.
    let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);

    // 0. Mirror the engine's auto-injected `[Relevant knowledge]` query so the
    //    citations include the sources the FIRST LLM call was grounded with.
    //    Same query smooth-operator-core's `Agent` runs (`query(msg, 3)`),
    //    against the same ACL-filtered handle. Best-effort: a KB error yields no
    //    auto-context citations.
    let auto_sources: Vec<KnowledgeResult> = knowledge
        .query(user_message, AUTO_CONTEXT_LIMIT)
        .unwrap_or_default();
    // Sink the knowledge_search tool records its structured results into, for
    // citations built from the sources the agent's searches surfaced.
    let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));

    // 1. Load prior turns for memory BEFORE persisting the new inbound message,
    //    so prior_messages is exactly the history-up-to-now.
    let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;

    // 2. Persist the inbound user message.
    persist_message(
        storage.as_ref(),
        conversation_id,
        Direction::Inbound,
        user_message,
    )
    .await?;

    // 3. Build the agent: ACL-grounded config + knowledge_search tool (over the
    //    SAME ACL-filtered handle) + replayed prior messages for memory.
    //
    //    SEAM 2 — resolve the system prompt: a host-supplied per-org persona
    //    (`system_prompt`) overrides the built-in const; absent ⇒ the const, so
    //    default behavior is byte-for-byte unchanged.
    let resolved_prompt = system_prompt
        .as_deref()
        .unwrap_or(KNOWLEDGE_CHAT_SYSTEM_PROMPT);
    let config = AgentConfig::new("smooth-agent-chat", resolved_prompt, llm)
        .with_max_iterations(max_iterations)
        .with_knowledge(Arc::clone(&knowledge))
        .with_prior_messages(prior);

    let mut tools = ToolRegistry::new();
    // Build the knowledge_search tool over the SAME ACL-filtered handle, with the
    // citation sink and — when a reranker was selected (opt-in, G8) — the rerank
    // stage. With `None` (the default) the tool fetches exactly `limit` and
    // returns the retrieval order unchanged.
    let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
        .with_result_sink(Arc::clone(&tool_sources));
    if let Some(reranker) = reranker {
        knowledge_search = knowledge_search.with_reranker(reranker);
    }
    tools.register(knowledge_search);

    // SEAM 1 — merge host-contributed tools. When a provider is installed, ask
    // it (with the turn's org + access context) for extra tools and register
    // each alongside the built-ins. Built-ins are registered FIRST, so a host
    // tool that intentionally reuses a built-in name replaces it; a distinct
    // name simply adds. With no provider this block is a no-op, leaving the
    // registry as exactly today's built-ins.
    if let Some(provider) = tool_provider {
        // Thread the per-turn handles the runner already has — the conversation
        // this turn runs in and the resolved per-org gateway key — so a host's
        // conversation-persisting / retrieval tools aren't degraded to no-ops.
        // `access` is moved (not cloned): the knowledge handle above was its
        // only other consumer and nothing below reads it.
        let mut ctx =
            ToolProviderContext::new(org_id, access).with_conversation_id(conversation_id);
        if let Some(key) = gateway_key {
            ctx = ctx.with_gateway_key(key);
        }
        for tool in provider.tools_for(&ctx).await {
            tools.register_arc(tool);
        }
    }

    // 3a. Write-confirmation HITL: when configured with tool patterns, install a
    //     core `ConfirmationHook` over those tools and spawn a bridge that turns
    //     each `HumanRequest::Confirm` into a confirmation-required event (built
    //     by `crate::protocol::write_confirmation_required`) + a registered
    //     resumable `HumanResponse` sender. With no `confirmation` (the default)
    //     or empty patterns, no hook is installed — no tool parks the turn,
    //     byte-for-byte unchanged from before HITL.
    let confirmation_bridge = match &confirmation {
        Some(cfg) if !cfg.tool_patterns.is_empty() => {
            let pair = human_channel();
            // The hook owns the request *sender* (emits Confirm) and the response
            // *receiver* (awaits the human's verdict). The runner keeps the
            // request *receiver* and the response *sender* for the bridge.
            tools.add_hook(ConfirmationHook::new(
                cfg.tool_patterns.clone(),
                pair.request_tx,
                pair.response_rx,
                CONFIRMATION_TIMEOUT,
            ));
            Some(spawn_confirmation_bridge(
                pair.request_rx,
                pair.response_tx,
                sink.clone(),
                request_id.to_string(),
                cfg.session_id.clone(),
                Arc::clone(&cfg.register),
            ))
        }
        _ => None,
    };

    let agent = {
        let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
        // Inject the mock LLM provider for offline/deterministic tests; in
        // production a live client is built from `llm`.
        match llm_provider {
            Some(provider) => agent.with_llm_provider(provider),
            None => agent,
        }
    };

    // 4. Run with the streaming channel and translate events as they arrive.
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
    let request_id_owned = request_id.to_string();
    let sink_clone = sink.clone();

    // Spawn the event translator so we forward tokens to the client in real
    // time while the agent loop runs concurrently. The task's return value is
    // whether a `knowledge_search` tool call was observed.
    let translator = tokio::spawn(async move {
        let mut invoked_knowledge_search = false;
        while let Some(event) = rx.recv().await {
            match event {
                AgentEvent::TokenDelta { content } => {
                    // Skip empty deltas so the client never sees a no-op token.
                    if !content.is_empty() {
                        let _ = sink_clone
                            .send(crate::protocol::stream_token(&request_id_owned, &content));
                    }
                }
                AgentEvent::ToolCallStart {
                    tool_name,
                    arguments,
                    ..
                } => {
                    if tool_name == "knowledge_search" {
                        invoked_knowledge_search = true;
                    }
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &tool_name,
                        json!({
                            "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
                        }),
                    ));
                }
                AgentEvent::ToolCallComplete {
                    tool_name,
                    result,
                    is_error,
                    ..
                } => {
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &tool_name,
                        json!({
                            "rawResponse": json!({
                                "toolResult": { "name": tool_name, "isError": is_error, "result": result }
                            }),
                        }),
                    ));
                }
                AgentEvent::PhaseStart { phase, .. } => {
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &phase,
                        json!({}),
                    ));
                }
                // Started / Completed / token-accounting events are terminal or
                // structural; the protocol carries those via immediate/eventual
                // responses, so they're intentionally not re-emitted here.
                _ => {}
            }
        }
        invoked_knowledge_search
    });

    // Drive the agent loop. `run_with_channel` consumes `tx`; when it returns,
    // the channel closes and the translator task drains and finishes.
    //
    // The outcome is HELD rather than `?`-propagated here: the teardown below
    // must also run when the loop fails. Returning early would skip `clear`,
    // leaving this turn's `HumanResponse` sender registered for the session —
    // exactly the stale sender `ConfirmationConfig::clear` exists to prevent.
    let run_outcome = agent.run_with_channel(user_message, tx).await;

    // The turn is over: tear down the confirmation bridge. `run_with_channel`
    // borrows `&self`, so the agent (and the `ConfirmationHook` it owns via the
    // tool registry) is STILL alive here — and the hook holds the bridge's
    // request *sender*. Dropping the agent closes that sender, which is what
    // lets the bridge's `request_rx.recv()` return `None` and the task finish.
    // Without this explicit drop, awaiting the bridge below would hang forever.
    drop(agent);
    if let (Some(handle), Some(cfg)) = (confirmation_bridge, confirmation.as_ref()) {
        // Join first so the bridge cannot re-register after the clear.
        let _ = handle.await;
        (cfg.clear)(&cfg.session_id);
    }

    // Drain the translator before surfacing any failure, so every event the
    // agent emitted reaches `sink` ahead of whatever the caller sends next
    // (e.g. a protocol `error` event).
    let invoked_knowledge_search = translator.await.unwrap_or(false);

    // Only now propagate an agent-loop failure.
    let conversation = run_outcome?;

    let reply = conversation
        .last_assistant_content()
        .unwrap_or_default()
        .to_string();

    // 5. Persist the outbound reply and capture its id for eventual_response.
    //    An empty reply is not persisted; it gets a fresh, unpersisted id so the
    //    caller still has something to correlate on.
    let message_id = if reply.is_empty() {
        uuid::Uuid::new_v4().to_string()
    } else {
        persist_message(
            storage.as_ref(),
            conversation_id,
            Direction::Outbound,
            &reply,
        )
        .await?
        .id
    };

    // Build citations from the sources that grounded this turn: auto-injected
    // context first (it grounded the first LLM call), then the agent's
    // knowledge_search results. Dedup by document id, cap at MAX_CITATIONS.
    // Take the results out of the shared sink without cloning when this is the
    // last handle; fall back to a clone otherwise. A poisoned mutex is
    // tolerated — the recorded results are still usable.
    let tool_sources = match Arc::try_unwrap(tool_sources) {
        Ok(mutex) => mutex
            .into_inner()
            .unwrap_or_else(std::sync::PoisonError::into_inner),
        Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
    };
    let citations = collect_citations(&auto_sources, &tool_sources);

    Ok(TurnResult {
        reply,
        message_id,
        invoked_knowledge_search,
        citations,
    })
}

/// Spawn the **confirmation bridge** task for a turn that has a
/// `ConfirmationHook` installed, and return its join handle.
///
/// The task owns the request *receiver* (fed by the hook) and the response
/// *sender* (whose paired receiver the hook awaits). It handles each incoming
/// request as follows:
///
/// * [`HumanRequest::Confirm`] — first calls `register` with `session_id` and a
///   clone of `response_tx`, so an inbound `confirm_tool_action` can take it and
///   feed the verdict back; then sends the event built by
///   `crate::protocol::write_confirmation_required` from `request_id`, the tool
///   name and the hook's prompt through `sink`. A failed `sink` send is ignored.
/// * [`HumanRequest::Input`] — there is no chat affordance for free-form input,
///   so the bridge answers immediately with [`HumanResponse::Denied`] instead
///   of leaving the turn waiting.
///
/// The tool name doubles as the event's opaque `toolId`: core's
/// `HumanRequest::Confirm` doesn't carry the LLM's tool-call id, but a turn only
/// parks one write tool at a time (the loop blocks inside `pre_call`), so the
/// name is a sufficient correlation key for the resume.
///
/// The task ends when the request channel closes (every request sender — i.e.
/// the hook/agent — has been dropped), which lets the caller await the handle
/// and then clear the registration.
fn spawn_confirmation_bridge(
    mut request_rx: UnboundedReceiver<HumanRequest>,
    response_tx: UnboundedSender<HumanResponse>,
    sink: UnboundedSender<serde_json::Value>,
    request_id: String,
    session_id: String,
    register: RegisterConfirmation,
) -> tokio::task::JoinHandle<()> {
    let bridge = async move {
        loop {
            match request_rx.recv().await {
                // Channel closed: the hook is gone, so the turn is over.
                None => break,
                Some(HumanRequest::Confirm {
                    tool_name, prompt, ..
                }) => {
                    // Registration comes before the event so a fast client
                    // reply always finds a sender to resume. A fresh clone per
                    // request: the hook takes one verdict per parked tool.
                    register(&session_id, response_tx.clone());
                    let event = crate::protocol::write_confirmation_required(
                        &request_id,
                        &tool_name,
                        &prompt,
                    );
                    let _ = sink.send(event);
                }
                // Auto-decline rather than hang; the loop stays live for the
                // next confirm request.
                Some(HumanRequest::Input { .. }) => {
                    let declined = HumanResponse::Denied {
                        reason: "free-form human input is not supported on this channel".into(),
                    };
                    let _ = response_tx.send(declined);
                }
            }
        }
    };
    tokio::spawn(bridge)
}

/// Turn the knowledge sources that grounded a turn into its [`Citation`]s.
///
/// `auto` is the engine's auto-injected `[Relevant knowledge]` context (as
/// mirrored by the runner); `tool` is everything the agent's `knowledge_search`
/// calls surfaced. The two are walked in that order, so an auto-context source
/// wins over a later tool result with the same document id. Each distinct
/// document yields one citation, and the output stops at [`MAX_CITATIONS`]
/// entries. Returns an empty vector when both inputs are empty.
fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
    let mut emitted_ids = std::collections::HashSet::new();
    let mut citations = Vec::new();
    for source in auto.iter().chain(tool) {
        // Cap check first: once full, no further source is even inspected.
        if citations.len() >= MAX_CITATIONS {
            break;
        }
        // `insert` returns false for an id already seen → skip the duplicate.
        if emitted_ids.insert(source.document_id.clone()) {
            citations.push(Citation::from_knowledge_result(source));
        }
    }
    citations
}

/// Fetch up to [`MAX_PRIOR_MESSAGES`] persisted messages of a conversation and
/// convert them into engine `Message`s for replay as memory.
///
/// Each stored message maps to one engine message, in the order the adapter
/// returned them: inbound → `Role::User`, outbound → `Role::Assistant`. The
/// text is the content's top-level `text` if present, otherwise the first
/// content item that has text. Messages whose resolved text is empty are
/// skipped. The stored id and creation timestamp are carried over; all tool and
/// reasoning fields are left empty.
///
/// NOTE(review): this relies on the adapter returning the page oldest-first,
/// and — for sessions longer than the cap — on which end of the log the limit
/// keeps. Neither is visible here; confirm against the `MessageQuery` contract.
///
/// # Errors
/// Propagates any error from the storage adapter's message listing.
async fn load_prior_messages(
    storage: &dyn StorageAdapter,
    conversation_id: &str,
) -> Result<Vec<EngineMessage>> {
    let query = MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES);
    let page = storage.list_messages_by_conversation(query).await?;

    let replay: Vec<EngineMessage> = page
        .messages
        .into_iter()
        .filter_map(|stored| {
            // A present top-level text always wins, even when it is empty.
            let body = match stored.content.text.clone() {
                Some(text) => text,
                None => stored
                    .content
                    .items
                    .iter()
                    .find_map(|item| item.text.clone())
                    .unwrap_or_default(),
            };
            if body.is_empty() {
                return None;
            }
            let role = match stored.direction {
                Direction::Inbound => Role::User,
                Direction::Outbound => Role::Assistant,
            };
            Some(EngineMessage {
                id: stored.id,
                role,
                content: body,
                tool_call_id: None,
                tool_name: None,
                tool_calls: vec![],
                reasoning_content: None,
                timestamp: stored.created_at,
            })
        })
        .collect();
    Ok(replay)
}

/// Append one plain-text message to a conversation's log through the storage
/// adapter and return whatever the adapter hands back.
///
/// The message gets a fresh v4 UUID as its id and the current UTC time as
/// `created_at`; `direction` and `text` come from the caller. Every optional
/// field (external id, organization, from/to, metadata, analytics,
/// `updated_at`) is left unset.
///
/// # Errors
/// Propagates any error from the adapter's `append_message`.
async fn persist_message(
    storage: &dyn StorageAdapter,
    conversation_id: &str,
    direction: Direction,
    text: &str,
) -> Result<DomainMessage> {
    let created_at = chrono::Utc::now();
    let id = uuid::Uuid::new_v4().to_string();
    let content = MessageContent::from_text(text);

    let record = DomainMessage {
        id,
        external_id: None,
        organization_id: None,
        conversation_id: Some(conversation_id.to_string()),
        direction,
        content,
        from: None,
        to: None,
        metadata_json: None,
        analytics_json: None,
        created_at,
        updated_at: None,
    };
    storage.append_message(record).await
}

/// Assemble the `GeneralAgentResponse`-shaped JSON object carried by the
/// protocol's `eventual_response`.
///
/// The reference runtime produces no structured analytics, so only
/// `responseParts` is real: a one-element array holding `reply`. The remaining
/// fields are neutral placeholders — both scores `0.5`, an empty
/// `requestSummary`, `resolutionStatus` of `"in_progress"`, and no
/// `suggestedNextActions`. Clients are expected to render `responseParts`.
#[must_use]
pub fn general_agent_response(reply: &str) -> serde_json::Value {
    let mut payload = serde_json::Map::new();
    payload.insert("responseParts".to_string(), json!([reply]));
    payload.insert("customerHappinessScore".to_string(), json!(0.5));
    payload.insert("needsSatisfactionScore".to_string(), json!(0.5));
    payload.insert("requestSummary".to_string(), json!(""));
    payload.insert("resolutionStatus".to_string(), json!("in_progress"));
    payload.insert("suggestedNextActions".to_string(), json!([]));
    serde_json::Value::Object(payload)
}