Skip to main content

smooth_operator_server/
handler.rs

1//! Action dispatch — parses a client action envelope and produces the matching
2//! server events.
3//!
4//! Each handler is `async` and forwards events through an
5//! `UnboundedSender<serde_json::Value>` (the per-connection outbound sink). The
6//! socket task drains the sink and writes each value as a JSON WS text frame, so
7//! streaming actions (`send_message`) can emit many events while still being
8//! driven from one place.
9
10use std::sync::Arc;
11
12use serde_json::{json, Value};
13use tokio::sync::mpsc::UnboundedSender;
14
15use smooth_operator::access_control::AccessContext;
16use smooth_operator::domain::{
17    Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
18};
19
20use crate::protocol;
21use crate::runner;
22use crate::runner::TurnRequest;
23use crate::state::AppState;
24
25/// The agent's display name for the reference server.
26const AGENT_NAME: &str = "smooth-agent";
27
28/// Parse and dispatch a single inbound text frame. Any produced events are sent
29/// through `sink`. Returns `Ok(())` always — protocol-level failures are
30/// surfaced as `error` events, never as hard errors that drop the connection.
31pub async fn handle_frame(
32    state: &AppState,
33    access: &AccessContext,
34    conn_id: &str,
35    origin: Option<&str>,
36    auth_org: Option<&str>,
37    raw: &str,
38    sink: &UnboundedSender<Value>,
39) {
40    let parsed: Value = match serde_json::from_str(raw) {
41        Ok(v) => v,
42        Err(e) => {
43            let _ = sink.send(protocol::error(
44                None,
45                "VALIDATION_ERROR",
46                &format!("invalid JSON frame: {e}"),
47            ));
48            return;
49        }
50    };
51
52    let action = parsed.get("action").and_then(Value::as_str);
53    let request_id = parsed.get("requestId").and_then(Value::as_str);
54
55    match action {
56        Some("ping") => {
57            let _ = sink.send(protocol::pong(request_id));
58        }
59        Some("create_conversation_session") => {
60            handle_create_session(state, conn_id, origin, auth_org, &parsed, request_id, sink)
61                .await;
62        }
63        Some("get_session") => {
64            handle_get_session(state, &parsed, request_id, sink);
65        }
66        Some("get_conversation_messages") => {
67            handle_get_conversation_messages(state, &parsed, request_id, sink).await;
68        }
69        Some("send_message") => {
70            handle_send_message(state, access, &parsed, request_id, sink).await;
71        }
72        Some("confirm_tool_action") => {
73            handle_confirm_tool_action(state, &parsed, request_id, sink);
74        }
75        Some(other) => {
76            let _ = sink.send(protocol::error(
77                request_id,
78                "UNSUPPORTED_ACTION",
79                &format!("action '{other}' is not supported by this server"),
80            ));
81        }
82        None => {
83            let _ = sink.send(protocol::error(
84                request_id,
85                "VALIDATION_ERROR",
86                "missing 'action' field",
87            ));
88        }
89    }
90}
91
92/// Outcome of widget-auth enforcement: whether to proceed, and (when an agent
93/// policy resolved) the org that policy attributes the agent to.
94enum WidgetAuthOutcome {
95    /// Auth denied — an `error` event was already emitted; the caller must stop.
96    Denied,
97    /// Auth passed. `org_id` is `Some` when the resolved policy carried an
98    /// `organization_id` (a multi-tenant host that knows the agent's org), else
99    /// `None` (no policy, or a policy without an org — org derivation falls
100    /// through to the JWT principal, then the seed org).
101    Allowed { org_id: Option<String> },
102}
103
104/// Enforce an agent's embeddable-widget policy (origin allowlist + `authContext`)
105/// before a session is created. Returns [`WidgetAuthOutcome::Allowed`] to proceed
106/// (carrying the policy's org when known), or [`WidgetAuthOutcome::Denied`] after
107/// emitting a protocol `error` (the caller must then stop). Agents with no policy
108/// proceed — unless `WIDGET_AUTH_STRICT` is set, in which case an unknown agent is
109/// rejected (fail closed).
110async fn enforce_widget_auth(
111    state: &AppState,
112    origin: Option<&str>,
113    agent_id: &str,
114    parsed: &Value,
115    request_id: Option<&str>,
116    sink: &UnboundedSender<Value>,
117) -> WidgetAuthOutcome {
118    let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
119        if state.config.widget_auth_strict {
120            let _ = sink.send(protocol::error(
121                request_id,
122                "AGENT_NOT_AUTHORIZED",
123                "this agent is not registered for embedding",
124            ));
125            return WidgetAuthOutcome::Denied;
126        }
127        return WidgetAuthOutcome::Allowed { org_id: None };
128    };
129
130    // Origin allowlist — fail closed: a missing or disallowed `Origin` is rejected.
131    if !smooth_operator::widget_auth::origin_allowed(
132        &policy.allowed_origins,
133        origin.unwrap_or_default(),
134    ) {
135        let _ = sink.send(protocol::error(
136            request_id,
137            "ORIGIN_NOT_ALLOWED",
138            "this origin is not allowed to embed this agent",
139        ));
140        return WidgetAuthOutcome::Denied;
141    }
142
143    // Pre-auth `authContext` (optional): when present it must verify.
144    if let Some(ac) = parsed.get("authContext") {
145        if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
146            let _ = sink.send(protocol::error(
147                request_id,
148                "AUTH_CONTEXT_INVALID",
149                "authContext signature failed verification",
150            ));
151            return WidgetAuthOutcome::Denied;
152        }
153    }
154    WidgetAuthOutcome::Allowed {
155        org_id: policy.organization_id,
156    }
157}
158
159/// Verify a JSON `authContext` (`{userId, signature, timestamp}`) against the
160/// agent's `public_key`. False on any missing field/key or signature/replay
161/// failure. Replay window: 60s.
162fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
163    let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
164        public_key,
165        ac.get("userId").and_then(Value::as_str),
166        ac.get("signature").and_then(Value::as_str),
167        ac.get("timestamp").and_then(Value::as_i64),
168    ) else {
169        return false;
170    };
171    let now = chrono::Utc::now().timestamp();
172    smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
173}
174
175/// `create_conversation_session` — create a conversation + user & agent
176/// participants + a session, then reply with an `immediate_response` carrying
177/// the session descriptor (per `create-conversation-session.schema.json`).
178async fn handle_create_session(
179    state: &AppState,
180    conn_id: &str,
181    origin: Option<&str>,
182    auth_org: Option<&str>,
183    parsed: &Value,
184    request_id: Option<&str>,
185    sink: &UnboundedSender<Value>,
186) {
187    let agent_id = parsed
188        .get("agentId")
189        .and_then(Value::as_str)
190        .map(str::to_string)
191        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
192
193    // Embeddable-widget auth: enforce the agent's origin allowlist + authContext
194    // before creating any session. No-op for agents without a policy (unless
195    // WIDGET_AUTH_STRICT). On denial, an error is emitted and we stop here. A
196    // resolved policy may also carry the agent's org (multi-tenant host).
197    let widget_org =
198        match enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
199            WidgetAuthOutcome::Denied => return,
200            WidgetAuthOutcome::Allowed { org_id } => org_id,
201        };
202
203    let user_name = parsed
204        .get("userName")
205        .and_then(Value::as_str)
206        .unwrap_or("Visitor")
207        .to_string();
208    let user_email = parsed
209        .get("userEmail")
210        .and_then(Value::as_str)
211        .map(str::to_string);
212    let browser_fingerprint = parsed
213        .get("browserFingerprint")
214        .and_then(Value::as_str)
215        .map(str::to_string);
216
217    let now = chrono::Utc::now();
218    // Derive the org this session (conversation + participants) belongs to, in
219    // priority order:
220    //   1. the widget policy's `organization_id` — a multi-tenant host that knows
221    //      the agent's org (widget visitors authenticate via origin/authContext,
222    //      not a JWT, so their org rides on the agent's policy);
223    //   2. the connection's authenticated JWT principal org (`auth_org`) — a
224    //      dashboard user / authed client;
225    //   3. the server's seed org — the single-org reference/dev case, so the
226    //      admin API's org-scoping (document sets, indexing runs) still lines up
227    //      with the seeded knowledge. This keeps the no-auth/local flavor
228    //      behavior unchanged.
229    let org_id = widget_org
230        .or_else(|| auth_org.map(str::to_string))
231        .unwrap_or_else(|| crate::server::SEED_ORG_ID.to_string());
232
233    let conversation_id = uuid::Uuid::new_v4().to_string();
234    let session_id = uuid::Uuid::new_v4().to_string();
235    let user_participant_id = uuid::Uuid::new_v4().to_string();
236    let agent_participant_id = uuid::Uuid::new_v4().to_string();
237
238    // Associate this connection with its session (and agent) on the backplane so
239    // events published to the session/agent — by an agent turn or any other
240    // service — reach this client's socket, on this pod or (with a Redis/NATS
241    // backplane) any pod.
242    state
243        .backplane
244        .associate(
245            conn_id,
246            smooth_operator::backplane::Target::Session(session_id.clone()),
247        )
248        .await;
249    state
250        .backplane
251        .associate(
252            conn_id,
253            smooth_operator::backplane::Target::Agent(agent_id.clone()),
254        )
255        .await;
256
257    let conversation = Conversation {
258        id: conversation_id.clone(),
259        platform: Platform::Web,
260        name: format!("Session {session_id}"),
261        organization_id: org_id.clone(),
262        idempotency_key: session_id.clone(),
263        metadata_json: parsed.get("metadata").cloned(),
264        analytics_json: None,
265        created_at: now,
266        updated_at: now,
267    };
268
269    let user_participant = Participant {
270        id: user_participant_id.clone(),
271        conversation_id: conversation_id.clone(),
272        organization_id: org_id.clone(),
273        participant_type: ParticipantType::User,
274        external_id: None,
275        internal_id: None,
276        browser_fingerprint,
277        browser_info: None,
278        name: user_name,
279        email: user_email,
280        phone: None,
281        crm_contact_id: None,
282        metadata_json: None,
283        created_at: now,
284        updated_at: now,
285    };
286
287    let agent_participant = Participant {
288        id: agent_participant_id.clone(),
289        conversation_id: conversation_id.clone(),
290        organization_id: org_id.clone(),
291        participant_type: ParticipantType::AiAgent,
292        external_id: None,
293        internal_id: Some(agent_id.clone()),
294        browser_fingerprint: None,
295        browser_info: None,
296        name: AGENT_NAME.to_string(),
297        email: None,
298        phone: None,
299        crm_contact_id: None,
300        metadata_json: None,
301        created_at: now,
302        updated_at: now,
303    };
304
305    let session = Session {
306        session_id: session_id.clone(),
307        conversation_id: conversation_id.clone(),
308        organization_id: org_id.clone(),
309        agent_id: agent_id.clone(),
310        agent_name: AGENT_NAME.to_string(),
311        user_participant_id: user_participant_id.clone(),
312        agent_participant_id: agent_participant_id.clone(),
313        // The thread id is the conversation id: per-session memory is carried by
314        // replaying this conversation's persisted message log (see runner.rs).
315        thread_id: conversation_id.clone(),
316        status: Some(SessionStatus::Active),
317        token_count: Some(0),
318        message_count: Some(0),
319        metadata: None,
320        created_at: Some(now),
321        updated_at: Some(now),
322        ended_at: None,
323        last_activity_at: Some(now),
324    };
325
326    // Persist to the storage adapter (best-effort: a failure surfaces as error).
327    let storage = state.storage.clone();
328    let sink_clone = sink.clone();
329    let request_id_owned = request_id.map(str::to_string);
330    let session_for_registry = session.clone();
331    let state_clone = state.clone();
332
333    let data = json!({
334        "sessionId": session_id,
335        "conversationId": conversation_id,
336        "agentId": agent_id,
337        "agentName": AGENT_NAME,
338        "userParticipantId": user_participant_id,
339        "agentParticipantId": agent_participant_id,
340    });
341
342    tokio::spawn(async move {
343        let rid = request_id_owned.as_deref();
344        if let Err(e) = storage.create_conversation(conversation).await {
345            let _ = sink_clone.send(protocol::error(
346                rid,
347                "INTERNAL_ERROR",
348                &format!("create conversation failed: {e}"),
349            ));
350            return;
351        }
352        if let Err(e) = storage.add_participant(user_participant).await {
353            let _ = sink_clone.send(protocol::error(
354                rid,
355                "INTERNAL_ERROR",
356                &format!("add user participant failed: {e}"),
357            ));
358            return;
359        }
360        if let Err(e) = storage.add_participant(agent_participant).await {
361            let _ = sink_clone.send(protocol::error(
362                rid,
363                "INTERNAL_ERROR",
364                &format!("add agent participant failed: {e}"),
365            ));
366            return;
367        }
368        if let Err(e) = storage.create_session(session).await {
369            let _ = sink_clone.send(protocol::error(
370                rid,
371                "INTERNAL_ERROR",
372                &format!("create session failed: {e}"),
373            ));
374            return;
375        }
376        state_clone.insert_session(session_for_registry);
377        let _ = sink_clone.send(protocol::immediate_response(
378            rid,
379            200,
380            "Session created",
381            data,
382        ));
383    });
384}
385
386/// `get_session` — return the session snapshot (per `get-session.schema.json`).
387fn handle_get_session(
388    state: &AppState,
389    parsed: &Value,
390    request_id: Option<&str>,
391    sink: &UnboundedSender<Value>,
392) {
393    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
394        let _ = sink.send(protocol::error(
395            request_id,
396            "VALIDATION_ERROR",
397            "missing 'sessionId'",
398        ));
399        return;
400    };
401
402    match state.get_session(session_id) {
403        Some(s) => {
404            let data = json!({
405                "sessionId": s.session_id,
406                "conversationId": s.conversation_id,
407                "agentId": s.agent_id,
408                "agentName": s.agent_name,
409                "userParticipantId": s.user_participant_id,
410                "agentParticipantId": s.agent_participant_id,
411                "threadId": s.thread_id,
412                "status": s.status.map_or("active", |st| match st {
413                    SessionStatus::Active => "active",
414                    SessionStatus::Idle => "idle",
415                    SessionStatus::Ended => "ended",
416                }),
417            });
418            let _ = sink.send(protocol::immediate_response(
419                request_id, 200, "Session", data,
420            ));
421        }
422        None => {
423            let _ = sink.send(protocol::error(
424                request_id,
425                "SESSION_NOT_FOUND",
426                &format!("session '{session_id}' not found"),
427            ));
428        }
429    }
430}
431
432/// `get_conversation_messages` — paginated message history for a session's
433/// conversation. Wraps the storage adapter's `list_messages_by_conversation`
434/// (the same call the admin API + the turn runner use) and replies with an
435/// `immediate_response` carrying `{ conversationId, messages, nextCursor, hasMore }`.
436///
437/// Optional inputs: `limit` (default 50) and an opaque `cursor` from a prior
438/// page's `nextCursor`. Newest-first (the common "recent history" read).
439async fn handle_get_conversation_messages(
440    state: &AppState,
441    parsed: &Value,
442    request_id: Option<&str>,
443    sink: &UnboundedSender<Value>,
444) {
445    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
446        let _ = sink.send(protocol::error(
447            request_id,
448            "VALIDATION_ERROR",
449            "missing 'sessionId'",
450        ));
451        return;
452    };
453    let Some(session) = state.get_session(session_id) else {
454        let _ = sink.send(protocol::error(
455            request_id,
456            "SESSION_NOT_FOUND",
457            &format!("session '{session_id}' not found"),
458        ));
459        return;
460    };
461
462    const DEFAULT_LIMIT: usize = 50;
463    let limit = parsed
464        .get("limit")
465        .and_then(Value::as_u64)
466        .map(|n| n as usize)
467        .filter(|n| *n > 0)
468        .unwrap_or(DEFAULT_LIMIT);
469    let cursor = parsed
470        .get("cursor")
471        .and_then(Value::as_str)
472        .map(str::to_string);
473
474    let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
475    query.cursor = cursor;
476    query.descending = true;
477
478    match state.storage.list_messages_by_conversation(query).await {
479        Ok(page) => {
480            let data = json!({
481                "conversationId": session.conversation_id,
482                "messages": page.messages,
483                "nextCursor": page.next_cursor,
484                "hasMore": page.next_cursor.is_some(),
485            });
486            let _ = sink.send(protocol::immediate_response(
487                request_id,
488                200,
489                "ConversationMessages",
490                data,
491            ));
492        }
493        Err(e) => {
494            let _ = sink.send(protocol::error(
495                request_id,
496                "STORAGE_ERROR",
497                &format!("failed to list messages: {e}"),
498            ));
499        }
500    }
501}
502
503/// `send_message` — ack with `immediate_response` (202), run a streaming
504/// knowledge-grounded turn, emit `stream_token` / `stream_chunk` as it goes, and
505/// finish with `eventual_response` (200). Errors (no gateway key, unknown
506/// session, agent failure) surface as clean `error` events.
507async fn handle_send_message(
508    state: &AppState,
509    access: &AccessContext,
510    parsed: &Value,
511    request_id: Option<&str>,
512    sink: &UnboundedSender<Value>,
513) {
514    // requestId is load-bearing for streaming correlation; require it.
515    let Some(request_id) = request_id else {
516        let _ = sink.send(protocol::error(
517            None,
518            "VALIDATION_ERROR",
519            "send_message requires a 'requestId'",
520        ));
521        return;
522    };
523
524    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
525        let _ = sink.send(protocol::error(
526            Some(request_id),
527            "VALIDATION_ERROR",
528            "missing 'sessionId'",
529        ));
530        return;
531    };
532
533    let message = match parsed.get("message").and_then(Value::as_str) {
534        Some(m) if !m.trim().is_empty() => m.to_string(),
535        _ => {
536            let _ = sink.send(protocol::error(
537                Some(request_id),
538                "VALIDATION_ERROR",
539                "missing or empty 'message'",
540            ));
541            return;
542        }
543    };
544
545    let Some(session) = state.get_session(session_id) else {
546        let _ = sink.send(protocol::error(
547            Some(request_id),
548            "SESSION_NOT_FOUND",
549            &format!("session '{session_id}' not found"),
550        ));
551        return;
552    };
553
554    // A test-injected provider (the scenario-parity corpus's `MockLlmClient`)
555    // overrides the live gateway client entirely — the turn never touches the
556    // network — so it does NOT need a configured gateway key. Production leaves
557    // `chat_provider` `None`, so this clone is `None` and the key gate below is
558    // unchanged.
559    let chat_provider = state.chat_provider.clone();
560
561    // Resolve the gateway key for this turn's org. The conversation carries the
562    // org; the resolver maps it to a per-org key (e.g. a LiteLLM virtual key per
563    // tenant) so a multi-tenant flavor bills/scopes each org separately. The
564    // default `EnvGatewayKeyResolver` returns the single env key for every org,
565    // so the local/default flavor is unchanged. On `None` (no per-org key) we
566    // fall back to the env key; only when neither supplies a key do we error.
567    let org_id = match state
568        .storage
569        .get_conversation(&session.conversation_id)
570        .await
571    {
572        Ok(Some(conversation)) => conversation.organization_id,
573        // No conversation row (shouldn't happen for a live session) → resolve as
574        // if anonymous; the env fallback still applies.
575        Ok(None) | Err(_) => String::new(),
576    };
577    let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
578        &state.gateway_key_resolver,
579        &org_id,
580        state.config.gateway_key.as_deref(),
581    )
582    .await;
583
584    // No resolvable key → can't run a *live* LLM turn. Return a clean error (the
585    // server stays usable for protocol-only checks). When a mock provider is
586    // injected we fall back to a placeholder config — the mock replaces the
587    // client built from it, so its url/key/model are never used.
588    // Keep a copy of the resolved key to thread into the turn's
589    // `ToolProviderContext` (a retrieval-style host tool calls the same gateway);
590    // `None` on the mock/placeholder path so a host tool can fall back.
591    let turn_gateway_key = resolved_key.clone();
592    let llm = match resolved_key {
593        Some(key) => state.config.llm_config_with_key(key),
594        None if chat_provider.is_some() => state.config.placeholder_llm_config(),
595        None => {
596            let _ = sink.send(protocol::error(
597                Some(request_id),
598                "LLM_UNAVAILABLE",
599                "No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
600                 per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
601                 key to enable send_message.",
602            ));
603            return;
604        }
605    };
606
607    // Ack: processing started.
608    let _ = sink.send(protocol::immediate_response(
609        Some(request_id),
610        202,
611        "Processing your request...",
612        json!({}),
613    ));
614
615    // Run the turn in a spawned task, NOT inline. A turn that calls a
616    // confirmation-gated write tool **parks** awaiting a later
617    // `confirm_tool_action` frame; the socket reader dispatches that frame on the
618    // same connection, so blocking the reader here would deadlock (the confirm
619    // can never be read). Spawning frees the reader to receive the confirmation
620    // while the turn streams its events through the (cloned) sink. Pearl: HITL
621    // pause/resume.
622    let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
623        crate::runner::ConfirmationConfig {
624            tool_patterns: patterns,
625            session_id: session.session_id.clone(),
626            register: {
627                let state = state.clone();
628                Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
629            },
630            clear: {
631                let state = state.clone();
632                Arc::new(move |sid: &str| state.clear_confirmation(sid))
633            },
634        }
635    });
636
637    // The reference server is single-org; a multi-tenant host derives this from
638    // auth. Used to (a) resolve the org's persona override (SEAM 2) and (b)
639    // scope the host's tool provider (SEAM 1).
640    let org_id = crate::server::SEED_ORG_ID.to_string();
641    // SEAM 2 — resolve the per-org persona. With the default in-memory settings
642    // store the override is `None`, so the runner stays on its const prompt and
643    // behavior is unchanged.
644    let system_prompt = state.settings.get(&org_id).persona;
645    // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
646    let tool_provider = state.tool_provider.clone();
647
648    let state_for_turn = state.clone();
649    // Carry the turn's org on the AccessContext so a multi-tenant host adapter's
650    // `knowledge_for_access` can scope RAG to this tenant. The authed-principal
651    // path already stamps its own org (`Principal::access_context`); a widget /
652    // anonymous connection does not, so fall back to the session's persisted org
653    // (every session carries `organization_id` since the create-session path
654    // derives it). The operator's built-in single-tenant ACL ignores the org, so
655    // this is behavior-preserving for the reference flavor.
656    let access_owned = if access.organization_id.is_some() {
657        access.clone()
658    } else {
659        access
660            .clone()
661            .with_organization_id(session.organization_id.clone())
662    };
663    let sink_owned = sink.clone();
664    let request_id_owned = request_id.to_string();
665    let conversation_id = session.conversation_id.clone();
666
667    tokio::spawn(async move {
668        let result = runner::run_streaming_turn(
669            TurnRequest {
670                storage: state_for_turn.storage.clone(),
671                llm,
672                max_iterations: state_for_turn.config.max_iterations,
673                conversation_id: &conversation_id,
674                request_id: &request_id_owned,
675                user_message: &message,
676                // The connection's resolved document-level entitlement: retrieval is
677                // filtered to what this requester may read (org-public only when the
678                // connection is anonymous).
679                access: access_owned,
680                // Production: `None` (a live client is built from `llm`). Tests: the
681                // scenario corpus's `MockLlmClient`, which runs the turn offline.
682                llm_provider: chat_provider,
683                // Opt-in rerank stage (feature gap G8): `None` unless the operator
684                // enabled it via `SMOOTH_AGENT_RERANK` (gateway/lexical). Default-off
685                // keeps retrieval behavior unchanged.
686                reranker: crate::reranker::build_reranker(
687                    &crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
688                ),
689                confirmation,
690                // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
691                tool_provider,
692                // SEAM 2 — resolved per-org persona (None ⇒ const prompt).
693                system_prompt,
694                org_id: Some(org_id),
695                // The per-org key resolved above, threaded so a host tool
696                // provider's retrieval tools call the same gateway this turn used.
697                gateway_key: turn_gateway_key,
698            },
699            &sink_owned,
700        )
701        .await;
702
703        match result {
704            Ok(turn) => {
705                let response = runner::general_agent_response(&turn.reply);
706                let _ = sink_owned.send(protocol::eventual_response(
707                    &request_id_owned,
708                    200,
709                    &turn.message_id,
710                    response,
711                    false,
712                    &turn.citations,
713                ));
714            }
715            Err(e) => {
716                let _ = sink_owned.send(protocol::error(
717                    Some(&request_id_owned),
718                    "AGENT_ERROR",
719                    &format!("agent turn failed: {e}"),
720                ));
721            }
722        }
723    });
724}
725
726/// `confirm_tool_action` — resume a turn parked on a write-tool confirmation.
727///
728/// Per `spec/actions/confirm-tool-action.schema.json` the client sends
729/// `{ action, sessionId, requestId, approved }` in reply to a
730/// `write_confirmation_required` event. We look up the session's registered
731/// [`HumanResponse`](smooth_operator_core::HumanResponse) sender (set by the
732/// runner's confirmation bridge when the turn parked), take it, and feed it the
733/// verdict: `approved` → `Approved` (the parked tool executes), else `Denied`
734/// (the tool is skipped with a rejection result the model sees). There is no
735/// dedicated response event — the resumed workflow signals continuation via its
736/// normal streaming sequence (`stream_chunk`/`stream_token` → `eventual_response`);
737/// we additionally ack with an `immediate_response`. Taking the sender makes a
738/// duplicate confirm a no-op (`NO_PENDING_CONFIRMATION`).
739fn handle_confirm_tool_action(
740    state: &AppState,
741    parsed: &Value,
742    request_id: Option<&str>,
743    sink: &UnboundedSender<Value>,
744) {
745    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
746        let _ = sink.send(protocol::error(
747            request_id,
748            "VALIDATION_ERROR",
749            "confirm_tool_action requires a 'sessionId'",
750        ));
751        return;
752    };
753
754    // `approved` is required and must be a boolean — a missing/garbled verdict
755    // must NOT silently approve a write. Fail closed on a bad shape.
756    let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
757        let _ = sink.send(protocol::error(
758            request_id,
759            "VALIDATION_ERROR",
760            "confirm_tool_action requires a boolean 'approved'",
761        ));
762        return;
763    };
764
765    let Some(responder) = state.take_confirmation(session_id) else {
766        let _ = sink.send(protocol::error(
767            request_id,
768            "NO_PENDING_CONFIRMATION",
769            &format!("no tool action is awaiting confirmation for session '{session_id}'"),
770        ));
771        return;
772    };
773
774    let verdict = if approved {
775        smooth_operator_core::HumanResponse::Approved
776    } else {
777        smooth_operator_core::HumanResponse::Denied {
778            reason: "user rejected the action".to_string(),
779        }
780    };
781
782    if responder.send(verdict).is_err() {
783        // The parked turn ended (timeout / disconnect) before the confirm landed.
784        let _ = sink.send(protocol::error(
785            request_id,
786            "NO_PENDING_CONFIRMATION",
787            &format!(
788                "the turn awaiting confirmation for session '{session_id}' is no longer active"
789            ),
790        ));
791        return;
792    }
793
794    // Ack the confirmation; the resumed turn streams its own follow-on events.
795    let _ = sink.send(protocol::immediate_response(
796        request_id,
797        200,
798        if approved {
799            "Tool action approved"
800        } else {
801            "Tool action rejected"
802        },
803        json!({ "sessionId": session_id, "approved": approved }),
804    ));
805}