Skip to main content

smooth_operator_server/
handler.rs

1//! Action dispatch — parses a client action envelope and produces the matching
2//! server events.
3//!
4//! Each handler is `async` and forwards events through an
5//! `UnboundedSender<serde_json::Value>` (the per-connection outbound sink). The
6//! socket task drains the sink and writes each value as a JSON WS text frame, so
7//! streaming actions (`send_message`) can emit many events while still being
8//! driven from one place.
9
10use std::sync::Arc;
11
12use serde_json::{json, Value};
13use tokio::sync::mpsc::UnboundedSender;
14
15use smooth_operator::access_control::AccessContext;
16use smooth_operator::domain::{
17    Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
18};
19
20use crate::protocol;
21use crate::runner;
22use crate::runner::TurnRequest;
23use crate::state::AppState;
24
/// The agent's display name for the reference server.
///
/// Used as the agent participant's `name`, as the session's `agent_name`, and
/// as the `agentName` field of the `create_conversation_session` reply.
const AGENT_NAME: &str = "smooth-agent";
27
28/// Parse and dispatch a single inbound text frame. Any produced events are sent
29/// through `sink`. Returns `Ok(())` always — protocol-level failures are
30/// surfaced as `error` events, never as hard errors that drop the connection.
31pub async fn handle_frame(
32    state: &AppState,
33    access: &AccessContext,
34    conn_id: &str,
35    origin: Option<&str>,
36    raw: &str,
37    sink: &UnboundedSender<Value>,
38) {
39    let parsed: Value = match serde_json::from_str(raw) {
40        Ok(v) => v,
41        Err(e) => {
42            let _ = sink.send(protocol::error(
43                None,
44                "VALIDATION_ERROR",
45                &format!("invalid JSON frame: {e}"),
46            ));
47            return;
48        }
49    };
50
51    let action = parsed.get("action").and_then(Value::as_str);
52    let request_id = parsed.get("requestId").and_then(Value::as_str);
53
54    match action {
55        Some("ping") => {
56            let _ = sink.send(protocol::pong(request_id));
57        }
58        Some("create_conversation_session") => {
59            handle_create_session(state, conn_id, origin, &parsed, request_id, sink).await;
60        }
61        Some("get_session") => {
62            handle_get_session(state, &parsed, request_id, sink);
63        }
64        Some("get_conversation_messages") => {
65            handle_get_conversation_messages(state, &parsed, request_id, sink).await;
66        }
67        Some("send_message") => {
68            handle_send_message(state, access, &parsed, request_id, sink).await;
69        }
70        Some("confirm_tool_action") => {
71            handle_confirm_tool_action(state, &parsed, request_id, sink);
72        }
73        Some(other) => {
74            let _ = sink.send(protocol::error(
75                request_id,
76                "UNSUPPORTED_ACTION",
77                &format!("action '{other}' is not supported by this server"),
78            ));
79        }
80        None => {
81            let _ = sink.send(protocol::error(
82                request_id,
83                "VALIDATION_ERROR",
84                "missing 'action' field",
85            ));
86        }
87    }
88}
89
90/// Enforce an agent's embeddable-widget policy (origin allowlist + `authContext`)
91/// before a session is created. Returns `true` to proceed, or `false` after
92/// emitting a protocol `error` (the caller must then stop). Agents with no policy
93/// proceed — unless `WIDGET_AUTH_STRICT` is set, in which case an unknown agent is
94/// rejected (fail closed).
95async fn enforce_widget_auth(
96    state: &AppState,
97    origin: Option<&str>,
98    agent_id: &str,
99    parsed: &Value,
100    request_id: Option<&str>,
101    sink: &UnboundedSender<Value>,
102) -> bool {
103    let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
104        if state.config.widget_auth_strict {
105            let _ = sink.send(protocol::error(
106                request_id,
107                "AGENT_NOT_AUTHORIZED",
108                "this agent is not registered for embedding",
109            ));
110            return false;
111        }
112        return true;
113    };
114
115    // Origin allowlist — fail closed: a missing or disallowed `Origin` is rejected.
116    if !smooth_operator::widget_auth::origin_allowed(
117        &policy.allowed_origins,
118        origin.unwrap_or_default(),
119    ) {
120        let _ = sink.send(protocol::error(
121            request_id,
122            "ORIGIN_NOT_ALLOWED",
123            "this origin is not allowed to embed this agent",
124        ));
125        return false;
126    }
127
128    // Pre-auth `authContext` (optional): when present it must verify.
129    if let Some(ac) = parsed.get("authContext") {
130        if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
131            let _ = sink.send(protocol::error(
132                request_id,
133                "AUTH_CONTEXT_INVALID",
134                "authContext signature failed verification",
135            ));
136            return false;
137        }
138    }
139    true
140}
141
142/// Verify a JSON `authContext` (`{userId, signature, timestamp}`) against the
143/// agent's `public_key`. False on any missing field/key or signature/replay
144/// failure. Replay window: 60s.
145fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
146    let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
147        public_key,
148        ac.get("userId").and_then(Value::as_str),
149        ac.get("signature").and_then(Value::as_str),
150        ac.get("timestamp").and_then(Value::as_i64),
151    ) else {
152        return false;
153    };
154    let now = chrono::Utc::now().timestamp();
155    smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
156}
157
158/// `create_conversation_session` — create a conversation + user & agent
159/// participants + a session, then reply with an `immediate_response` carrying
160/// the session descriptor (per `create-conversation-session.schema.json`).
161async fn handle_create_session(
162    state: &AppState,
163    conn_id: &str,
164    origin: Option<&str>,
165    parsed: &Value,
166    request_id: Option<&str>,
167    sink: &UnboundedSender<Value>,
168) {
169    let agent_id = parsed
170        .get("agentId")
171        .and_then(Value::as_str)
172        .map(str::to_string)
173        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
174
175    // Embeddable-widget auth: enforce the agent's origin allowlist + authContext
176    // before creating any session. No-op for agents without a policy (unless
177    // WIDGET_AUTH_STRICT). On denial, an error is emitted and we stop here.
178    if !enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
179        return;
180    }
181
182    let user_name = parsed
183        .get("userName")
184        .and_then(Value::as_str)
185        .unwrap_or("Visitor")
186        .to_string();
187    let user_email = parsed
188        .get("userEmail")
189        .and_then(Value::as_str)
190        .map(str::to_string);
191    let browser_fingerprint = parsed
192        .get("browserFingerprint")
193        .and_then(Value::as_str)
194        .map(str::to_string);
195
196    let now = chrono::Utc::now();
197    // The reference server is single-org; conversations belong to the seed org so
198    // the admin API's org-scoping (document sets, indexing runs) lines up with
199    // the seeded knowledge. A multi-tenant deployment derives this from auth.
200    let org_id = crate::server::SEED_ORG_ID.to_string();
201
202    let conversation_id = uuid::Uuid::new_v4().to_string();
203    let session_id = uuid::Uuid::new_v4().to_string();
204    let user_participant_id = uuid::Uuid::new_v4().to_string();
205    let agent_participant_id = uuid::Uuid::new_v4().to_string();
206
207    // Associate this connection with its session (and agent) on the backplane so
208    // events published to the session/agent — by an agent turn or any other
209    // service — reach this client's socket, on this pod or (with a Redis/NATS
210    // backplane) any pod.
211    state
212        .backplane
213        .associate(
214            conn_id,
215            smooth_operator::backplane::Target::Session(session_id.clone()),
216        )
217        .await;
218    state
219        .backplane
220        .associate(
221            conn_id,
222            smooth_operator::backplane::Target::Agent(agent_id.clone()),
223        )
224        .await;
225
226    let conversation = Conversation {
227        id: conversation_id.clone(),
228        platform: Platform::Web,
229        name: format!("Session {session_id}"),
230        organization_id: org_id.clone(),
231        idempotency_key: session_id.clone(),
232        metadata_json: parsed.get("metadata").cloned(),
233        analytics_json: None,
234        created_at: now,
235        updated_at: now,
236    };
237
238    let user_participant = Participant {
239        id: user_participant_id.clone(),
240        conversation_id: conversation_id.clone(),
241        organization_id: org_id.clone(),
242        participant_type: ParticipantType::User,
243        external_id: None,
244        internal_id: None,
245        browser_fingerprint,
246        browser_info: None,
247        name: user_name,
248        email: user_email,
249        phone: None,
250        crm_contact_id: None,
251        metadata_json: None,
252        created_at: now,
253        updated_at: now,
254    };
255
256    let agent_participant = Participant {
257        id: agent_participant_id.clone(),
258        conversation_id: conversation_id.clone(),
259        organization_id: org_id.clone(),
260        participant_type: ParticipantType::AiAgent,
261        external_id: None,
262        internal_id: Some(agent_id.clone()),
263        browser_fingerprint: None,
264        browser_info: None,
265        name: AGENT_NAME.to_string(),
266        email: None,
267        phone: None,
268        crm_contact_id: None,
269        metadata_json: None,
270        created_at: now,
271        updated_at: now,
272    };
273
274    let session = Session {
275        session_id: session_id.clone(),
276        conversation_id: conversation_id.clone(),
277        agent_id: agent_id.clone(),
278        agent_name: AGENT_NAME.to_string(),
279        user_participant_id: user_participant_id.clone(),
280        agent_participant_id: agent_participant_id.clone(),
281        // The thread id is the conversation id: per-session memory is carried by
282        // replaying this conversation's persisted message log (see runner.rs).
283        thread_id: conversation_id.clone(),
284        status: Some(SessionStatus::Active),
285        token_count: Some(0),
286        message_count: Some(0),
287        metadata: None,
288        created_at: Some(now),
289        updated_at: Some(now),
290        ended_at: None,
291        last_activity_at: Some(now),
292    };
293
294    // Persist to the storage adapter (best-effort: a failure surfaces as error).
295    let storage = state.storage.clone();
296    let sink_clone = sink.clone();
297    let request_id_owned = request_id.map(str::to_string);
298    let session_for_registry = session.clone();
299    let state_clone = state.clone();
300
301    let data = json!({
302        "sessionId": session_id,
303        "conversationId": conversation_id,
304        "agentId": agent_id,
305        "agentName": AGENT_NAME,
306        "userParticipantId": user_participant_id,
307        "agentParticipantId": agent_participant_id,
308    });
309
310    tokio::spawn(async move {
311        let rid = request_id_owned.as_deref();
312        if let Err(e) = storage.create_conversation(conversation).await {
313            let _ = sink_clone.send(protocol::error(
314                rid,
315                "INTERNAL_ERROR",
316                &format!("create conversation failed: {e}"),
317            ));
318            return;
319        }
320        if let Err(e) = storage.add_participant(user_participant).await {
321            let _ = sink_clone.send(protocol::error(
322                rid,
323                "INTERNAL_ERROR",
324                &format!("add user participant failed: {e}"),
325            ));
326            return;
327        }
328        if let Err(e) = storage.add_participant(agent_participant).await {
329            let _ = sink_clone.send(protocol::error(
330                rid,
331                "INTERNAL_ERROR",
332                &format!("add agent participant failed: {e}"),
333            ));
334            return;
335        }
336        if let Err(e) = storage.create_session(session).await {
337            let _ = sink_clone.send(protocol::error(
338                rid,
339                "INTERNAL_ERROR",
340                &format!("create session failed: {e}"),
341            ));
342            return;
343        }
344        state_clone.insert_session(session_for_registry);
345        let _ = sink_clone.send(protocol::immediate_response(
346            rid,
347            200,
348            "Session created",
349            data,
350        ));
351    });
352}
353
354/// `get_session` — return the session snapshot (per `get-session.schema.json`).
355fn handle_get_session(
356    state: &AppState,
357    parsed: &Value,
358    request_id: Option<&str>,
359    sink: &UnboundedSender<Value>,
360) {
361    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
362        let _ = sink.send(protocol::error(
363            request_id,
364            "VALIDATION_ERROR",
365            "missing 'sessionId'",
366        ));
367        return;
368    };
369
370    match state.get_session(session_id) {
371        Some(s) => {
372            let data = json!({
373                "sessionId": s.session_id,
374                "conversationId": s.conversation_id,
375                "agentId": s.agent_id,
376                "agentName": s.agent_name,
377                "userParticipantId": s.user_participant_id,
378                "agentParticipantId": s.agent_participant_id,
379                "threadId": s.thread_id,
380                "status": s.status.map_or("active", |st| match st {
381                    SessionStatus::Active => "active",
382                    SessionStatus::Idle => "idle",
383                    SessionStatus::Ended => "ended",
384                }),
385            });
386            let _ = sink.send(protocol::immediate_response(
387                request_id, 200, "Session", data,
388            ));
389        }
390        None => {
391            let _ = sink.send(protocol::error(
392                request_id,
393                "SESSION_NOT_FOUND",
394                &format!("session '{session_id}' not found"),
395            ));
396        }
397    }
398}
399
400/// `get_conversation_messages` — paginated message history for a session's
401/// conversation. Wraps the storage adapter's `list_messages_by_conversation`
402/// (the same call the admin API + the turn runner use) and replies with an
403/// `immediate_response` carrying `{ conversationId, messages, nextCursor, hasMore }`.
404///
405/// Optional inputs: `limit` (default 50) and an opaque `cursor` from a prior
406/// page's `nextCursor`. Newest-first (the common "recent history" read).
407async fn handle_get_conversation_messages(
408    state: &AppState,
409    parsed: &Value,
410    request_id: Option<&str>,
411    sink: &UnboundedSender<Value>,
412) {
413    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
414        let _ = sink.send(protocol::error(
415            request_id,
416            "VALIDATION_ERROR",
417            "missing 'sessionId'",
418        ));
419        return;
420    };
421    let Some(session) = state.get_session(session_id) else {
422        let _ = sink.send(protocol::error(
423            request_id,
424            "SESSION_NOT_FOUND",
425            &format!("session '{session_id}' not found"),
426        ));
427        return;
428    };
429
430    const DEFAULT_LIMIT: usize = 50;
431    let limit = parsed
432        .get("limit")
433        .and_then(Value::as_u64)
434        .map(|n| n as usize)
435        .filter(|n| *n > 0)
436        .unwrap_or(DEFAULT_LIMIT);
437    let cursor = parsed
438        .get("cursor")
439        .and_then(Value::as_str)
440        .map(str::to_string);
441
442    let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
443    query.cursor = cursor;
444    query.descending = true;
445
446    match state.storage.list_messages_by_conversation(query).await {
447        Ok(page) => {
448            let data = json!({
449                "conversationId": session.conversation_id,
450                "messages": page.messages,
451                "nextCursor": page.next_cursor,
452                "hasMore": page.next_cursor.is_some(),
453            });
454            let _ = sink.send(protocol::immediate_response(
455                request_id,
456                200,
457                "ConversationMessages",
458                data,
459            ));
460        }
461        Err(e) => {
462            let _ = sink.send(protocol::error(
463                request_id,
464                "STORAGE_ERROR",
465                &format!("failed to list messages: {e}"),
466            ));
467        }
468    }
469}
470
/// `send_message` — ack with `immediate_response` (202), run a streaming
/// knowledge-grounded turn, emit `stream_token` / `stream_chunk` as it goes, and
/// finish with `eventual_response` (200). Errors (no gateway key, unknown
/// session, agent failure) surface as clean `error` events.
///
/// Validation happens inline, in this order, each failure emitting one `error`
/// and returning: `requestId` present, string `sessionId` present, non-blank
/// string `message` present (`VALIDATION_ERROR`), session known
/// (`SESSION_NOT_FOUND`), and a usable LLM config (`LLM_UNAVAILABLE`).
///
/// Only after all of those pass is the 202 ack sent and the turn spawned; the
/// turn's outcome (`eventual_response` or `AGENT_ERROR`) is sent from the
/// spawned task through a clone of `sink`.
///
/// - `access` is cloned into the turn and passed as the requester's entitlement.
/// - `request_id` is the envelope's `requestId` as extracted by the dispatcher.
async fn handle_send_message(
    state: &AppState,
    access: &AccessContext,
    parsed: &Value,
    request_id: Option<&str>,
    sink: &UnboundedSender<Value>,
) {
    // requestId is load-bearing for streaming correlation; require it.
    let Some(request_id) = request_id else {
        let _ = sink.send(protocol::error(
            None,
            "VALIDATION_ERROR",
            "send_message requires a 'requestId'",
        ));
        return;
    };

    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
        let _ = sink.send(protocol::error(
            Some(request_id),
            "VALIDATION_ERROR",
            "missing 'sessionId'",
        ));
        return;
    };

    // Whitespace-only messages are rejected, but an accepted message is
    // forwarded untrimmed.
    let message = match parsed.get("message").and_then(Value::as_str) {
        Some(m) if !m.trim().is_empty() => m.to_string(),
        _ => {
            let _ = sink.send(protocol::error(
                Some(request_id),
                "VALIDATION_ERROR",
                "missing or empty 'message'",
            ));
            return;
        }
    };

    let Some(session) = state.get_session(session_id) else {
        let _ = sink.send(protocol::error(
            Some(request_id),
            "SESSION_NOT_FOUND",
            &format!("session '{session_id}' not found"),
        ));
        return;
    };

    // A test-injected provider (the scenario-parity corpus's `MockLlmClient`)
    // overrides the live gateway client entirely — the turn never touches the
    // network — so it does NOT need a configured gateway key. Production leaves
    // `chat_provider` `None`, so this clone is `None` and the key gate below is
    // unchanged.
    let chat_provider = state.chat_provider.clone();

    // Resolve the gateway key for this turn's org. The conversation carries the
    // org; the resolver maps it to a per-org key (e.g. a LiteLLM virtual key per
    // tenant) so a multi-tenant flavor bills/scopes each org separately. The
    // default `EnvGatewayKeyResolver` returns the single env key for every org,
    // so the local/default flavor is unchanged. On `None` (no per-org key) we
    // fall back to the env key; only when neither supplies a key do we error.
    let org_id = match state
        .storage
        .get_conversation(&session.conversation_id)
        .await
    {
        Ok(Some(conversation)) => conversation.organization_id,
        // No conversation row (shouldn't happen for a live session) → resolve as
        // if anonymous; the env fallback still applies. A storage error is
        // treated the same way and is not reported to the client.
        Ok(None) | Err(_) => String::new(),
    };
    let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
        &state.gateway_key_resolver,
        &org_id,
        state.config.gateway_key.as_deref(),
    )
    .await;

    // No resolvable key → can't run a *live* LLM turn. Return a clean error (the
    // server stays usable for protocol-only checks). When a mock provider is
    // injected we fall back to a placeholder config — the mock replaces the
    // client built from it, so its url/key/model are never used.
    // Keep a copy of the resolved key to thread into the turn's
    // `ToolProviderContext` (a retrieval-style host tool calls the same gateway);
    // `None` on the mock/placeholder path so a host tool can fall back.
    let turn_gateway_key = resolved_key.clone();
    let llm = match resolved_key {
        Some(key) => state.config.llm_config_with_key(key),
        None if chat_provider.is_some() => state.config.placeholder_llm_config(),
        None => {
            let _ = sink.send(protocol::error(
                Some(request_id),
                "LLM_UNAVAILABLE",
                "No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
                 per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
                 key to enable send_message.",
            ));
            return;
        }
    };

    // Ack: processing started. Sent before the turn is spawned, so it is queued
    // on the sink ahead of any event the turn produces.
    let _ = sink.send(protocol::immediate_response(
        Some(request_id),
        202,
        "Processing your request...",
        json!({}),
    ));

    // Run the turn in a spawned task, NOT inline. A turn that calls a
    // confirmation-gated write tool **parks** awaiting a later
    // `confirm_tool_action` frame; the socket reader dispatches that frame on the
    // same connection, so blocking the reader here would deadlock (the confirm
    // can never be read). Spawning frees the reader to receive the confirmation
    // while the turn streams its events through the (cloned) sink. Pearl: HITL
    // pause/resume.
    //
    // The confirmation bridge is only built when the config yields tool
    // patterns; its `register`/`clear` callbacks each own a clone of `state`.
    let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
        crate::runner::ConfirmationConfig {
            tool_patterns: patterns,
            session_id: session.session_id.clone(),
            register: {
                let state = state.clone();
                Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
            },
            clear: {
                let state = state.clone();
                Arc::new(move |sid: &str| state.clear_confirmation(sid))
            },
        }
    });

    // The reference server is single-org; a multi-tenant host derives this from
    // auth. Used to (a) resolve the org's persona override (SEAM 2) and (b)
    // scope the host's tool provider (SEAM 1).
    // NOTE(review): this shadows the conversation-derived `org_id` used for key
    // resolution above. The two can differ (the earlier one is empty when the
    // conversation lookup fails) — confirm that is intended before going
    // multi-tenant.
    let org_id = crate::server::SEED_ORG_ID.to_string();
    // SEAM 2 — resolve the per-org persona. With the default in-memory settings
    // store the override is `None`, so the runner stays on its const prompt and
    // behavior is unchanged.
    let system_prompt = state.settings.get(&org_id).persona;
    // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
    let tool_provider = state.tool_provider.clone();

    // Owned copies moved into the spawned task (it cannot borrow from this frame).
    let state_for_turn = state.clone();
    let access_owned = access.clone();
    let sink_owned = sink.clone();
    let request_id_owned = request_id.to_string();
    let conversation_id = session.conversation_id.clone();

    tokio::spawn(async move {
        let result = runner::run_streaming_turn(
            TurnRequest {
                storage: state_for_turn.storage.clone(),
                llm,
                max_iterations: state_for_turn.config.max_iterations,
                conversation_id: &conversation_id,
                request_id: &request_id_owned,
                user_message: &message,
                // The connection's resolved document-level entitlement: retrieval is
                // filtered to what this requester may read (org-public only when the
                // connection is anonymous).
                access: access_owned,
                // Production: `None` (a live client is built from `llm`). Tests: the
                // scenario corpus's `MockLlmClient`, which runs the turn offline.
                llm_provider: chat_provider,
                // Opt-in rerank stage (feature gap G8): `None` unless the operator
                // enabled it via `SMOOTH_AGENT_RERANK` (gateway/lexical). Default-off
                // keeps retrieval behavior unchanged.
                reranker: crate::reranker::build_reranker(
                    &crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
                ),
                confirmation,
                // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
                tool_provider,
                // SEAM 2 — resolved per-org persona (None ⇒ const prompt).
                system_prompt,
                org_id: Some(org_id),
                // The per-org key resolved above, threaded so a host tool
                // provider's retrieval tools call the same gateway this turn used.
                gateway_key: turn_gateway_key,
            },
            &sink_owned,
        )
        .await;

        // Terminal event for the request: the final reply, or the turn's error.
        match result {
            Ok(turn) => {
                let response = runner::general_agent_response(&turn.reply);
                let _ = sink_owned.send(protocol::eventual_response(
                    &request_id_owned,
                    200,
                    &turn.message_id,
                    response,
                    false,
                    &turn.citations,
                ));
            }
            Err(e) => {
                let _ = sink_owned.send(protocol::error(
                    Some(&request_id_owned),
                    "AGENT_ERROR",
                    &format!("agent turn failed: {e}"),
                ));
            }
        }
    });
}
680
681/// `confirm_tool_action` — resume a turn parked on a write-tool confirmation.
682///
683/// Per `spec/actions/confirm-tool-action.schema.json` the client sends
684/// `{ action, sessionId, requestId, approved }` in reply to a
685/// `write_confirmation_required` event. We look up the session's registered
686/// [`HumanResponse`](smooth_operator_core::HumanResponse) sender (set by the
687/// runner's confirmation bridge when the turn parked), take it, and feed it the
688/// verdict: `approved` → `Approved` (the parked tool executes), else `Denied`
689/// (the tool is skipped with a rejection result the model sees). There is no
690/// dedicated response event — the resumed workflow signals continuation via its
691/// normal streaming sequence (`stream_chunk`/`stream_token` → `eventual_response`);
692/// we additionally ack with an `immediate_response`. Taking the sender makes a
693/// duplicate confirm a no-op (`NO_PENDING_CONFIRMATION`).
694fn handle_confirm_tool_action(
695    state: &AppState,
696    parsed: &Value,
697    request_id: Option<&str>,
698    sink: &UnboundedSender<Value>,
699) {
700    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
701        let _ = sink.send(protocol::error(
702            request_id,
703            "VALIDATION_ERROR",
704            "confirm_tool_action requires a 'sessionId'",
705        ));
706        return;
707    };
708
709    // `approved` is required and must be a boolean — a missing/garbled verdict
710    // must NOT silently approve a write. Fail closed on a bad shape.
711    let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
712        let _ = sink.send(protocol::error(
713            request_id,
714            "VALIDATION_ERROR",
715            "confirm_tool_action requires a boolean 'approved'",
716        ));
717        return;
718    };
719
720    let Some(responder) = state.take_confirmation(session_id) else {
721        let _ = sink.send(protocol::error(
722            request_id,
723            "NO_PENDING_CONFIRMATION",
724            &format!("no tool action is awaiting confirmation for session '{session_id}'"),
725        ));
726        return;
727    };
728
729    let verdict = if approved {
730        smooth_operator_core::HumanResponse::Approved
731    } else {
732        smooth_operator_core::HumanResponse::Denied {
733            reason: "user rejected the action".to_string(),
734        }
735    };
736
737    if responder.send(verdict).is_err() {
738        // The parked turn ended (timeout / disconnect) before the confirm landed.
739        let _ = sink.send(protocol::error(
740            request_id,
741            "NO_PENDING_CONFIRMATION",
742            &format!(
743                "the turn awaiting confirmation for session '{session_id}' is no longer active"
744            ),
745        ));
746        return;
747    }
748
749    // Ack the confirmation; the resumed turn streams its own follow-on events.
750    let _ = sink.send(protocol::immediate_response(
751        request_id,
752        200,
753        if approved {
754            "Tool action approved"
755        } else {
756            "Tool action rejected"
757        },
758        json!({ "sessionId": session_id, "approved": approved }),
759    ));
760}