Skip to main content

smooth_operator_server/
handler.rs

1//! Action dispatch — parses a client action envelope and produces the matching
2//! server events.
3//!
4//! Each handler is `async` and forwards events through an
5//! `UnboundedSender<serde_json::Value>` (the per-connection outbound sink). The
6//! socket task drains the sink and writes each value as a JSON WS text frame, so
7//! streaming actions (`send_message`) can emit many events while still being
8//! driven from one place.
9
10use std::sync::Arc;
11
12use serde_json::{json, Value};
13use tokio::sync::mpsc::UnboundedSender;
14
15use smooth_operator::access_control::AccessContext;
16use smooth_operator::agent_config::{AgentBehaviorConfig, AuthGateHook, AuthLevel};
17use smooth_operator::domain::{
18    Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
19};
20use smooth_operator_core::llm_provider::LlmProvider;
21use smooth_operator_core::{LlmClient, LlmConfig};
22
23use crate::protocol;
24use crate::runner;
25use crate::runner::TurnRequest;
26use crate::state::AppState;
27
28/// The agent's display name for the reference server.
29const AGENT_NAME: &str = "smooth-agent";
30
31/// Parse and dispatch a single inbound text frame. Any produced events are sent
32/// through `sink`. Returns `Ok(())` always — protocol-level failures are
33/// surfaced as `error` events, never as hard errors that drop the connection.
34pub async fn handle_frame(
35    state: &AppState,
36    access: &AccessContext,
37    conn_id: &str,
38    origin: Option<&str>,
39    auth_org: Option<&str>,
40    raw: &str,
41    sink: &UnboundedSender<Value>,
42) {
43    let parsed: Value = match serde_json::from_str(raw) {
44        Ok(v) => v,
45        Err(e) => {
46            let _ = sink.send(protocol::error(
47                None,
48                "VALIDATION_ERROR",
49                &format!("invalid JSON frame: {e}"),
50            ));
51            return;
52        }
53    };
54
55    let action = parsed.get("action").and_then(Value::as_str);
56    let request_id = parsed.get("requestId").and_then(Value::as_str);
57
58    match action {
59        Some("ping") => {
60            let _ = sink.send(protocol::pong(request_id));
61        }
62        Some("create_conversation_session") => {
63            handle_create_session(state, conn_id, origin, auth_org, &parsed, request_id, sink)
64                .await;
65        }
66        Some("get_session") => {
67            handle_get_session(state, &parsed, request_id, sink);
68        }
69        Some("get_conversation_messages") => {
70            handle_get_conversation_messages(state, &parsed, request_id, sink).await;
71        }
72        Some("send_message") => {
73            handle_send_message(state, access, &parsed, request_id, sink).await;
74        }
75        Some("confirm_tool_action") => {
76            handle_confirm_tool_action(state, &parsed, request_id, sink);
77        }
78        Some("verify_otp") => {
79            handle_verify_otp(state, &parsed, request_id, sink).await;
80        }
81        Some(other) => {
82            let _ = sink.send(protocol::error(
83                request_id,
84                "UNSUPPORTED_ACTION",
85                &format!("action '{other}' is not supported by this server"),
86            ));
87        }
88        None => {
89            let _ = sink.send(protocol::error(
90                request_id,
91                "VALIDATION_ERROR",
92                "missing 'action' field",
93            ));
94        }
95    }
96}
97
/// Result of the widget-auth check that runs before a session is created.
enum WidgetAuthOutcome {
    /// The request may proceed. `org_id` holds the `organization_id` of the
    /// agent's resolved widget policy when it has one; it is `None` when no
    /// policy was found or the policy names no org, in which case the caller
    /// falls back to the JWT principal's org and then the seed org.
    Allowed { org_id: Option<String> },
    /// The request was rejected. An `error` event has already been sent to the
    /// client, so the caller must stop without creating anything.
    Denied,
}
109
110/// Enforce an agent's embeddable-widget policy (origin allowlist + `authContext`)
111/// before a session is created. Returns [`WidgetAuthOutcome::Allowed`] to proceed
112/// (carrying the policy's org when known), or [`WidgetAuthOutcome::Denied`] after
113/// emitting a protocol `error` (the caller must then stop). Agents with no policy
114/// proceed — unless `WIDGET_AUTH_STRICT` is set, in which case an unknown agent is
115/// rejected (fail closed).
116async fn enforce_widget_auth(
117    state: &AppState,
118    origin: Option<&str>,
119    agent_id: &str,
120    parsed: &Value,
121    request_id: Option<&str>,
122    sink: &UnboundedSender<Value>,
123) -> WidgetAuthOutcome {
124    let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
125        if state.config.widget_auth_strict {
126            let _ = sink.send(protocol::error(
127                request_id,
128                "AGENT_NOT_AUTHORIZED",
129                "this agent is not registered for embedding",
130            ));
131            return WidgetAuthOutcome::Denied;
132        }
133        return WidgetAuthOutcome::Allowed { org_id: None };
134    };
135
136    // Origin allowlist — fail closed: a missing or disallowed `Origin` is rejected.
137    if !smooth_operator::widget_auth::origin_allowed(
138        &policy.allowed_origins,
139        origin.unwrap_or_default(),
140    ) {
141        let _ = sink.send(protocol::error(
142            request_id,
143            "ORIGIN_NOT_ALLOWED",
144            "this origin is not allowed to embed this agent",
145        ));
146        return WidgetAuthOutcome::Denied;
147    }
148
149    // Pre-auth `authContext` (optional): when present it must verify.
150    if let Some(ac) = parsed.get("authContext") {
151        if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
152            let _ = sink.send(protocol::error(
153                request_id,
154                "AUTH_CONTEXT_INVALID",
155                "authContext signature failed verification",
156            ));
157            return WidgetAuthOutcome::Denied;
158        }
159    }
160    WidgetAuthOutcome::Allowed {
161        org_id: policy.organization_id,
162    }
163}
164
165/// Verify a JSON `authContext` (`{userId, signature, timestamp}`) against the
166/// agent's `public_key`. False on any missing field/key or signature/replay
167/// failure. Replay window: 60s.
168fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
169    let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
170        public_key,
171        ac.get("userId").and_then(Value::as_str),
172        ac.get("signature").and_then(Value::as_str),
173        ac.get("timestamp").and_then(Value::as_i64),
174    ) else {
175        return false;
176    };
177    let now = chrono::Utc::now().timestamp();
178    smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
179}
180
181/// `create_conversation_session` — create a conversation + user & agent
182/// participants + a session, then reply with an `immediate_response` carrying
183/// the session descriptor (per `create-conversation-session.schema.json`).
184async fn handle_create_session(
185    state: &AppState,
186    conn_id: &str,
187    origin: Option<&str>,
188    auth_org: Option<&str>,
189    parsed: &Value,
190    request_id: Option<&str>,
191    sink: &UnboundedSender<Value>,
192) {
193    let agent_id = parsed
194        .get("agentId")
195        .and_then(Value::as_str)
196        .map(str::to_string)
197        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
198
199    // Embeddable-widget auth: enforce the agent's origin allowlist + authContext
200    // before creating any session. No-op for agents without a policy (unless
201    // WIDGET_AUTH_STRICT). On denial, an error is emitted and we stop here. A
202    // resolved policy may also carry the agent's org (multi-tenant host).
203    let widget_org =
204        match enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
205            WidgetAuthOutcome::Denied => return,
206            WidgetAuthOutcome::Allowed { org_id } => org_id,
207        };
208
209    let user_name = parsed
210        .get("userName")
211        .and_then(Value::as_str)
212        .unwrap_or("Visitor")
213        .to_string();
214    let user_email = parsed
215        .get("userEmail")
216        .and_then(Value::as_str)
217        .map(str::to_string);
218    let browser_fingerprint = parsed
219        .get("browserFingerprint")
220        .and_then(Value::as_str)
221        .map(str::to_string);
222
223    let now = chrono::Utc::now();
224    // Derive the org this session (conversation + participants) belongs to, in
225    // priority order:
226    //   1. the widget policy's `organization_id` — a multi-tenant host that knows
227    //      the agent's org (widget visitors authenticate via origin/authContext,
228    //      not a JWT, so their org rides on the agent's policy);
229    //   2. the connection's authenticated JWT principal org (`auth_org`) — a
230    //      dashboard user / authed client;
231    //   3. the server's seed org — the single-org reference/dev case, so the
232    //      admin API's org-scoping (document sets, indexing runs) still lines up
233    //      with the seeded knowledge. This keeps the no-auth/local flavor
234    //      behavior unchanged.
235    let org_id = widget_org
236        .or_else(|| auth_org.map(str::to_string))
237        .unwrap_or_else(|| crate::server::SEED_ORG_ID.to_string());
238
239    let conversation_id = uuid::Uuid::new_v4().to_string();
240    let session_id = uuid::Uuid::new_v4().to_string();
241    let user_participant_id = uuid::Uuid::new_v4().to_string();
242    let agent_participant_id = uuid::Uuid::new_v4().to_string();
243
244    // Associate this connection with its session (and agent) on the backplane so
245    // events published to the session/agent — by an agent turn or any other
246    // service — reach this client's socket, on this pod or (with a Redis/NATS
247    // backplane) any pod.
248    state
249        .backplane
250        .associate(
251            conn_id,
252            smooth_operator::backplane::Target::Session(session_id.clone()),
253        )
254        .await;
255    state
256        .backplane
257        .associate(
258            conn_id,
259            smooth_operator::backplane::Target::Agent(agent_id.clone()),
260        )
261        .await;
262
263    let conversation = Conversation {
264        id: conversation_id.clone(),
265        platform: Platform::Web,
266        name: format!("Session {session_id}"),
267        organization_id: org_id.clone(),
268        idempotency_key: session_id.clone(),
269        metadata_json: parsed.get("metadata").cloned(),
270        analytics_json: None,
271        created_at: now,
272        updated_at: now,
273    };
274
275    let user_participant = Participant {
276        id: user_participant_id.clone(),
277        conversation_id: conversation_id.clone(),
278        organization_id: org_id.clone(),
279        participant_type: ParticipantType::User,
280        external_id: None,
281        internal_id: None,
282        browser_fingerprint,
283        browser_info: None,
284        name: user_name,
285        email: user_email.clone(),
286        phone: None,
287        crm_contact_id: None,
288        metadata_json: None,
289        created_at: now,
290        updated_at: now,
291    };
292
293    let agent_participant = Participant {
294        id: agent_participant_id.clone(),
295        conversation_id: conversation_id.clone(),
296        organization_id: org_id.clone(),
297        participant_type: ParticipantType::AiAgent,
298        external_id: None,
299        internal_id: Some(agent_id.clone()),
300        browser_fingerprint: None,
301        browser_info: None,
302        name: AGENT_NAME.to_string(),
303        email: None,
304        phone: None,
305        crm_contact_id: None,
306        metadata_json: None,
307        created_at: now,
308        updated_at: now,
309    };
310
311    // Stash the caller's OTP contact on the session so the end_user auth-gate
312    // flow can offer verification without a storage roundtrip (mirrors how the
313    // workflow step pointer lives in session metadata). The reference create path
314    // captures only an email; a host that also captures a phone would add
315    // `contactPhone` here for an SMS channel.
316    let session_metadata = user_email.as_ref().map(|email| {
317        let mut meta = std::collections::HashMap::new();
318        meta.insert("contactEmail".to_string(), Value::from(email.clone()));
319        meta
320    });
321
322    let session = Session {
323        session_id: session_id.clone(),
324        conversation_id: conversation_id.clone(),
325        organization_id: org_id.clone(),
326        agent_id: agent_id.clone(),
327        agent_name: AGENT_NAME.to_string(),
328        user_participant_id: user_participant_id.clone(),
329        agent_participant_id: agent_participant_id.clone(),
330        // The thread id is the conversation id: per-session memory is carried by
331        // replaying this conversation's persisted message log (see runner.rs).
332        thread_id: conversation_id.clone(),
333        status: Some(SessionStatus::Active),
334        token_count: Some(0),
335        message_count: Some(0),
336        metadata: session_metadata,
337        created_at: Some(now),
338        updated_at: Some(now),
339        ended_at: None,
340        last_activity_at: Some(now),
341    };
342
343    // Persist to the storage adapter (best-effort: a failure surfaces as error).
344    let storage = state.storage.clone();
345    let sink_clone = sink.clone();
346    let request_id_owned = request_id.map(str::to_string);
347    let session_for_registry = session.clone();
348    let state_clone = state.clone();
349
350    let data = json!({
351        "sessionId": session_id,
352        "conversationId": conversation_id,
353        "agentId": agent_id,
354        "agentName": AGENT_NAME,
355        "userParticipantId": user_participant_id,
356        "agentParticipantId": agent_participant_id,
357    });
358
359    tokio::spawn(async move {
360        let rid = request_id_owned.as_deref();
361        if let Err(e) = storage.create_conversation(conversation).await {
362            let _ = sink_clone.send(protocol::error(
363                rid,
364                "INTERNAL_ERROR",
365                &format!("create conversation failed: {e}"),
366            ));
367            return;
368        }
369        if let Err(e) = storage.add_participant(user_participant).await {
370            let _ = sink_clone.send(protocol::error(
371                rid,
372                "INTERNAL_ERROR",
373                &format!("add user participant failed: {e}"),
374            ));
375            return;
376        }
377        if let Err(e) = storage.add_participant(agent_participant).await {
378            let _ = sink_clone.send(protocol::error(
379                rid,
380                "INTERNAL_ERROR",
381                &format!("add agent participant failed: {e}"),
382            ));
383            return;
384        }
385        if let Err(e) = storage.create_session(session).await {
386            let _ = sink_clone.send(protocol::error(
387                rid,
388                "INTERNAL_ERROR",
389                &format!("create session failed: {e}"),
390            ));
391            return;
392        }
393        state_clone.insert_session(session_for_registry);
394        let _ = sink_clone.send(protocol::immediate_response(
395            rid,
396            200,
397            "Session created",
398            data,
399        ));
400    });
401}
402
403/// `get_session` — return the session snapshot (per `get-session.schema.json`).
404fn handle_get_session(
405    state: &AppState,
406    parsed: &Value,
407    request_id: Option<&str>,
408    sink: &UnboundedSender<Value>,
409) {
410    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
411        let _ = sink.send(protocol::error(
412            request_id,
413            "VALIDATION_ERROR",
414            "missing 'sessionId'",
415        ));
416        return;
417    };
418
419    match state.get_session(session_id) {
420        Some(s) => {
421            let data = json!({
422                "sessionId": s.session_id,
423                "conversationId": s.conversation_id,
424                "agentId": s.agent_id,
425                "agentName": s.agent_name,
426                "userParticipantId": s.user_participant_id,
427                "agentParticipantId": s.agent_participant_id,
428                "threadId": s.thread_id,
429                "status": s.status.map_or("active", |st| match st {
430                    SessionStatus::Active => "active",
431                    SessionStatus::Idle => "idle",
432                    SessionStatus::Ended => "ended",
433                }),
434            });
435            let _ = sink.send(protocol::immediate_response(
436                request_id, 200, "Session", data,
437            ));
438        }
439        None => {
440            let _ = sink.send(protocol::error(
441                request_id,
442                "SESSION_NOT_FOUND",
443                &format!("session '{session_id}' not found"),
444            ));
445        }
446    }
447}
448
449/// `get_conversation_messages` — paginated message history for a session's
450/// conversation. Wraps the storage adapter's `list_messages_by_conversation`
451/// (the same call the admin API + the turn runner use) and replies with an
452/// `immediate_response` carrying `{ conversationId, messages, nextCursor, hasMore }`.
453///
454/// Optional inputs: `limit` (default 50) and an opaque `cursor` from a prior
455/// page's `nextCursor`. Newest-first (the common "recent history" read).
456async fn handle_get_conversation_messages(
457    state: &AppState,
458    parsed: &Value,
459    request_id: Option<&str>,
460    sink: &UnboundedSender<Value>,
461) {
462    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
463        let _ = sink.send(protocol::error(
464            request_id,
465            "VALIDATION_ERROR",
466            "missing 'sessionId'",
467        ));
468        return;
469    };
470    let Some(session) = state.get_session(session_id) else {
471        let _ = sink.send(protocol::error(
472            request_id,
473            "SESSION_NOT_FOUND",
474            &format!("session '{session_id}' not found"),
475        ));
476        return;
477    };
478
479    const DEFAULT_LIMIT: usize = 50;
480    let limit = parsed
481        .get("limit")
482        .and_then(Value::as_u64)
483        .map(|n| n as usize)
484        .filter(|n| *n > 0)
485        .unwrap_or(DEFAULT_LIMIT);
486    let cursor = parsed
487        .get("cursor")
488        .and_then(Value::as_str)
489        .map(str::to_string);
490
491    let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
492    query.cursor = cursor;
493    query.descending = true;
494
495    match state.storage.list_messages_by_conversation(query).await {
496        Ok(page) => {
497            let data = json!({
498                "conversationId": session.conversation_id,
499                "messages": page.messages,
500                "nextCursor": page.next_cursor,
501                "hasMore": page.next_cursor.is_some(),
502            });
503            let _ = sink.send(protocol::immediate_response(
504                request_id,
505                200,
506                "ConversationMessages",
507                data,
508            ));
509        }
510        Err(e) => {
511            let _ = sink.send(protocol::error(
512                request_id,
513                "STORAGE_ERROR",
514                &format!("failed to list messages: {e}"),
515            ));
516        }
517    }
518}
519
520/// `send_message` — ack with `immediate_response` (202), run a streaming
521/// knowledge-grounded turn, emit `stream_token` / `stream_chunk` as it goes, and
522/// finish with `eventual_response` (200). Errors (no gateway key, unknown
523/// session, agent failure) surface as clean `error` events.
524async fn handle_send_message(
525    state: &AppState,
526    access: &AccessContext,
527    parsed: &Value,
528    request_id: Option<&str>,
529    sink: &UnboundedSender<Value>,
530) {
531    // requestId is load-bearing for streaming correlation; require it.
532    let Some(request_id) = request_id else {
533        let _ = sink.send(protocol::error(
534            None,
535            "VALIDATION_ERROR",
536            "send_message requires a 'requestId'",
537        ));
538        return;
539    };
540
541    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
542        let _ = sink.send(protocol::error(
543            Some(request_id),
544            "VALIDATION_ERROR",
545            "missing 'sessionId'",
546        ));
547        return;
548    };
549
550    let message = match parsed.get("message").and_then(Value::as_str) {
551        Some(m) if !m.trim().is_empty() => m.to_string(),
552        _ => {
553            let _ = sink.send(protocol::error(
554                Some(request_id),
555                "VALIDATION_ERROR",
556                "missing or empty 'message'",
557            ));
558            return;
559        }
560    };
561
562    let Some(session) = state.get_session(session_id) else {
563        let _ = sink.send(protocol::error(
564            Some(request_id),
565            "SESSION_NOT_FOUND",
566            &format!("session '{session_id}' not found"),
567        ));
568        return;
569    };
570
571    // A test-injected provider (the scenario-parity corpus's `MockLlmClient`)
572    // overrides the live gateway client entirely — the turn never touches the
573    // network — so it does NOT need a configured gateway key. Production leaves
574    // `chat_provider` `None`, so this clone is `None` and the key gate below is
575    // unchanged.
576    let chat_provider = state.chat_provider.clone();
577
578    // Resolve the gateway key for this turn's org. The conversation carries the
579    // org; the resolver maps it to a per-org key (e.g. a LiteLLM virtual key per
580    // tenant) so a multi-tenant flavor bills/scopes each org separately. The
581    // default `EnvGatewayKeyResolver` returns the single env key for every org,
582    // so the local/default flavor is unchanged. On `None` (no per-org key) we
583    // fall back to the env key; only when neither supplies a key do we error.
584    let org_id = match state
585        .storage
586        .get_conversation(&session.conversation_id)
587        .await
588    {
589        Ok(Some(conversation)) => conversation.organization_id,
590        // No conversation row (shouldn't happen for a live session) → resolve as
591        // if anonymous; the env fallback still applies.
592        Ok(None) | Err(_) => String::new(),
593    };
594    let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
595        &state.gateway_key_resolver,
596        &org_id,
597        state.config.gateway_key.as_deref(),
598    )
599    .await;
600
601    // No resolvable key → can't run a *live* LLM turn. Return a clean error (the
602    // server stays usable for protocol-only checks). When a mock provider is
603    // injected we fall back to a placeholder config — the mock replaces the
604    // client built from it, so its url/key/model are never used.
605    // Keep a copy of the resolved key to thread into the turn's
606    // `ToolProviderContext` (a retrieval-style host tool calls the same gateway);
607    // `None` on the mock/placeholder path so a host tool can fall back.
608    let turn_gateway_key = resolved_key.clone();
609    let llm = match resolved_key {
610        Some(key) => state.config.llm_config_with_key(key),
611        None if chat_provider.is_some() => state.config.placeholder_llm_config(),
612        None => {
613            let _ = sink.send(protocol::error(
614                Some(request_id),
615                "LLM_UNAVAILABLE",
616                "No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
617                 per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
618                 key to enable send_message.",
619            ));
620            return;
621        }
622    };
623
624    // Per-turn model override (Smooth Modes / `/smooth-mode` preset): when the
625    // send_message body carries a non-empty `model`, run THIS turn on it,
626    // overriding the server's configured default model. Absent or blank ⇒ the
627    // config default is kept, so behavior is unchanged when the field is unused.
628    let llm = apply_model_override(llm, parsed);
629
630    // Ack: processing started.
631    let _ = sink.send(protocol::immediate_response(
632        Some(request_id),
633        202,
634        "Processing your request...",
635        json!({}),
636    ));
637
638    // Run the turn in a spawned task, NOT inline. A turn that calls a
639    // confirmation-gated write tool **parks** awaiting a later
640    // `confirm_tool_action` frame; the socket reader dispatches that frame on the
641    // same connection, so blocking the reader here would deadlock (the confirm
642    // can never be read). Spawning frees the reader to receive the confirmation
643    // while the turn streams its events through the (cloned) sink. Pearl: HITL
644    // pause/resume.
645    let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
646        crate::runner::ConfirmationConfig {
647            tool_patterns: patterns,
648            session_id: session.session_id.clone(),
649            register: {
650                let state = state.clone();
651                Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
652            },
653            clear: {
654                let state = state.clone();
655                Arc::new(move |sid: &str| state.clear_confirmation(sid))
656            },
657        }
658    });
659
660    // The reference server is single-org; a multi-tenant host derives this from
661    // auth. Used to (a) resolve the org's persona override (SEAM 2) and (b)
662    // scope the host's tool provider (SEAM 1).
663    let org_id = crate::server::SEED_ORG_ID.to_string();
664
665    // SEAM 3 — per-agent behavior config (instructions + conversation workflow),
666    // resolved by the connection's `agent_id` so two agents in the same org can
667    // behave differently. Absent / malformed ⇒ `None`, so the org-default persona
668    // (SEAM 2) is used, unchanged. Isolated per agent by construction.
669    let agent_cfg: Option<AgentBehaviorConfig> =
670        state.agent_config.resolve(&session.agent_id).await;
671
672    // SEAM 2/3 — resolve the system prompt in priority order:
673    //   1. the per-AGENT instructions (+ personality), when set,
674    //   2. the per-ORG persona override ([`AgentSettings::persona`]),
675    //   3. the host's installed default persona ([`AppState::default_persona`]).
676    // All absent ⇒ `None`, so the runner stays on its const customer-support
677    // prompt and behavior is byte-for-byte unchanged.
678    let system_prompt = agent_cfg
679        .as_ref()
680        .and_then(AgentBehaviorConfig::system_prompt)
681        .or_else(|| state.settings.get(&org_id).persona)
682        .or_else(|| state.default_persona.clone());
683
684    // The agent's first-turn greeting section (the runner injects it only when
685    // the conversation has no prior messages) + its tool allow-list (`None` ⇒ the
686    // full server tool set).
687    let greeting_section = agent_cfg
688        .as_ref()
689        .and_then(AgentBehaviorConfig::greeting_section);
690    let enabled_tools = agent_cfg
691        .as_ref()
692        .and_then(AgentBehaviorConfig::enabled_tool_ids);
693
694    // Per-tool config delivered to host tools at execution + the authLevel gate.
695    let tool_configs = agent_cfg
696        .as_ref()
697        .map(AgentBehaviorConfig::tool_configs)
698        .filter(|m| !m.is_empty());
699    // The session's identity-verified bit (set by a prior successful verify_otp)
700    // is threaded into the gate so a verified caller's `end_user` tools run.
701    let session_authed = state.session_authenticated(session_id);
702    let auth_gate = agent_cfg
703        .as_ref()
704        .and_then(|cfg| build_auth_gate(state, cfg, session_authed));
705    // Keep a handle to the gate's OTP-refusal flag so, after the turn, we can see
706    // whether an `end_user` tool was refused for lack of verification and (with an
707    // OtpService installed + a known contact) offer the OTP flow. `None` when
708    // there's no gate — the OTP flow can't trigger.
709    let otp_gate = auth_gate.clone();
710
711    // The agent's conversation workflow (if any) + the step this session is on.
712    let workflow = agent_cfg
713        .as_ref()
714        .and_then(|c| c.conversation_workflow.clone())
715        .map(|wf| runner::WorkflowTurn {
716            workflow: wf,
717            current_step_id: state.session_current_step(session_id),
718        });
719
720    // The judge LLM surface — only built when there's a workflow to advance. A
721    // test-injected chat provider (the mock) doubles as the judge offline; in
722    // production the judge runs on the server's default (cheap) model with the
723    // turn's resolved gateway key, independent of any per-turn model override so
724    // the yes/no/maybe decision stays cheap.
725    let judge: Option<Arc<dyn LlmProvider>> = if workflow.is_some() {
726        Some(build_judge_provider(state, &llm))
727    } else {
728        None
729    };
730
731    // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
732    let tool_provider = state.tool_provider.clone();
733    let session_id_owned = session_id.to_string();
734
735    let state_for_turn = state.clone();
736    // Carry the turn's org on the AccessContext so a multi-tenant host adapter's
737    // `knowledge_for_access` can scope RAG to this tenant. The authed-principal
738    // path already stamps its own org (`Principal::access_context`); a widget /
739    // anonymous connection does not, so fall back to the session's persisted org
740    // (every session carries `organization_id` since the create-session path
741    // derives it). The operator's built-in single-tenant ACL ignores the org, so
742    // this is behavior-preserving for the reference flavor.
743    let access_owned = if access.organization_id.is_some() {
744        access.clone()
745    } else {
746        access
747            .clone()
748            .with_organization_id(session.organization_id.clone())
749    };
750    let sink_owned = sink.clone();
751    let request_id_owned = request_id.to_string();
752    let conversation_id = session.conversation_id.clone();
753
754    tokio::spawn(async move {
755        let result = runner::run_streaming_turn(
756            TurnRequest {
757                storage: state_for_turn.storage.clone(),
758                llm,
759                max_iterations: state_for_turn.config.max_iterations,
760                conversation_id: &conversation_id,
761                request_id: &request_id_owned,
762                user_message: &message,
763                // The connection's resolved document-level entitlement: retrieval is
764                // filtered to what this requester may read (org-public only when the
765                // connection is anonymous).
766                access: access_owned,
767                // Production: `None` (a live client is built from `llm`). Tests: the
768                // scenario corpus's `MockLlmClient`, which runs the turn offline.
769                llm_provider: chat_provider,
770                // Opt-in rerank stage (feature gap G8): `None` unless the operator
771                // enabled it via `SMOOTH_AGENT_RERANK` (gateway/lexical). Default-off
772                // keeps retrieval behavior unchanged.
773                reranker: crate::reranker::build_reranker(
774                    &crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
775                ),
776                confirmation,
777                // SEAM 1 — host tool provider (None by default ⇒ built-ins only).
778                tool_provider,
779                // SEAM 2 — resolved per-org persona (None ⇒ const prompt).
780                system_prompt,
781                org_id: Some(org_id),
782                // The per-org key resolved above, threaded so a host tool
783                // provider's retrieval tools call the same gateway this turn used.
784                gateway_key: turn_gateway_key,
785                // SEAM 3 — per-agent conversation workflow + its cheap judge. Both
786                // `None` for a freeform agent, so the turn is unchanged.
787                workflow,
788                judge,
789                // SEAM 3 — per-agent first-turn greeting + tool allow-list.
790                greeting_section,
791                enabled_tools,
792                // SEAM 3 — authLevel gate + per-tool config delivery.
793                auth_gate,
794                tool_configs,
795            },
796            &sink_owned,
797        )
798        .await;
799
800        match result {
801            Ok(turn) => {
802                // Persist the workflow step pointer the judge landed on, so the
803                // next turn resumes on the right step. No-op when the agent has no
804                // workflow (`next_step_id` is `None`).
805                if let Some(step) = turn.next_step_id.as_deref() {
806                    state_for_turn.set_session_current_step(&session_id_owned, Some(step));
807                }
808                // If the auth gate refused an `end_user` tool for lack of a
809                // verified session this turn, and a host OTP service is installed
810                // and the session has a contact to reach, offer the OTP flow
811                // (prompt → dispatch → ack). The reference server does not
812                // park/auto-resume; the client verifies via `verify_otp` and
813                // re-sends its message once the session is authenticated.
814                if let (Some(gate), Some(otp)) =
815                    (otp_gate.as_ref(), state_for_turn.otp_service.clone())
816                {
817                    if let Some(tool) = gate.otp_refused_tool() {
818                        let contact = state_for_turn.session_contact(&session_id_owned);
819                        if !contact.is_empty() {
820                            offer_otp(
821                                otp.as_ref(),
822                                &session_id_owned,
823                                &tool,
824                                &contact,
825                                &request_id_owned,
826                                &sink_owned,
827                            )
828                            .await;
829                        }
830                    }
831                }
832                let response = runner::general_agent_response(&turn.reply);
833                let _ = sink_owned.send(protocol::eventual_response(
834                    &request_id_owned,
835                    200,
836                    &turn.message_id,
837                    response,
838                    false,
839                    &turn.citations,
840                    turn.usage,
841                ));
842            }
843            Err(e) => {
844                let _ = sink_owned.send(protocol::error(
845                    Some(&request_id_owned),
846                    "AGENT_ERROR",
847                    &format!("agent turn failed: {e}"),
848                ));
849            }
850        }
851    });
852}
853
854/// `confirm_tool_action` — resume a turn parked on a write-tool confirmation.
855///
856/// Per `spec/actions/confirm-tool-action.schema.json` the client sends
857/// `{ action, sessionId, requestId, approved }` in reply to a
858/// `write_confirmation_required` event. We look up the session's registered
859/// [`HumanResponse`](smooth_operator_core::HumanResponse) sender (set by the
860/// runner's confirmation bridge when the turn parked), take it, and feed it the
861/// verdict: `approved` → `Approved` (the parked tool executes), else `Denied`
862/// (the tool is skipped with a rejection result the model sees). There is no
863/// dedicated response event — the resumed workflow signals continuation via its
864/// normal streaming sequence (`stream_chunk`/`stream_token` → `eventual_response`);
865/// we additionally ack with an `immediate_response`. Taking the sender makes a
866/// duplicate confirm a no-op (`NO_PENDING_CONFIRMATION`).
867fn handle_confirm_tool_action(
868    state: &AppState,
869    parsed: &Value,
870    request_id: Option<&str>,
871    sink: &UnboundedSender<Value>,
872) {
873    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
874        let _ = sink.send(protocol::error(
875            request_id,
876            "VALIDATION_ERROR",
877            "confirm_tool_action requires a 'sessionId'",
878        ));
879        return;
880    };
881
882    // `approved` is required and must be a boolean — a missing/garbled verdict
883    // must NOT silently approve a write. Fail closed on a bad shape.
884    let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
885        let _ = sink.send(protocol::error(
886            request_id,
887            "VALIDATION_ERROR",
888            "confirm_tool_action requires a boolean 'approved'",
889        ));
890        return;
891    };
892
893    let Some(responder) = state.take_confirmation(session_id) else {
894        let _ = sink.send(protocol::error(
895            request_id,
896            "NO_PENDING_CONFIRMATION",
897            &format!("no tool action is awaiting confirmation for session '{session_id}'"),
898        ));
899        return;
900    };
901
902    let verdict = if approved {
903        smooth_operator_core::HumanResponse::Approved
904    } else {
905        smooth_operator_core::HumanResponse::Denied {
906            reason: "user rejected the action".to_string(),
907        }
908    };
909
910    if responder.send(verdict).is_err() {
911        // The parked turn ended (timeout / disconnect) before the confirm landed.
912        let _ = sink.send(protocol::error(
913            request_id,
914            "NO_PENDING_CONFIRMATION",
915            &format!(
916                "the turn awaiting confirmation for session '{session_id}' is no longer active"
917            ),
918        ));
919        return;
920    }
921
922    // Ack the confirmation; the resumed turn streams its own follow-on events.
923    let _ = sink.send(protocol::immediate_response(
924        request_id,
925        200,
926        if approved {
927            "Tool action approved"
928        } else {
929            "Tool action rejected"
930        },
931        json!({ "sessionId": session_id, "approved": approved }),
932    ));
933}
934
935/// Apply an optional per-turn `model` override (from a `send_message` body) to a
936/// resolved [`LlmConfig`]. When the body carries a non-empty `model` string, this
937/// turn runs on that gateway model id (a Smooth Modes / `/smooth-mode` preset),
938/// overriding the server's configured default; an absent, non-string, or
939/// blank/whitespace-only `model` leaves the config's default model unchanged
940/// (byte-for-byte the prior behavior). Every other field (url, key, limits)
941/// stays as resolved — only the model id changes.
942fn apply_model_override(mut llm: LlmConfig, body: &Value) -> LlmConfig {
943    if let Some(model) = body.get("model").and_then(Value::as_str) {
944        let model = model.trim();
945        if !model.is_empty() {
946            llm.model = model.to_string();
947        }
948    }
949    llm
950}
951
/// Output-token ceiling for the workflow judge.
///
/// The judge only has to answer `yes` / `no` / `maybe`, so a handful of tokens
/// is enough; keeping the cap small keeps the judge's per-turn cost and
/// latency low. `build_judge_provider` writes this value into the judge
/// config's `max_tokens`.
const JUDGE_MAX_TOKENS: u32 = 16;
955
956/// Build the per-agent authLevel gate, or `None` when it would be inert.
957///
958/// The set of tools that "support auth requirements" (the operator analog of the
959/// TS `supportsAuthRequirement` flag) comes from `SMOOTH_AGENT_AUTH_TOOLS`
960/// (comma-separated); empty (the default) ⇒ nothing is gated.
961/// `session_authenticated` is the session's OTP-verified bit (from a prior
962/// successful `verify_otp`): `false` fail-closed-refuses `end_user` tools (and,
963/// with an OtpService installed, triggers the OTP-offer flow); `true` lets a
964/// verified caller's `end_user` tools run.
965fn build_auth_gate(
966    state: &AppState,
967    cfg: &AgentBehaviorConfig,
968    session_authenticated: bool,
969) -> Option<AuthGateHook> {
970    let supporting: std::collections::HashSet<String> = std::env::var("SMOOTH_AGENT_AUTH_TOOLS")
971        .ok()
972        .into_iter()
973        .flat_map(|s| {
974            s.split(',')
975                .map(str::trim)
976                .filter(|s| !s.is_empty())
977                .map(str::to_string)
978                .collect::<Vec<_>>()
979        })
980        .collect();
981    if supporting.is_empty() {
982        let _ = state; // no host-declared auth-supporting tools → gate is inert
983        return None;
984    }
985    let levels = cfg
986        .enabled_tools
987        .iter()
988        .map(|t| (t.tool_id.clone(), AuthLevel::parse(&t.auth_level)))
989        .collect();
990    let hook = AuthGateHook::new(levels, cfg.visibility, session_authenticated, supporting);
991    hook.is_active().then_some(hook)
992}
993
994/// Emit the OTP-offer sequence for a turn whose `end_user` tool was refused for
995/// lack of a verified session: `otp_verification_required` (prompt the client),
996/// then `send_otp` on the host service, then `otp_sent` (ack delivery) — or an
997/// `error` event if delivery fails. The masked destination + channel come from
998/// the host; the server never sees the code. `auth_level` is fixed `end_user`
999/// (the only level this flow remedies).
1000async fn offer_otp(
1001    otp: &dyn smooth_operator::otp::OtpService,
1002    session_id: &str,
1003    tool: &str,
1004    contact: &smooth_operator::otp::OtpContact,
1005    request_id: &str,
1006    sink: &UnboundedSender<Value>,
1007) {
1008    let channels: Vec<&str> = contact
1009        .available_channels()
1010        .iter()
1011        .map(|c| c.as_str())
1012        .collect();
1013    let _ = sink.send(protocol::otp_verification_required(
1014        request_id,
1015        tool,
1016        &format!("Verify your identity to continue using '{tool}'."),
1017        &channels,
1018        "end_user",
1019    ));
1020    match otp.send_otp(session_id, contact).await {
1021        Ok(delivery) => {
1022            let _ = sink.send(protocol::otp_sent(
1023                request_id,
1024                delivery.channel.as_str(),
1025                &delivery.masked_destination,
1026            ));
1027        }
1028        Err(e) => {
1029            let _ = sink.send(protocol::error(
1030                Some(request_id),
1031                "OTP_SEND_FAILED",
1032                &format!("failed to send verification code: {e}"),
1033            ));
1034        }
1035    }
1036}
1037
1038/// `verify_otp` — validate a submitted OTP code and, on success, mark the
1039/// session identity-verified. Per `spec/actions/verify-otp.schema.json` the
1040/// client sends `{ action, sessionId, requestId, code }` in reply to an
1041/// `otp_verification_required` event. There is no dedicated response event: a
1042/// correct code emits `otp_verified` (the client then re-sends its message to
1043/// run the gated tool — the reference server does not park/auto-resume the
1044/// original turn), a rejected code emits `otp_invalid` carrying the host's
1045/// remaining-attempt count. With no [`OtpService`](smooth_operator::otp::OtpService)
1046/// installed, verification is impossible, so we fail closed with an `otp_invalid`
1047/// (`NOT_FOUND`, 0 attempts).
1048async fn handle_verify_otp(
1049    state: &AppState,
1050    parsed: &Value,
1051    request_id: Option<&str>,
1052    sink: &UnboundedSender<Value>,
1053) {
1054    // requestId is load-bearing (it echoes the originating
1055    // otp_verification_required); require it.
1056    let Some(request_id) = request_id else {
1057        let _ = sink.send(protocol::error(
1058            None,
1059            "VALIDATION_ERROR",
1060            "verify_otp requires a 'requestId'",
1061        ));
1062        return;
1063    };
1064
1065    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
1066        let _ = sink.send(protocol::error(
1067            Some(request_id),
1068            "VALIDATION_ERROR",
1069            "verify_otp requires a 'sessionId'",
1070        ));
1071        return;
1072    };
1073
1074    let Some(code) = parsed.get("code").and_then(Value::as_str) else {
1075        let _ = sink.send(protocol::error(
1076            Some(request_id),
1077            "VALIDATION_ERROR",
1078            "verify_otp requires a 'code'",
1079        ));
1080        return;
1081    };
1082
1083    // The session must exist (a code can't verify a session we don't track).
1084    if state.get_session(session_id).is_none() {
1085        let _ = sink.send(protocol::error(
1086            Some(request_id),
1087            "SESSION_NOT_FOUND",
1088            &format!("session '{session_id}' not found"),
1089        ));
1090        return;
1091    }
1092
1093    // No host OTP service → verification is impossible. Fail closed on the
1094    // documented otp_invalid path (a client shouldn't reach here without first
1095    // receiving otp_verification_required, which only an installed service emits).
1096    let Some(otp) = state.otp_service.clone() else {
1097        let _ = sink.send(protocol::otp_invalid(
1098            request_id,
1099            Some("NOT_FOUND"),
1100            0,
1101            "No verification is in progress for this session.",
1102        ));
1103        return;
1104    };
1105
1106    match otp.verify_otp(session_id, code).await {
1107        smooth_operator::otp::OtpVerifyOutcome::Verified => {
1108            state.set_session_authenticated(session_id, true);
1109            let _ = sink.send(protocol::otp_verified(
1110                request_id,
1111                "Identity verified successfully.",
1112            ));
1113        }
1114        smooth_operator::otp::OtpVerifyOutcome::Invalid {
1115            attempts_remaining,
1116            error,
1117            message,
1118        } => {
1119            let _ = sink.send(protocol::otp_invalid(
1120                request_id,
1121                error.map(smooth_operator::otp::OtpError::as_str),
1122                attempts_remaining,
1123                &message,
1124            ));
1125        }
1126    }
1127}
1128
1129/// Build the workflow judge's LLM surface. Prefers a test-injected chat provider
1130/// (the scenario mock — runs offline); otherwise builds a live client on the
1131/// server's **default** (cheap) model with the turn's resolved gateway url/key,
1132/// independent of any per-turn model override, so the judge stays cheap even when
1133/// the turn itself runs on a bigger model.
1134fn build_judge_provider(state: &AppState, turn_llm: &LlmConfig) -> Arc<dyn LlmProvider> {
1135    if let Some(mock) = state.chat_provider.clone() {
1136        return mock;
1137    }
1138    let mut cfg = turn_llm.clone();
1139    cfg.model = state.config.judge_model.clone();
1140    cfg.max_tokens = JUDGE_MAX_TOKENS;
1141    Arc::new(LlmClient::new(cfg))
1142}
1143
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator_core::llm::{ApiFormat, RetryPolicy};

    /// Model id every fixture config starts with (stands in for the server
    /// default).
    const DEFAULT_MODEL: &str = "claude-haiku-4-5";
    /// Model id used as the per-turn override.
    const OVERRIDE_MODEL: &str = "claude-opus-4-8";

    /// Fixture config whose `model` is [`DEFAULT_MODEL`], giving each override
    /// test a known starting point.
    fn base_llm() -> LlmConfig {
        LlmConfig {
            api_url: String::from("https://llm.smoo.ai/v1"),
            api_key: String::from("sk-test"),
            model: String::from(DEFAULT_MODEL),
            max_tokens: 512,
            temperature: 0.0,
            retry_policy: RetryPolicy::default(),
            api_format: ApiFormat::OpenAiCompat,
        }
    }

    #[test]
    fn model_override_present_replaces_model() {
        let body = json!({ "action": "send_message", "model": OVERRIDE_MODEL });
        let overridden = apply_model_override(base_llm(), &body);

        assert_eq!(overridden.model, OVERRIDE_MODEL);
        // The override touches the model id only; the rest is carried over.
        assert_eq!(overridden.api_url, "https://llm.smoo.ai/v1");
        assert_eq!(overridden.api_key, "sk-test");
        assert_eq!(overridden.max_tokens, 512);
    }

    #[test]
    fn model_override_absent_keeps_default() {
        let body = json!({ "action": "send_message", "message": "hi" });
        assert_eq!(apply_model_override(base_llm(), &body).model, DEFAULT_MODEL);
    }

    #[test]
    fn model_override_blank_or_non_string_keeps_default() {
        // Whitespace-only is treated as absent; a non-string `model` is
        // ignored (no panic, default kept).
        let ignored_bodies = [json!({ "model": "   " }), json!({ "model": 42 })];
        for body in ignored_bodies {
            assert_eq!(
                apply_model_override(base_llm(), &body).model,
                DEFAULT_MODEL,
                "body: {body}"
            );
        }
    }
}