Skip to main content

smooth_operator_server/
handler.rs

1//! Action dispatch — parses a client action envelope and produces the matching
2//! server events.
3//!
4//! Each handler is `async` and forwards events through an
5//! `UnboundedSender<serde_json::Value>` (the per-connection outbound sink). The
6//! socket task drains the sink and writes each value as a JSON WS text frame, so
7//! streaming actions (`send_message`) can emit many events while still being
8//! driven from one place.
9
10use std::sync::Arc;
11
12use serde_json::{json, Value};
13use tokio::sync::mpsc::UnboundedSender;
14
15use smooth_operator::access_control::AccessContext;
16use smooth_operator::domain::{
17    Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
18};
19
20use crate::protocol;
21use crate::runner;
22use crate::runner::TurnRequest;
23use crate::state::AppState;
24
/// Display name this server reports for its AI agent: it is stored as the agent
/// participant's `name`, as the session's `agent_name`, and returned to the
/// client as `agentName`.
const AGENT_NAME: &str = "smooth-agent";
27
28/// Parse and dispatch a single inbound text frame. Any produced events are sent
29/// through `sink`. Returns `Ok(())` always — protocol-level failures are
30/// surfaced as `error` events, never as hard errors that drop the connection.
31pub async fn handle_frame(
32    state: &AppState,
33    access: &AccessContext,
34    conn_id: &str,
35    origin: Option<&str>,
36    auth_org: Option<&str>,
37    raw: &str,
38    sink: &UnboundedSender<Value>,
39) {
40    let parsed: Value = match serde_json::from_str(raw) {
41        Ok(v) => v,
42        Err(e) => {
43            let _ = sink.send(protocol::error(
44                None,
45                "VALIDATION_ERROR",
46                &format!("invalid JSON frame: {e}"),
47            ));
48            return;
49        }
50    };
51
52    let action = parsed.get("action").and_then(Value::as_str);
53    let request_id = parsed.get("requestId").and_then(Value::as_str);
54
55    match action {
56        Some("ping") => {
57            let _ = sink.send(protocol::pong(request_id));
58        }
59        Some("create_conversation_session") => {
60            handle_create_session(state, conn_id, origin, auth_org, &parsed, request_id, sink)
61                .await;
62        }
63        Some("get_session") => {
64            handle_get_session(state, &parsed, request_id, sink);
65        }
66        Some("get_conversation_messages") => {
67            handle_get_conversation_messages(state, &parsed, request_id, sink).await;
68        }
69        Some("send_message") => {
70            handle_send_message(state, access, &parsed, request_id, sink).await;
71        }
72        Some("confirm_tool_action") => {
73            handle_confirm_tool_action(state, &parsed, request_id, sink);
74        }
75        Some(other) => {
76            let _ = sink.send(protocol::error(
77                request_id,
78                "UNSUPPORTED_ACTION",
79                &format!("action '{other}' is not supported by this server"),
80            ));
81        }
82        None => {
83            let _ = sink.send(protocol::error(
84                request_id,
85                "VALIDATION_ERROR",
86                "missing 'action' field",
87            ));
88        }
89    }
90}
91
/// Result of the embeddable-widget auth check that gates session creation.
enum WidgetAuthOutcome {
    /// The check failed. A protocol `error` event has already been sent, so the
    /// caller must stop without creating a session.
    Denied,
    /// The check passed and session creation may continue.
    ///
    /// `org_id` holds the `organization_id` of the agent's widget policy when
    /// one was resolved and it names an org. It is `None` when the agent has no
    /// policy or the policy has no org; the caller then falls back to the
    /// connection's authenticated org, and finally to the server's seed org.
    Allowed { org_id: Option<String> },
}
103
104/// Enforce an agent's embeddable-widget policy (origin allowlist + `authContext`)
105/// before a session is created. Returns [`WidgetAuthOutcome::Allowed`] to proceed
106/// (carrying the policy's org when known), or [`WidgetAuthOutcome::Denied`] after
107/// emitting a protocol `error` (the caller must then stop). Agents with no policy
108/// proceed — unless `WIDGET_AUTH_STRICT` is set, in which case an unknown agent is
109/// rejected (fail closed).
110async fn enforce_widget_auth(
111    state: &AppState,
112    origin: Option<&str>,
113    agent_id: &str,
114    parsed: &Value,
115    request_id: Option<&str>,
116    sink: &UnboundedSender<Value>,
117) -> WidgetAuthOutcome {
118    let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
119        if state.config.widget_auth_strict {
120            let _ = sink.send(protocol::error(
121                request_id,
122                "AGENT_NOT_AUTHORIZED",
123                "this agent is not registered for embedding",
124            ));
125            return WidgetAuthOutcome::Denied;
126        }
127        return WidgetAuthOutcome::Allowed { org_id: None };
128    };
129
130    // Origin allowlist — fail closed: a missing or disallowed `Origin` is rejected.
131    if !smooth_operator::widget_auth::origin_allowed(
132        &policy.allowed_origins,
133        origin.unwrap_or_default(),
134    ) {
135        let _ = sink.send(protocol::error(
136            request_id,
137            "ORIGIN_NOT_ALLOWED",
138            "this origin is not allowed to embed this agent",
139        ));
140        return WidgetAuthOutcome::Denied;
141    }
142
143    // Pre-auth `authContext` (optional): when present it must verify.
144    if let Some(ac) = parsed.get("authContext") {
145        if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
146            let _ = sink.send(protocol::error(
147                request_id,
148                "AUTH_CONTEXT_INVALID",
149                "authContext signature failed verification",
150            ));
151            return WidgetAuthOutcome::Denied;
152        }
153    }
154    WidgetAuthOutcome::Allowed {
155        org_id: policy.organization_id,
156    }
157}
158
159/// Verify a JSON `authContext` (`{userId, signature, timestamp}`) against the
160/// agent's `public_key`. False on any missing field/key or signature/replay
161/// failure. Replay window: 60s.
162fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
163    let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
164        public_key,
165        ac.get("userId").and_then(Value::as_str),
166        ac.get("signature").and_then(Value::as_str),
167        ac.get("timestamp").and_then(Value::as_i64),
168    ) else {
169        return false;
170    };
171    let now = chrono::Utc::now().timestamp();
172    smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
173}
174
175/// `create_conversation_session` — create a conversation + user & agent
176/// participants + a session, then reply with an `immediate_response` carrying
177/// the session descriptor (per `create-conversation-session.schema.json`).
178async fn handle_create_session(
179    state: &AppState,
180    conn_id: &str,
181    origin: Option<&str>,
182    auth_org: Option<&str>,
183    parsed: &Value,
184    request_id: Option<&str>,
185    sink: &UnboundedSender<Value>,
186) {
187    let agent_id = parsed
188        .get("agentId")
189        .and_then(Value::as_str)
190        .map(str::to_string)
191        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
192
193    // Embeddable-widget auth: enforce the agent's origin allowlist + authContext
194    // before creating any session. No-op for agents without a policy (unless
195    // WIDGET_AUTH_STRICT). On denial, an error is emitted and we stop here. A
196    // resolved policy may also carry the agent's org (multi-tenant host).
197    let widget_org =
198        match enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
199            WidgetAuthOutcome::Denied => return,
200            WidgetAuthOutcome::Allowed { org_id } => org_id,
201        };
202
203    let user_name = parsed
204        .get("userName")
205        .and_then(Value::as_str)
206        .unwrap_or("Visitor")
207        .to_string();
208    let user_email = parsed
209        .get("userEmail")
210        .and_then(Value::as_str)
211        .map(str::to_string);
212    let browser_fingerprint = parsed
213        .get("browserFingerprint")
214        .and_then(Value::as_str)
215        .map(str::to_string);
216
217    let now = chrono::Utc::now();
218    // Derive the org this session (conversation + participants) belongs to, in
219    // priority order:
220    //   1. the widget policy's `organization_id` — a multi-tenant host that knows
221    //      the agent's org (widget visitors authenticate via origin/authContext,
222    //      not a JWT, so their org rides on the agent's policy);
223    //   2. the connection's authenticated JWT principal org (`auth_org`) — a
224    //      dashboard user / authed client;
225    //   3. the server's seed org — the single-org reference/dev case, so the
226    //      admin API's org-scoping (document sets, indexing runs) still lines up
227    //      with the seeded knowledge. This keeps the no-auth/local flavor
228    //      behavior unchanged.
229    let org_id = widget_org
230        .or_else(|| auth_org.map(str::to_string))
231        .unwrap_or_else(|| crate::server::SEED_ORG_ID.to_string());
232
233    let conversation_id = uuid::Uuid::new_v4().to_string();
234    let session_id = uuid::Uuid::new_v4().to_string();
235    let user_participant_id = uuid::Uuid::new_v4().to_string();
236    let agent_participant_id = uuid::Uuid::new_v4().to_string();
237
238    // Associate this connection with its session (and agent) on the backplane so
239    // events published to the session/agent — by an agent turn or any other
240    // service — reach this client's socket, on this pod or (with a Redis/NATS
241    // backplane) any pod.
242    state
243        .backplane
244        .associate(
245            conn_id,
246            smooth_operator::backplane::Target::Session(session_id.clone()),
247        )
248        .await;
249    state
250        .backplane
251        .associate(
252            conn_id,
253            smooth_operator::backplane::Target::Agent(agent_id.clone()),
254        )
255        .await;
256
257    let conversation = Conversation {
258        id: conversation_id.clone(),
259        platform: Platform::Web,
260        name: format!("Session {session_id}"),
261        organization_id: org_id.clone(),
262        idempotency_key: session_id.clone(),
263        metadata_json: parsed.get("metadata").cloned(),
264        analytics_json: None,
265        created_at: now,
266        updated_at: now,
267    };
268
269    let user_participant = Participant {
270        id: user_participant_id.clone(),
271        conversation_id: conversation_id.clone(),
272        organization_id: org_id.clone(),
273        participant_type: ParticipantType::User,
274        external_id: None,
275        internal_id: None,
276        browser_fingerprint,
277        browser_info: None,
278        name: user_name,
279        email: user_email,
280        phone: None,
281        crm_contact_id: None,
282        metadata_json: None,
283        created_at: now,
284        updated_at: now,
285    };
286
287    let agent_participant = Participant {
288        id: agent_participant_id.clone(),
289        conversation_id: conversation_id.clone(),
290        organization_id: org_id.clone(),
291        participant_type: ParticipantType::AiAgent,
292        external_id: None,
293        internal_id: Some(agent_id.clone()),
294        browser_fingerprint: None,
295        browser_info: None,
296        name: AGENT_NAME.to_string(),
297        email: None,
298        phone: None,
299        crm_contact_id: None,
300        metadata_json: None,
301        created_at: now,
302        updated_at: now,
303    };
304
305    let session = Session {
306        session_id: session_id.clone(),
307        conversation_id: conversation_id.clone(),
308        organization_id: org_id.clone(),
309        agent_id: agent_id.clone(),
310        agent_name: AGENT_NAME.to_string(),
311        user_participant_id: user_participant_id.clone(),
312        agent_participant_id: agent_participant_id.clone(),
313        // The thread id is the conversation id: per-session memory is carried by
314        // replaying this conversation's persisted message log (see runner.rs).
315        thread_id: conversation_id.clone(),
316        status: Some(SessionStatus::Active),
317        token_count: Some(0),
318        message_count: Some(0),
319        metadata: None,
320        created_at: Some(now),
321        updated_at: Some(now),
322        ended_at: None,
323        last_activity_at: Some(now),
324    };
325
326    // Persist to the storage adapter (best-effort: a failure surfaces as error).
327    let storage = state.storage.clone();
328    let sink_clone = sink.clone();
329    let request_id_owned = request_id.map(str::to_string);
330    let session_for_registry = session.clone();
331    let state_clone = state.clone();
332
333    let data = json!({
334        "sessionId": session_id,
335        "conversationId": conversation_id,
336        "agentId": agent_id,
337        "agentName": AGENT_NAME,
338        "userParticipantId": user_participant_id,
339        "agentParticipantId": agent_participant_id,
340    });
341
342    tokio::spawn(async move {
343        let rid = request_id_owned.as_deref();
344        if let Err(e) = storage.create_conversation(conversation).await {
345            let _ = sink_clone.send(protocol::error(
346                rid,
347                "INTERNAL_ERROR",
348                &format!("create conversation failed: {e}"),
349            ));
350            return;
351        }
352        if let Err(e) = storage.add_participant(user_participant).await {
353            let _ = sink_clone.send(protocol::error(
354                rid,
355                "INTERNAL_ERROR",
356                &format!("add user participant failed: {e}"),
357            ));
358            return;
359        }
360        if let Err(e) = storage.add_participant(agent_participant).await {
361            let _ = sink_clone.send(protocol::error(
362                rid,
363                "INTERNAL_ERROR",
364                &format!("add agent participant failed: {e}"),
365            ));
366            return;
367        }
368        if let Err(e) = storage.create_session(session).await {
369            let _ = sink_clone.send(protocol::error(
370                rid,
371                "INTERNAL_ERROR",
372                &format!("create session failed: {e}"),
373            ));
374            return;
375        }
376        state_clone.insert_session(session_for_registry);
377        let _ = sink_clone.send(protocol::immediate_response(
378            rid,
379            200,
380            "Session created",
381            data,
382        ));
383    });
384}
385
386/// `get_session` — return the session snapshot (per `get-session.schema.json`).
387fn handle_get_session(
388    state: &AppState,
389    parsed: &Value,
390    request_id: Option<&str>,
391    sink: &UnboundedSender<Value>,
392) {
393    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
394        let _ = sink.send(protocol::error(
395            request_id,
396            "VALIDATION_ERROR",
397            "missing 'sessionId'",
398        ));
399        return;
400    };
401
402    match state.get_session(session_id) {
403        Some(s) => {
404            let data = json!({
405                "sessionId": s.session_id,
406                "conversationId": s.conversation_id,
407                "agentId": s.agent_id,
408                "agentName": s.agent_name,
409                "userParticipantId": s.user_participant_id,
410                "agentParticipantId": s.agent_participant_id,
411                "threadId": s.thread_id,
412                "status": s.status.map_or("active", |st| match st {
413                    SessionStatus::Active => "active",
414                    SessionStatus::Idle => "idle",
415                    SessionStatus::Ended => "ended",
416                }),
417            });
418            let _ = sink.send(protocol::immediate_response(
419                request_id, 200, "Session", data,
420            ));
421        }
422        None => {
423            let _ = sink.send(protocol::error(
424                request_id,
425                "SESSION_NOT_FOUND",
426                &format!("session '{session_id}' not found"),
427            ));
428        }
429    }
430}
431
432/// `get_conversation_messages` — paginated message history for a session's
433/// conversation. Wraps the storage adapter's `list_messages_by_conversation`
434/// (the same call the admin API + the turn runner use) and replies with an
435/// `immediate_response` carrying `{ conversationId, messages, nextCursor, hasMore }`.
436///
437/// Optional inputs: `limit` (default 50) and an opaque `cursor` from a prior
438/// page's `nextCursor`. Newest-first (the common "recent history" read).
439async fn handle_get_conversation_messages(
440    state: &AppState,
441    parsed: &Value,
442    request_id: Option<&str>,
443    sink: &UnboundedSender<Value>,
444) {
445    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
446        let _ = sink.send(protocol::error(
447            request_id,
448            "VALIDATION_ERROR",
449            "missing 'sessionId'",
450        ));
451        return;
452    };
453    let Some(session) = state.get_session(session_id) else {
454        let _ = sink.send(protocol::error(
455            request_id,
456            "SESSION_NOT_FOUND",
457            &format!("session '{session_id}' not found"),
458        ));
459        return;
460    };
461
462    const DEFAULT_LIMIT: usize = 50;
463    let limit = parsed
464        .get("limit")
465        .and_then(Value::as_u64)
466        .map(|n| n as usize)
467        .filter(|n| *n > 0)
468        .unwrap_or(DEFAULT_LIMIT);
469    let cursor = parsed
470        .get("cursor")
471        .and_then(Value::as_str)
472        .map(str::to_string);
473
474    let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
475    query.cursor = cursor;
476    query.descending = true;
477
478    match state.storage.list_messages_by_conversation(query).await {
479        Ok(page) => {
480            let data = json!({
481                "conversationId": session.conversation_id,
482                "messages": page.messages,
483                "nextCursor": page.next_cursor,
484                "hasMore": page.next_cursor.is_some(),
485            });
486            let _ = sink.send(protocol::immediate_response(
487                request_id,
488                200,
489                "ConversationMessages",
490                data,
491            ));
492        }
493        Err(e) => {
494            let _ = sink.send(protocol::error(
495                request_id,
496                "STORAGE_ERROR",
497                &format!("failed to list messages: {e}"),
498            ));
499        }
500    }
501}
502
503/// `send_message` — ack with `immediate_response` (202), run a streaming
504/// knowledge-grounded turn, emit `stream_token` / `stream_chunk` as it goes, and
505/// finish with `eventual_response` (200). Errors (no gateway key, unknown
506/// session, agent failure) surface as clean `error` events.
507async fn handle_send_message(
508    state: &AppState,
509    access: &AccessContext,
510    parsed: &Value,
511    request_id: Option<&str>,
512    sink: &UnboundedSender<Value>,
513) {
514    // requestId is load-bearing for streaming correlation; require it.
515    let Some(request_id) = request_id else {
516        let _ = sink.send(protocol::error(
517            None,
518            "VALIDATION_ERROR",
519            "send_message requires a 'requestId'",
520        ));
521        return;
522    };
523
524    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
525        let _ = sink.send(protocol::error(
526            Some(request_id),
527            "VALIDATION_ERROR",
528            "missing 'sessionId'",
529        ));
530        return;
531    };
532
533    let message = match parsed.get("message").and_then(Value::as_str) {
534        Some(m) if !m.trim().is_empty() => m.to_string(),
535        _ => {
536            let _ = sink.send(protocol::error(
537                Some(request_id),
538                "VALIDATION_ERROR",
539                "missing or empty 'message'",
540            ));
541            return;
542        }
543    };
544
545    let Some(session) = state.get_session(session_id) else {
546        let _ = sink.send(protocol::error(
547            Some(request_id),
548            "SESSION_NOT_FOUND",
549            &format!("session '{session_id}' not found"),
550        ));
551        return;
552    };
553
554    // A test-injected provider (the scenario-parity corpus's `MockLlmClient`)
555    // overrides the live gateway client entirely — the turn never touches the
556    // network — so it does NOT need a configured gateway key. Production leaves
557    // `chat_provider` `None`, so this clone is `None` and the key gate below is
558    // unchanged.
559    let chat_provider = state.chat_provider.clone();
560
561    // Resolve the gateway key for this turn's org. The conversation carries the
562    // org; the resolver maps it to a per-org key (e.g. a LiteLLM virtual key per
563    // tenant) so a multi-tenant flavor bills/scopes each org separately. The
564    // default `EnvGatewayKeyResolver` returns the single env key for every org,
565    // so the local/default flavor is unchanged. On `None` (no per-org key) we
566    // fall back to the env key; only when neither supplies a key do we error.
567    let org_id = match state
568        .storage
569        .get_conversation(&session.conversation_id)
570        .await
571    {
572        Ok(Some(conversation)) => conversation.organization_id,
573        // No conversation row (shouldn't happen for a live session) → resolve as
574        // if anonymous; the env fallback still applies.
575        Ok(None) | Err(_) => String::new(),
576    };
577    let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
578        &state.gateway_key_resolver,
579        &org_id,
580        state.config.gateway_key.as_deref(),
581    )
582    .await;
583
584    // No resolvable key → can't run a *live* LLM turn. Return a clean error (the
585    // server stays usable for protocol-only checks). When a mock provider is
586    // injected we fall back to a placeholder config — the mock replaces the
587    // client built from it, so its url/key/model are never used.
588    // Keep a copy of the resolved key to thread into the turn's
589    // `ToolProviderContext` (a retrieval-style host tool calls the same gateway);
590    // `None` on the mock/placeholder path so a host tool can fall back.
591    let turn_gateway_key = resolved_key.clone();
592    let llm = match resolved_key {
593        Some(key) => state.config.llm_config_with_key(key),
594        None if chat_provider.is_some() => state.config.placeholder_llm_config(),
595        None => {
596            let _ = sink.send(protocol::error(
597                Some(request_id),
598                "LLM_UNAVAILABLE",
599                "No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
600                 per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
601                 key to enable send_message.",
602            ));
603            return;
604        }
605    };
606
607    // Ack: processing started.
608    let _ = sink.send(protocol::immediate_response(
609        Some(request_id),
610        202,
611        "Processing your request...",
612        json!({}),
613    ));
614
615    // Run the turn in a spawned task, NOT inline. A turn that calls a
616    // confirmation-gated write tool **parks** awaiting a later
617    // `confirm_tool_action` frame; the socket reader dispatches that frame on the
618    // same connection, so blocking the reader here would deadlock (the confirm
619    // can never be read). Spawning frees the reader to receive the confirmation
620    // while the turn streams its events through the (cloned) sink. Pearl: HITL
621    // pause/resume.
622    let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
623        crate::runner::ConfirmationConfig {
624            tool_patterns: patterns,
625            session_id: session.session_id.clone(),
626            register: {
627                let state = state.clone();
628                Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
629            },
630            clear: {
631                let state = state.clone();
632                Arc::new(move |sid: &str| state.clear_confirmation(sid))
633            },
634        }
635    });
636
637    // The reference server is single-org; a multi-tenant host derives this from
638    // auth. Used to (a) resolve the org's persona override (SEAM 2) and (b)
639    // scope the host's tool provider (SEAM 1).
640    let org_id = crate::server::SEED_ORG_ID.to_string();
641    // SEAM 2 — resolve the per-org persona. With the default in-memory settings
642    // store the override is `None`, so the runner stays on its const prompt and
643    // behavior is unchanged.
644    let system_prompt = state.settings.get(&org_id).persona;
645    // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
646    let tool_provider = state.tool_provider.clone();
647
648    let state_for_turn = state.clone();
649    let access_owned = access.clone();
650    let sink_owned = sink.clone();
651    let request_id_owned = request_id.to_string();
652    let conversation_id = session.conversation_id.clone();
653
654    tokio::spawn(async move {
655        let result = runner::run_streaming_turn(
656            TurnRequest {
657                storage: state_for_turn.storage.clone(),
658                llm,
659                max_iterations: state_for_turn.config.max_iterations,
660                conversation_id: &conversation_id,
661                request_id: &request_id_owned,
662                user_message: &message,
663                // The connection's resolved document-level entitlement: retrieval is
664                // filtered to what this requester may read (org-public only when the
665                // connection is anonymous).
666                access: access_owned,
667                // Production: `None` (a live client is built from `llm`). Tests: the
668                // scenario corpus's `MockLlmClient`, which runs the turn offline.
669                llm_provider: chat_provider,
670                // Opt-in rerank stage (feature gap G8): `None` unless the operator
671                // enabled it via `SMOOTH_AGENT_RERANK` (gateway/lexical). Default-off
672                // keeps retrieval behavior unchanged.
673                reranker: crate::reranker::build_reranker(
674                    &crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
675                ),
676                confirmation,
677                // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
678                tool_provider,
679                // SEAM 2 — resolved per-org persona (None ⇒ const prompt).
680                system_prompt,
681                org_id: Some(org_id),
682                // The per-org key resolved above, threaded so a host tool
683                // provider's retrieval tools call the same gateway this turn used.
684                gateway_key: turn_gateway_key,
685            },
686            &sink_owned,
687        )
688        .await;
689
690        match result {
691            Ok(turn) => {
692                let response = runner::general_agent_response(&turn.reply);
693                let _ = sink_owned.send(protocol::eventual_response(
694                    &request_id_owned,
695                    200,
696                    &turn.message_id,
697                    response,
698                    false,
699                    &turn.citations,
700                ));
701            }
702            Err(e) => {
703                let _ = sink_owned.send(protocol::error(
704                    Some(&request_id_owned),
705                    "AGENT_ERROR",
706                    &format!("agent turn failed: {e}"),
707                ));
708            }
709        }
710    });
711}
712
713/// `confirm_tool_action` — resume a turn parked on a write-tool confirmation.
714///
715/// Per `spec/actions/confirm-tool-action.schema.json` the client sends
716/// `{ action, sessionId, requestId, approved }` in reply to a
717/// `write_confirmation_required` event. We look up the session's registered
718/// [`HumanResponse`](smooth_operator_core::HumanResponse) sender (set by the
719/// runner's confirmation bridge when the turn parked), take it, and feed it the
720/// verdict: `approved` → `Approved` (the parked tool executes), else `Denied`
721/// (the tool is skipped with a rejection result the model sees). There is no
722/// dedicated response event — the resumed workflow signals continuation via its
723/// normal streaming sequence (`stream_chunk`/`stream_token` → `eventual_response`);
724/// we additionally ack with an `immediate_response`. Taking the sender makes a
725/// duplicate confirm a no-op (`NO_PENDING_CONFIRMATION`).
726fn handle_confirm_tool_action(
727    state: &AppState,
728    parsed: &Value,
729    request_id: Option<&str>,
730    sink: &UnboundedSender<Value>,
731) {
732    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
733        let _ = sink.send(protocol::error(
734            request_id,
735            "VALIDATION_ERROR",
736            "confirm_tool_action requires a 'sessionId'",
737        ));
738        return;
739    };
740
741    // `approved` is required and must be a boolean — a missing/garbled verdict
742    // must NOT silently approve a write. Fail closed on a bad shape.
743    let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
744        let _ = sink.send(protocol::error(
745            request_id,
746            "VALIDATION_ERROR",
747            "confirm_tool_action requires a boolean 'approved'",
748        ));
749        return;
750    };
751
752    let Some(responder) = state.take_confirmation(session_id) else {
753        let _ = sink.send(protocol::error(
754            request_id,
755            "NO_PENDING_CONFIRMATION",
756            &format!("no tool action is awaiting confirmation for session '{session_id}'"),
757        ));
758        return;
759    };
760
761    let verdict = if approved {
762        smooth_operator_core::HumanResponse::Approved
763    } else {
764        smooth_operator_core::HumanResponse::Denied {
765            reason: "user rejected the action".to_string(),
766        }
767    };
768
769    if responder.send(verdict).is_err() {
770        // The parked turn ended (timeout / disconnect) before the confirm landed.
771        let _ = sink.send(protocol::error(
772            request_id,
773            "NO_PENDING_CONFIRMATION",
774            &format!(
775                "the turn awaiting confirmation for session '{session_id}' is no longer active"
776            ),
777        ));
778        return;
779    }
780
781    // Ack the confirmation; the resumed turn streams its own follow-on events.
782    let _ = sink.send(protocol::immediate_response(
783        request_id,
784        200,
785        if approved {
786            "Tool action approved"
787        } else {
788            "Tool action rejected"
789        },
790        json!({ "sessionId": session_id, "approved": approved }),
791    ));
792}