Skip to main content

smooth_operator_server/
handler.rs

//! Action dispatch — parses a client action envelope and produces the matching
//! server events.
//!
//! Each handler forwards events through an
//! `UnboundedSender<serde_json::Value>` (the per-connection outbound sink); most
//! handlers are `async`, while the ones that only touch in-memory state are plain
//! functions. The socket task drains the sink and writes each value as a JSON WS
//! text frame, so streaming actions (`send_message`) can emit many events while
//! still being driven from one place.
9
10use std::sync::Arc;
11
12use serde_json::{json, Value};
13use tokio::sync::mpsc::UnboundedSender;
14
15use smooth_operator::access_control::AccessContext;
16use smooth_operator::agent_config::{AgentBehaviorConfig, AuthGateHook, AuthLevel};
17use smooth_operator::domain::{
18    Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
19};
20use smooth_operator_core::llm_provider::LlmProvider;
21use smooth_operator_core::{LlmClient, LlmConfig};
22
23use crate::protocol;
24use crate::runner;
25use crate::runner::TurnRequest;
26use crate::state::AppState;
27
/// The agent's display name for the reference server. It is stored as the
/// agent participant's `name` and the session's `agent_name`, and returned as
/// `agentName` in the create-session response.
const AGENT_NAME: &str = "smooth-agent";
30
31/// Parse and dispatch a single inbound text frame. Any produced events are sent
32/// through `sink`. Returns `Ok(())` always — protocol-level failures are
33/// surfaced as `error` events, never as hard errors that drop the connection.
34pub async fn handle_frame(
35    state: &AppState,
36    access: &AccessContext,
37    conn_id: &str,
38    origin: Option<&str>,
39    auth_org: Option<&str>,
40    raw: &str,
41    sink: &UnboundedSender<Value>,
42) {
43    let parsed: Value = match serde_json::from_str(raw) {
44        Ok(v) => v,
45        Err(e) => {
46            let _ = sink.send(protocol::error(
47                None,
48                "VALIDATION_ERROR",
49                &format!("invalid JSON frame: {e}"),
50            ));
51            return;
52        }
53    };
54
55    let action = parsed.get("action").and_then(Value::as_str);
56    let request_id = parsed.get("requestId").and_then(Value::as_str);
57
58    match action {
59        Some("ping") => {
60            let _ = sink.send(protocol::pong(request_id));
61        }
62        Some("create_conversation_session") => {
63            handle_create_session(state, conn_id, origin, auth_org, &parsed, request_id, sink)
64                .await;
65        }
66        Some("get_session") => {
67            handle_get_session(state, &parsed, request_id, sink);
68        }
69        Some("get_conversation_messages") => {
70            handle_get_conversation_messages(state, &parsed, request_id, sink).await;
71        }
72        Some("send_message") => {
73            handle_send_message(state, access, &parsed, request_id, sink).await;
74        }
75        Some("confirm_tool_action") => {
76            handle_confirm_tool_action(state, &parsed, request_id, sink);
77        }
78        Some(other) => {
79            let _ = sink.send(protocol::error(
80                request_id,
81                "UNSUPPORTED_ACTION",
82                &format!("action '{other}' is not supported by this server"),
83            ));
84        }
85        None => {
86            let _ = sink.send(protocol::error(
87                request_id,
88                "VALIDATION_ERROR",
89                "missing 'action' field",
90            ));
91        }
92    }
93}
94
/// Outcome of widget-auth enforcement: whether to proceed, and (when an agent
/// policy resolved) the org that policy attributes the agent to.
///
/// Produced by `enforce_widget_auth` and consumed by the create-session
/// handler, which stops on `Denied` and feeds `Allowed::org_id` into its org
/// derivation.
enum WidgetAuthOutcome {
    /// Auth denied — an `error` event was already emitted; the caller must stop.
    Denied,
    /// Auth passed. `org_id` is `Some` when the resolved policy carried an
    /// `organization_id` (a multi-tenant host that knows the agent's org), else
    /// `None` (no policy, or a policy without an org — org derivation falls
    /// through to the JWT principal, then the seed org).
    Allowed { org_id: Option<String> },
}
106
107/// Enforce an agent's embeddable-widget policy (origin allowlist + `authContext`)
108/// before a session is created. Returns [`WidgetAuthOutcome::Allowed`] to proceed
109/// (carrying the policy's org when known), or [`WidgetAuthOutcome::Denied`] after
110/// emitting a protocol `error` (the caller must then stop). Agents with no policy
111/// proceed — unless `WIDGET_AUTH_STRICT` is set, in which case an unknown agent is
112/// rejected (fail closed).
113async fn enforce_widget_auth(
114    state: &AppState,
115    origin: Option<&str>,
116    agent_id: &str,
117    parsed: &Value,
118    request_id: Option<&str>,
119    sink: &UnboundedSender<Value>,
120) -> WidgetAuthOutcome {
121    let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
122        if state.config.widget_auth_strict {
123            let _ = sink.send(protocol::error(
124                request_id,
125                "AGENT_NOT_AUTHORIZED",
126                "this agent is not registered for embedding",
127            ));
128            return WidgetAuthOutcome::Denied;
129        }
130        return WidgetAuthOutcome::Allowed { org_id: None };
131    };
132
133    // Origin allowlist — fail closed: a missing or disallowed `Origin` is rejected.
134    if !smooth_operator::widget_auth::origin_allowed(
135        &policy.allowed_origins,
136        origin.unwrap_or_default(),
137    ) {
138        let _ = sink.send(protocol::error(
139            request_id,
140            "ORIGIN_NOT_ALLOWED",
141            "this origin is not allowed to embed this agent",
142        ));
143        return WidgetAuthOutcome::Denied;
144    }
145
146    // Pre-auth `authContext` (optional): when present it must verify.
147    if let Some(ac) = parsed.get("authContext") {
148        if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
149            let _ = sink.send(protocol::error(
150                request_id,
151                "AUTH_CONTEXT_INVALID",
152                "authContext signature failed verification",
153            ));
154            return WidgetAuthOutcome::Denied;
155        }
156    }
157    WidgetAuthOutcome::Allowed {
158        org_id: policy.organization_id,
159    }
160}
161
162/// Verify a JSON `authContext` (`{userId, signature, timestamp}`) against the
163/// agent's `public_key`. False on any missing field/key or signature/replay
164/// failure. Replay window: 60s.
165fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
166    let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
167        public_key,
168        ac.get("userId").and_then(Value::as_str),
169        ac.get("signature").and_then(Value::as_str),
170        ac.get("timestamp").and_then(Value::as_i64),
171    ) else {
172        return false;
173    };
174    let now = chrono::Utc::now().timestamp();
175    smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
176}
177
178/// `create_conversation_session` — create a conversation + user & agent
179/// participants + a session, then reply with an `immediate_response` carrying
180/// the session descriptor (per `create-conversation-session.schema.json`).
181async fn handle_create_session(
182    state: &AppState,
183    conn_id: &str,
184    origin: Option<&str>,
185    auth_org: Option<&str>,
186    parsed: &Value,
187    request_id: Option<&str>,
188    sink: &UnboundedSender<Value>,
189) {
190    let agent_id = parsed
191        .get("agentId")
192        .and_then(Value::as_str)
193        .map(str::to_string)
194        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
195
196    // Embeddable-widget auth: enforce the agent's origin allowlist + authContext
197    // before creating any session. No-op for agents without a policy (unless
198    // WIDGET_AUTH_STRICT). On denial, an error is emitted and we stop here. A
199    // resolved policy may also carry the agent's org (multi-tenant host).
200    let widget_org =
201        match enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
202            WidgetAuthOutcome::Denied => return,
203            WidgetAuthOutcome::Allowed { org_id } => org_id,
204        };
205
206    let user_name = parsed
207        .get("userName")
208        .and_then(Value::as_str)
209        .unwrap_or("Visitor")
210        .to_string();
211    let user_email = parsed
212        .get("userEmail")
213        .and_then(Value::as_str)
214        .map(str::to_string);
215    let browser_fingerprint = parsed
216        .get("browserFingerprint")
217        .and_then(Value::as_str)
218        .map(str::to_string);
219
220    let now = chrono::Utc::now();
221    // Derive the org this session (conversation + participants) belongs to, in
222    // priority order:
223    //   1. the widget policy's `organization_id` — a multi-tenant host that knows
224    //      the agent's org (widget visitors authenticate via origin/authContext,
225    //      not a JWT, so their org rides on the agent's policy);
226    //   2. the connection's authenticated JWT principal org (`auth_org`) — a
227    //      dashboard user / authed client;
228    //   3. the server's seed org — the single-org reference/dev case, so the
229    //      admin API's org-scoping (document sets, indexing runs) still lines up
230    //      with the seeded knowledge. This keeps the no-auth/local flavor
231    //      behavior unchanged.
232    let org_id = widget_org
233        .or_else(|| auth_org.map(str::to_string))
234        .unwrap_or_else(|| crate::server::SEED_ORG_ID.to_string());
235
236    let conversation_id = uuid::Uuid::new_v4().to_string();
237    let session_id = uuid::Uuid::new_v4().to_string();
238    let user_participant_id = uuid::Uuid::new_v4().to_string();
239    let agent_participant_id = uuid::Uuid::new_v4().to_string();
240
241    // Associate this connection with its session (and agent) on the backplane so
242    // events published to the session/agent — by an agent turn or any other
243    // service — reach this client's socket, on this pod or (with a Redis/NATS
244    // backplane) any pod.
245    state
246        .backplane
247        .associate(
248            conn_id,
249            smooth_operator::backplane::Target::Session(session_id.clone()),
250        )
251        .await;
252    state
253        .backplane
254        .associate(
255            conn_id,
256            smooth_operator::backplane::Target::Agent(agent_id.clone()),
257        )
258        .await;
259
260    let conversation = Conversation {
261        id: conversation_id.clone(),
262        platform: Platform::Web,
263        name: format!("Session {session_id}"),
264        organization_id: org_id.clone(),
265        idempotency_key: session_id.clone(),
266        metadata_json: parsed.get("metadata").cloned(),
267        analytics_json: None,
268        created_at: now,
269        updated_at: now,
270    };
271
272    let user_participant = Participant {
273        id: user_participant_id.clone(),
274        conversation_id: conversation_id.clone(),
275        organization_id: org_id.clone(),
276        participant_type: ParticipantType::User,
277        external_id: None,
278        internal_id: None,
279        browser_fingerprint,
280        browser_info: None,
281        name: user_name,
282        email: user_email,
283        phone: None,
284        crm_contact_id: None,
285        metadata_json: None,
286        created_at: now,
287        updated_at: now,
288    };
289
290    let agent_participant = Participant {
291        id: agent_participant_id.clone(),
292        conversation_id: conversation_id.clone(),
293        organization_id: org_id.clone(),
294        participant_type: ParticipantType::AiAgent,
295        external_id: None,
296        internal_id: Some(agent_id.clone()),
297        browser_fingerprint: None,
298        browser_info: None,
299        name: AGENT_NAME.to_string(),
300        email: None,
301        phone: None,
302        crm_contact_id: None,
303        metadata_json: None,
304        created_at: now,
305        updated_at: now,
306    };
307
308    let session = Session {
309        session_id: session_id.clone(),
310        conversation_id: conversation_id.clone(),
311        organization_id: org_id.clone(),
312        agent_id: agent_id.clone(),
313        agent_name: AGENT_NAME.to_string(),
314        user_participant_id: user_participant_id.clone(),
315        agent_participant_id: agent_participant_id.clone(),
316        // The thread id is the conversation id: per-session memory is carried by
317        // replaying this conversation's persisted message log (see runner.rs).
318        thread_id: conversation_id.clone(),
319        status: Some(SessionStatus::Active),
320        token_count: Some(0),
321        message_count: Some(0),
322        metadata: None,
323        created_at: Some(now),
324        updated_at: Some(now),
325        ended_at: None,
326        last_activity_at: Some(now),
327    };
328
329    // Persist to the storage adapter (best-effort: a failure surfaces as error).
330    let storage = state.storage.clone();
331    let sink_clone = sink.clone();
332    let request_id_owned = request_id.map(str::to_string);
333    let session_for_registry = session.clone();
334    let state_clone = state.clone();
335
336    let data = json!({
337        "sessionId": session_id,
338        "conversationId": conversation_id,
339        "agentId": agent_id,
340        "agentName": AGENT_NAME,
341        "userParticipantId": user_participant_id,
342        "agentParticipantId": agent_participant_id,
343    });
344
345    tokio::spawn(async move {
346        let rid = request_id_owned.as_deref();
347        if let Err(e) = storage.create_conversation(conversation).await {
348            let _ = sink_clone.send(protocol::error(
349                rid,
350                "INTERNAL_ERROR",
351                &format!("create conversation failed: {e}"),
352            ));
353            return;
354        }
355        if let Err(e) = storage.add_participant(user_participant).await {
356            let _ = sink_clone.send(protocol::error(
357                rid,
358                "INTERNAL_ERROR",
359                &format!("add user participant failed: {e}"),
360            ));
361            return;
362        }
363        if let Err(e) = storage.add_participant(agent_participant).await {
364            let _ = sink_clone.send(protocol::error(
365                rid,
366                "INTERNAL_ERROR",
367                &format!("add agent participant failed: {e}"),
368            ));
369            return;
370        }
371        if let Err(e) = storage.create_session(session).await {
372            let _ = sink_clone.send(protocol::error(
373                rid,
374                "INTERNAL_ERROR",
375                &format!("create session failed: {e}"),
376            ));
377            return;
378        }
379        state_clone.insert_session(session_for_registry);
380        let _ = sink_clone.send(protocol::immediate_response(
381            rid,
382            200,
383            "Session created",
384            data,
385        ));
386    });
387}
388
389/// `get_session` — return the session snapshot (per `get-session.schema.json`).
390fn handle_get_session(
391    state: &AppState,
392    parsed: &Value,
393    request_id: Option<&str>,
394    sink: &UnboundedSender<Value>,
395) {
396    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
397        let _ = sink.send(protocol::error(
398            request_id,
399            "VALIDATION_ERROR",
400            "missing 'sessionId'",
401        ));
402        return;
403    };
404
405    match state.get_session(session_id) {
406        Some(s) => {
407            let data = json!({
408                "sessionId": s.session_id,
409                "conversationId": s.conversation_id,
410                "agentId": s.agent_id,
411                "agentName": s.agent_name,
412                "userParticipantId": s.user_participant_id,
413                "agentParticipantId": s.agent_participant_id,
414                "threadId": s.thread_id,
415                "status": s.status.map_or("active", |st| match st {
416                    SessionStatus::Active => "active",
417                    SessionStatus::Idle => "idle",
418                    SessionStatus::Ended => "ended",
419                }),
420            });
421            let _ = sink.send(protocol::immediate_response(
422                request_id, 200, "Session", data,
423            ));
424        }
425        None => {
426            let _ = sink.send(protocol::error(
427                request_id,
428                "SESSION_NOT_FOUND",
429                &format!("session '{session_id}' not found"),
430            ));
431        }
432    }
433}
434
435/// `get_conversation_messages` — paginated message history for a session's
436/// conversation. Wraps the storage adapter's `list_messages_by_conversation`
437/// (the same call the admin API + the turn runner use) and replies with an
438/// `immediate_response` carrying `{ conversationId, messages, nextCursor, hasMore }`.
439///
440/// Optional inputs: `limit` (default 50) and an opaque `cursor` from a prior
441/// page's `nextCursor`. Newest-first (the common "recent history" read).
442async fn handle_get_conversation_messages(
443    state: &AppState,
444    parsed: &Value,
445    request_id: Option<&str>,
446    sink: &UnboundedSender<Value>,
447) {
448    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
449        let _ = sink.send(protocol::error(
450            request_id,
451            "VALIDATION_ERROR",
452            "missing 'sessionId'",
453        ));
454        return;
455    };
456    let Some(session) = state.get_session(session_id) else {
457        let _ = sink.send(protocol::error(
458            request_id,
459            "SESSION_NOT_FOUND",
460            &format!("session '{session_id}' not found"),
461        ));
462        return;
463    };
464
465    const DEFAULT_LIMIT: usize = 50;
466    let limit = parsed
467        .get("limit")
468        .and_then(Value::as_u64)
469        .map(|n| n as usize)
470        .filter(|n| *n > 0)
471        .unwrap_or(DEFAULT_LIMIT);
472    let cursor = parsed
473        .get("cursor")
474        .and_then(Value::as_str)
475        .map(str::to_string);
476
477    let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
478    query.cursor = cursor;
479    query.descending = true;
480
481    match state.storage.list_messages_by_conversation(query).await {
482        Ok(page) => {
483            let data = json!({
484                "conversationId": session.conversation_id,
485                "messages": page.messages,
486                "nextCursor": page.next_cursor,
487                "hasMore": page.next_cursor.is_some(),
488            });
489            let _ = sink.send(protocol::immediate_response(
490                request_id,
491                200,
492                "ConversationMessages",
493                data,
494            ));
495        }
496        Err(e) => {
497            let _ = sink.send(protocol::error(
498                request_id,
499                "STORAGE_ERROR",
500                &format!("failed to list messages: {e}"),
501            ));
502        }
503    }
504}
505
506/// `send_message` — ack with `immediate_response` (202), run a streaming
507/// knowledge-grounded turn, emit `stream_token` / `stream_chunk` as it goes, and
508/// finish with `eventual_response` (200). Errors (no gateway key, unknown
509/// session, agent failure) surface as clean `error` events.
510async fn handle_send_message(
511    state: &AppState,
512    access: &AccessContext,
513    parsed: &Value,
514    request_id: Option<&str>,
515    sink: &UnboundedSender<Value>,
516) {
517    // requestId is load-bearing for streaming correlation; require it.
518    let Some(request_id) = request_id else {
519        let _ = sink.send(protocol::error(
520            None,
521            "VALIDATION_ERROR",
522            "send_message requires a 'requestId'",
523        ));
524        return;
525    };
526
527    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
528        let _ = sink.send(protocol::error(
529            Some(request_id),
530            "VALIDATION_ERROR",
531            "missing 'sessionId'",
532        ));
533        return;
534    };
535
536    let message = match parsed.get("message").and_then(Value::as_str) {
537        Some(m) if !m.trim().is_empty() => m.to_string(),
538        _ => {
539            let _ = sink.send(protocol::error(
540                Some(request_id),
541                "VALIDATION_ERROR",
542                "missing or empty 'message'",
543            ));
544            return;
545        }
546    };
547
548    let Some(session) = state.get_session(session_id) else {
549        let _ = sink.send(protocol::error(
550            Some(request_id),
551            "SESSION_NOT_FOUND",
552            &format!("session '{session_id}' not found"),
553        ));
554        return;
555    };
556
557    // A test-injected provider (the scenario-parity corpus's `MockLlmClient`)
558    // overrides the live gateway client entirely — the turn never touches the
559    // network — so it does NOT need a configured gateway key. Production leaves
560    // `chat_provider` `None`, so this clone is `None` and the key gate below is
561    // unchanged.
562    let chat_provider = state.chat_provider.clone();
563
564    // Resolve the gateway key for this turn's org. The conversation carries the
565    // org; the resolver maps it to a per-org key (e.g. a LiteLLM virtual key per
566    // tenant) so a multi-tenant flavor bills/scopes each org separately. The
567    // default `EnvGatewayKeyResolver` returns the single env key for every org,
568    // so the local/default flavor is unchanged. On `None` (no per-org key) we
569    // fall back to the env key; only when neither supplies a key do we error.
570    let org_id = match state
571        .storage
572        .get_conversation(&session.conversation_id)
573        .await
574    {
575        Ok(Some(conversation)) => conversation.organization_id,
576        // No conversation row (shouldn't happen for a live session) → resolve as
577        // if anonymous; the env fallback still applies.
578        Ok(None) | Err(_) => String::new(),
579    };
580    let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
581        &state.gateway_key_resolver,
582        &org_id,
583        state.config.gateway_key.as_deref(),
584    )
585    .await;
586
587    // No resolvable key → can't run a *live* LLM turn. Return a clean error (the
588    // server stays usable for protocol-only checks). When a mock provider is
589    // injected we fall back to a placeholder config — the mock replaces the
590    // client built from it, so its url/key/model are never used.
591    // Keep a copy of the resolved key to thread into the turn's
592    // `ToolProviderContext` (a retrieval-style host tool calls the same gateway);
593    // `None` on the mock/placeholder path so a host tool can fall back.
594    let turn_gateway_key = resolved_key.clone();
595    let llm = match resolved_key {
596        Some(key) => state.config.llm_config_with_key(key),
597        None if chat_provider.is_some() => state.config.placeholder_llm_config(),
598        None => {
599            let _ = sink.send(protocol::error(
600                Some(request_id),
601                "LLM_UNAVAILABLE",
602                "No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
603                 per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
604                 key to enable send_message.",
605            ));
606            return;
607        }
608    };
609
610    // Per-turn model override (Smooth Modes / `/smooth-mode` preset): when the
611    // send_message body carries a non-empty `model`, run THIS turn on it,
612    // overriding the server's configured default model. Absent or blank ⇒ the
613    // config default is kept, so behavior is unchanged when the field is unused.
614    let llm = apply_model_override(llm, parsed);
615
616    // Ack: processing started.
617    let _ = sink.send(protocol::immediate_response(
618        Some(request_id),
619        202,
620        "Processing your request...",
621        json!({}),
622    ));
623
624    // Run the turn in a spawned task, NOT inline. A turn that calls a
625    // confirmation-gated write tool **parks** awaiting a later
626    // `confirm_tool_action` frame; the socket reader dispatches that frame on the
627    // same connection, so blocking the reader here would deadlock (the confirm
628    // can never be read). Spawning frees the reader to receive the confirmation
629    // while the turn streams its events through the (cloned) sink. Pearl: HITL
630    // pause/resume.
631    let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
632        crate::runner::ConfirmationConfig {
633            tool_patterns: patterns,
634            session_id: session.session_id.clone(),
635            register: {
636                let state = state.clone();
637                Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
638            },
639            clear: {
640                let state = state.clone();
641                Arc::new(move |sid: &str| state.clear_confirmation(sid))
642            },
643        }
644    });
645
646    // The reference server is single-org; a multi-tenant host derives this from
647    // auth. Used to (a) resolve the org's persona override (SEAM 2) and (b)
648    // scope the host's tool provider (SEAM 1).
649    let org_id = crate::server::SEED_ORG_ID.to_string();
650
651    // SEAM 3 — per-agent behavior config (instructions + conversation workflow),
652    // resolved by the connection's `agent_id` so two agents in the same org can
653    // behave differently. Absent / malformed ⇒ `None`, so the org-default persona
654    // (SEAM 2) is used, unchanged. Isolated per agent by construction.
655    let agent_cfg: Option<AgentBehaviorConfig> =
656        state.agent_config.resolve(&session.agent_id).await;
657
658    // SEAM 2/3 — resolve the system prompt in priority order:
659    //   1. the per-AGENT instructions (+ personality), when set,
660    //   2. the per-ORG persona override ([`AgentSettings::persona`]),
661    //   3. the host's installed default persona ([`AppState::default_persona`]).
662    // All absent ⇒ `None`, so the runner stays on its const customer-support
663    // prompt and behavior is byte-for-byte unchanged.
664    let system_prompt = agent_cfg
665        .as_ref()
666        .and_then(AgentBehaviorConfig::system_prompt)
667        .or_else(|| state.settings.get(&org_id).persona)
668        .or_else(|| state.default_persona.clone());
669
670    // The agent's first-turn greeting section (the runner injects it only when
671    // the conversation has no prior messages) + its tool allow-list (`None` ⇒ the
672    // full server tool set).
673    let greeting_section = agent_cfg
674        .as_ref()
675        .and_then(AgentBehaviorConfig::greeting_section);
676    let enabled_tools = agent_cfg
677        .as_ref()
678        .and_then(AgentBehaviorConfig::enabled_tool_ids);
679
680    // Per-tool config delivered to host tools at execution + the authLevel gate.
681    let tool_configs = agent_cfg
682        .as_ref()
683        .map(AgentBehaviorConfig::tool_configs)
684        .filter(|m| !m.is_empty());
685    let auth_gate = agent_cfg
686        .as_ref()
687        .and_then(|cfg| build_auth_gate(state, cfg));
688
689    // The agent's conversation workflow (if any) + the step this session is on.
690    let workflow = agent_cfg
691        .as_ref()
692        .and_then(|c| c.conversation_workflow.clone())
693        .map(|wf| runner::WorkflowTurn {
694            workflow: wf,
695            current_step_id: state.session_current_step(session_id),
696        });
697
698    // The judge LLM surface — only built when there's a workflow to advance. A
699    // test-injected chat provider (the mock) doubles as the judge offline; in
700    // production the judge runs on the server's default (cheap) model with the
701    // turn's resolved gateway key, independent of any per-turn model override so
702    // the yes/no/maybe decision stays cheap.
703    let judge: Option<Arc<dyn LlmProvider>> = if workflow.is_some() {
704        Some(build_judge_provider(state, &llm))
705    } else {
706        None
707    };
708
709    // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
710    let tool_provider = state.tool_provider.clone();
711    let session_id_owned = session_id.to_string();
712
713    let state_for_turn = state.clone();
714    // Carry the turn's org on the AccessContext so a multi-tenant host adapter's
715    // `knowledge_for_access` can scope RAG to this tenant. The authed-principal
716    // path already stamps its own org (`Principal::access_context`); a widget /
717    // anonymous connection does not, so fall back to the session's persisted org
718    // (every session carries `organization_id` since the create-session path
719    // derives it). The operator's built-in single-tenant ACL ignores the org, so
720    // this is behavior-preserving for the reference flavor.
721    let access_owned = if access.organization_id.is_some() {
722        access.clone()
723    } else {
724        access
725            .clone()
726            .with_organization_id(session.organization_id.clone())
727    };
728    let sink_owned = sink.clone();
729    let request_id_owned = request_id.to_string();
730    let conversation_id = session.conversation_id.clone();
731
732    tokio::spawn(async move {
733        let result = runner::run_streaming_turn(
734            TurnRequest {
735                storage: state_for_turn.storage.clone(),
736                llm,
737                max_iterations: state_for_turn.config.max_iterations,
738                conversation_id: &conversation_id,
739                request_id: &request_id_owned,
740                user_message: &message,
741                // The connection's resolved document-level entitlement: retrieval is
742                // filtered to what this requester may read (org-public only when the
743                // connection is anonymous).
744                access: access_owned,
745                // Production: `None` (a live client is built from `llm`). Tests: the
746                // scenario corpus's `MockLlmClient`, which runs the turn offline.
747                llm_provider: chat_provider,
748                // Opt-in rerank stage (feature gap G8): `None` unless the operator
749                // enabled it via `SMOOTH_AGENT_RERANK` (gateway/lexical). Default-off
750                // keeps retrieval behavior unchanged.
751                reranker: crate::reranker::build_reranker(
752                    &crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
753                ),
754                confirmation,
755                // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
756                tool_provider,
757                // SEAM 2 — resolved per-org persona (None ⇒ const prompt).
758                system_prompt,
759                org_id: Some(org_id),
760                // The per-org key resolved above, threaded so a host tool
761                // provider's retrieval tools call the same gateway this turn used.
762                gateway_key: turn_gateway_key,
763                // SEAM 3 — per-agent conversation workflow + its cheap judge. Both
764                // `None` for a freeform agent, so the turn is unchanged.
765                workflow,
766                judge,
767                // SEAM 3 — per-agent first-turn greeting + tool allow-list.
768                greeting_section,
769                enabled_tools,
770                // SEAM 3 — authLevel gate + per-tool config delivery.
771                auth_gate,
772                tool_configs,
773            },
774            &sink_owned,
775        )
776        .await;
777
778        match result {
779            Ok(turn) => {
780                // Persist the workflow step pointer the judge landed on, so the
781                // next turn resumes on the right step. No-op when the agent has no
782                // workflow (`next_step_id` is `None`).
783                if let Some(step) = turn.next_step_id.as_deref() {
784                    state_for_turn.set_session_current_step(&session_id_owned, Some(step));
785                }
786                let response = runner::general_agent_response(&turn.reply);
787                let _ = sink_owned.send(protocol::eventual_response(
788                    &request_id_owned,
789                    200,
790                    &turn.message_id,
791                    response,
792                    false,
793                    &turn.citations,
794                    turn.usage,
795                ));
796            }
797            Err(e) => {
798                let _ = sink_owned.send(protocol::error(
799                    Some(&request_id_owned),
800                    "AGENT_ERROR",
801                    &format!("agent turn failed: {e}"),
802                ));
803            }
804        }
805    });
806}
807
808/// `confirm_tool_action` — resume a turn parked on a write-tool confirmation.
809///
810/// Per `spec/actions/confirm-tool-action.schema.json` the client sends
811/// `{ action, sessionId, requestId, approved }` in reply to a
812/// `write_confirmation_required` event. We look up the session's registered
813/// [`HumanResponse`](smooth_operator_core::HumanResponse) sender (set by the
814/// runner's confirmation bridge when the turn parked), take it, and feed it the
815/// verdict: `approved` → `Approved` (the parked tool executes), else `Denied`
816/// (the tool is skipped with a rejection result the model sees). There is no
817/// dedicated response event — the resumed workflow signals continuation via its
818/// normal streaming sequence (`stream_chunk`/`stream_token` → `eventual_response`);
819/// we additionally ack with an `immediate_response`. Taking the sender makes a
820/// duplicate confirm a no-op (`NO_PENDING_CONFIRMATION`).
821fn handle_confirm_tool_action(
822    state: &AppState,
823    parsed: &Value,
824    request_id: Option<&str>,
825    sink: &UnboundedSender<Value>,
826) {
827    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
828        let _ = sink.send(protocol::error(
829            request_id,
830            "VALIDATION_ERROR",
831            "confirm_tool_action requires a 'sessionId'",
832        ));
833        return;
834    };
835
836    // `approved` is required and must be a boolean — a missing/garbled verdict
837    // must NOT silently approve a write. Fail closed on a bad shape.
838    let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
839        let _ = sink.send(protocol::error(
840            request_id,
841            "VALIDATION_ERROR",
842            "confirm_tool_action requires a boolean 'approved'",
843        ));
844        return;
845    };
846
847    let Some(responder) = state.take_confirmation(session_id) else {
848        let _ = sink.send(protocol::error(
849            request_id,
850            "NO_PENDING_CONFIRMATION",
851            &format!("no tool action is awaiting confirmation for session '{session_id}'"),
852        ));
853        return;
854    };
855
856    let verdict = if approved {
857        smooth_operator_core::HumanResponse::Approved
858    } else {
859        smooth_operator_core::HumanResponse::Denied {
860            reason: "user rejected the action".to_string(),
861        }
862    };
863
864    if responder.send(verdict).is_err() {
865        // The parked turn ended (timeout / disconnect) before the confirm landed.
866        let _ = sink.send(protocol::error(
867            request_id,
868            "NO_PENDING_CONFIRMATION",
869            &format!(
870                "the turn awaiting confirmation for session '{session_id}' is no longer active"
871            ),
872        ));
873        return;
874    }
875
876    // Ack the confirmation; the resumed turn streams its own follow-on events.
877    let _ = sink.send(protocol::immediate_response(
878        request_id,
879        200,
880        if approved {
881            "Tool action approved"
882        } else {
883            "Tool action rejected"
884        },
885        json!({ "sessionId": session_id, "approved": approved }),
886    ));
887}
888
889/// Apply an optional per-turn `model` override (from a `send_message` body) to a
890/// resolved [`LlmConfig`]. When the body carries a non-empty `model` string, this
891/// turn runs on that gateway model id (a Smooth Modes / `/smooth-mode` preset),
892/// overriding the server's configured default; an absent, non-string, or
893/// blank/whitespace-only `model` leaves the config's default model unchanged
894/// (byte-for-byte the prior behavior). Every other field (url, key, limits)
895/// stays as resolved — only the model id changes.
896fn apply_model_override(mut llm: LlmConfig, body: &Value) -> LlmConfig {
897    if let Some(model) = body.get("model").and_then(Value::as_str) {
898        let model = model.trim();
899        if !model.is_empty() {
900            llm.model = model.to_string();
901        }
902    }
903    llm
904}
905
/// Output-token ceiling for the workflow judge's LLM calls. A `yes` / `no` /
/// `maybe` verdict needs only a few tokens, so the cap is kept small to keep the
/// extra per-turn cost + latency negligible. Applied as `max_tokens` when the
/// live judge client is built in `build_judge_provider`.
const JUDGE_MAX_TOKENS: u32 = 16;
909
910/// Build the per-agent authLevel gate, or `None` when it would be inert.
911///
912/// The set of tools that "support auth requirements" (the operator analog of the
913/// TS `supportsAuthRequirement` flag) comes from `SMOOTH_AGENT_AUTH_TOOLS`
914/// (comma-separated); empty (the default) ⇒ nothing is gated. `session_authenticated`
915/// is fail-closed (`false`) — an OTP / identity-verification flow is host wiring
916/// behind this seam, not implemented by the reference server.
917fn build_auth_gate(state: &AppState, cfg: &AgentBehaviorConfig) -> Option<AuthGateHook> {
918    let supporting: std::collections::HashSet<String> = std::env::var("SMOOTH_AGENT_AUTH_TOOLS")
919        .ok()
920        .into_iter()
921        .flat_map(|s| {
922            s.split(',')
923                .map(str::trim)
924                .filter(|s| !s.is_empty())
925                .map(str::to_string)
926                .collect::<Vec<_>>()
927        })
928        .collect();
929    if supporting.is_empty() {
930        let _ = state; // no host-declared auth-supporting tools → gate is inert
931        return None;
932    }
933    let levels = cfg
934        .enabled_tools
935        .iter()
936        .map(|t| (t.tool_id.clone(), AuthLevel::parse(&t.auth_level)))
937        .collect();
938    let hook = AuthGateHook::new(levels, cfg.visibility, false, supporting);
939    hook.is_active().then_some(hook)
940}
941
942/// Build the workflow judge's LLM surface. Prefers a test-injected chat provider
943/// (the scenario mock — runs offline); otherwise builds a live client on the
944/// server's **default** (cheap) model with the turn's resolved gateway url/key,
945/// independent of any per-turn model override, so the judge stays cheap even when
946/// the turn itself runs on a bigger model.
947fn build_judge_provider(state: &AppState, turn_llm: &LlmConfig) -> Arc<dyn LlmProvider> {
948    if let Some(mock) = state.chat_provider.clone() {
949        return mock;
950    }
951    let mut cfg = turn_llm.clone();
952    cfg.model = state.config.judge_model.clone();
953    cfg.max_tokens = JUDGE_MAX_TOKENS;
954    Arc::new(LlmClient::new(cfg))
955}
956
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator_core::llm::{ApiFormat, RetryPolicy};

    /// Model id carried by [`base_llm`]; the "no override" tests expect the
    /// config to still hold exactly this value afterwards.
    const DEFAULT_MODEL: &str = "claude-haiku-4-5";

    /// Fixture: a fully populated config sitting on the default model, giving
    /// every override test a known starting point.
    fn base_llm() -> LlmConfig {
        LlmConfig {
            api_url: String::from("https://llm.smoo.ai/v1"),
            api_key: String::from("sk-test"),
            model: String::from(DEFAULT_MODEL),
            max_tokens: 512,
            temperature: 0.0,
            retry_policy: RetryPolicy::default(),
            api_format: ApiFormat::OpenAiCompat,
        }
    }

    #[test]
    fn model_override_present_replaces_model() {
        let frame = json!({ "action": "send_message", "model": "claude-opus-4-8" });
        let overridden = apply_model_override(base_llm(), &frame);

        assert_eq!(overridden.model, "claude-opus-4-8");
        // The override is surgical: nothing but the model id may change.
        assert_eq!(overridden.api_url, "https://llm.smoo.ai/v1");
        assert_eq!(overridden.api_key, "sk-test");
        assert_eq!(overridden.max_tokens, 512);
    }

    #[test]
    fn model_override_absent_keeps_default() {
        let frame = json!({ "action": "send_message", "message": "hi" });
        let untouched = apply_model_override(base_llm(), &frame);

        assert_eq!(untouched.model, DEFAULT_MODEL);
    }

    #[test]
    fn model_override_blank_or_non_string_keeps_default() {
        // First body: whitespace-only, treated as absent.
        // Second body: wrong JSON type, ignored without panicking.
        let ignored_bodies = [json!({ "model": "   " }), json!({ "model": 42 })];

        for body in ignored_bodies {
            assert_eq!(
                apply_model_override(base_llm(), &body).model,
                DEFAULT_MODEL,
                "override should have been ignored for body {body}"
            );
        }
    }
}