Skip to main content

smooth_operator_server/
handler.rs

1//! Action dispatch — parses a client action envelope and produces the matching
2//! server events.
3//!
4//! Each handler is `async` and forwards events through an
5//! `UnboundedSender<serde_json::Value>` (the per-connection outbound sink). The
6//! socket task drains the sink and writes each value as a JSON WS text frame, so
7//! streaming actions (`send_message`) can emit many events while still being
8//! driven from one place.
9
10use serde_json::{json, Value};
11use tokio::sync::mpsc::UnboundedSender;
12
13use smooth_operator::access_control::AccessContext;
14use smooth_operator::domain::{
15    Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
16};
17
18use crate::protocol;
19use crate::runner;
20use crate::runner::TurnRequest;
21use crate::state::AppState;
22
/// The agent's display name for the reference server. It is used as the agent
/// participant's `name`, as the session's `agent_name`, and as the `agentName`
/// value in the `create_conversation_session` response.
const AGENT_NAME: &str = "smooth-agent";
25
26/// Parse and dispatch a single inbound text frame. Any produced events are sent
27/// through `sink`. Returns `Ok(())` always — protocol-level failures are
28/// surfaced as `error` events, never as hard errors that drop the connection.
29pub async fn handle_frame(
30    state: &AppState,
31    access: &AccessContext,
32    conn_id: &str,
33    origin: Option<&str>,
34    raw: &str,
35    sink: &UnboundedSender<Value>,
36) {
37    let parsed: Value = match serde_json::from_str(raw) {
38        Ok(v) => v,
39        Err(e) => {
40            let _ = sink.send(protocol::error(
41                None,
42                "VALIDATION_ERROR",
43                &format!("invalid JSON frame: {e}"),
44            ));
45            return;
46        }
47    };
48
49    let action = parsed.get("action").and_then(Value::as_str);
50    let request_id = parsed.get("requestId").and_then(Value::as_str);
51
52    match action {
53        Some("ping") => {
54            let _ = sink.send(protocol::pong(request_id));
55        }
56        Some("create_conversation_session") => {
57            handle_create_session(state, conn_id, origin, &parsed, request_id, sink).await;
58        }
59        Some("get_session") => {
60            handle_get_session(state, &parsed, request_id, sink);
61        }
62        Some("send_message") => {
63            handle_send_message(state, access, &parsed, request_id, sink).await;
64        }
65        Some(other) => {
66            let _ = sink.send(protocol::error(
67                request_id,
68                "UNSUPPORTED_ACTION",
69                &format!("action '{other}' is not supported by this server"),
70            ));
71        }
72        None => {
73            let _ = sink.send(protocol::error(
74                request_id,
75                "VALIDATION_ERROR",
76                "missing 'action' field",
77            ));
78        }
79    }
80}
81
82/// Enforce an agent's embeddable-widget policy (origin allowlist + `authContext`)
83/// before a session is created. Returns `true` to proceed, or `false` after
84/// emitting a protocol `error` (the caller must then stop). Agents with no policy
85/// proceed — unless `WIDGET_AUTH_STRICT` is set, in which case an unknown agent is
86/// rejected (fail closed).
87async fn enforce_widget_auth(
88    state: &AppState,
89    origin: Option<&str>,
90    agent_id: &str,
91    parsed: &Value,
92    request_id: Option<&str>,
93    sink: &UnboundedSender<Value>,
94) -> bool {
95    let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
96        if state.config.widget_auth_strict {
97            let _ = sink.send(protocol::error(
98                request_id,
99                "AGENT_NOT_AUTHORIZED",
100                "this agent is not registered for embedding",
101            ));
102            return false;
103        }
104        return true;
105    };
106
107    // Origin allowlist — fail closed: a missing or disallowed `Origin` is rejected.
108    if !smooth_operator::widget_auth::origin_allowed(
109        &policy.allowed_origins,
110        origin.unwrap_or_default(),
111    ) {
112        let _ = sink.send(protocol::error(
113            request_id,
114            "ORIGIN_NOT_ALLOWED",
115            "this origin is not allowed to embed this agent",
116        ));
117        return false;
118    }
119
120    // Pre-auth `authContext` (optional): when present it must verify.
121    if let Some(ac) = parsed.get("authContext") {
122        if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
123            let _ = sink.send(protocol::error(
124                request_id,
125                "AUTH_CONTEXT_INVALID",
126                "authContext signature failed verification",
127            ));
128            return false;
129        }
130    }
131    true
132}
133
134/// Verify a JSON `authContext` (`{userId, signature, timestamp}`) against the
135/// agent's `public_key`. False on any missing field/key or signature/replay
136/// failure. Replay window: 60s.
137fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
138    let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
139        public_key,
140        ac.get("userId").and_then(Value::as_str),
141        ac.get("signature").and_then(Value::as_str),
142        ac.get("timestamp").and_then(Value::as_i64),
143    ) else {
144        return false;
145    };
146    let now = chrono::Utc::now().timestamp();
147    smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
148}
149
150/// `create_conversation_session` — create a conversation + user & agent
151/// participants + a session, then reply with an `immediate_response` carrying
152/// the session descriptor (per `create-conversation-session.schema.json`).
153async fn handle_create_session(
154    state: &AppState,
155    conn_id: &str,
156    origin: Option<&str>,
157    parsed: &Value,
158    request_id: Option<&str>,
159    sink: &UnboundedSender<Value>,
160) {
161    let agent_id = parsed
162        .get("agentId")
163        .and_then(Value::as_str)
164        .map(str::to_string)
165        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
166
167    // Embeddable-widget auth: enforce the agent's origin allowlist + authContext
168    // before creating any session. No-op for agents without a policy (unless
169    // WIDGET_AUTH_STRICT). On denial, an error is emitted and we stop here.
170    if !enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
171        return;
172    }
173
174    let user_name = parsed
175        .get("userName")
176        .and_then(Value::as_str)
177        .unwrap_or("Visitor")
178        .to_string();
179    let user_email = parsed
180        .get("userEmail")
181        .and_then(Value::as_str)
182        .map(str::to_string);
183    let browser_fingerprint = parsed
184        .get("browserFingerprint")
185        .and_then(Value::as_str)
186        .map(str::to_string);
187
188    let now = chrono::Utc::now();
189    // The reference server is single-org; conversations belong to the seed org so
190    // the admin API's org-scoping (document sets, indexing runs) lines up with
191    // the seeded knowledge. A multi-tenant deployment derives this from auth.
192    let org_id = crate::server::SEED_ORG_ID.to_string();
193
194    let conversation_id = uuid::Uuid::new_v4().to_string();
195    let session_id = uuid::Uuid::new_v4().to_string();
196    let user_participant_id = uuid::Uuid::new_v4().to_string();
197    let agent_participant_id = uuid::Uuid::new_v4().to_string();
198
199    // Associate this connection with its session (and agent) on the backplane so
200    // events published to the session/agent — by an agent turn or any other
201    // service — reach this client's socket, on this pod or (with a Redis/NATS
202    // backplane) any pod.
203    state
204        .backplane
205        .associate(
206            conn_id,
207            smooth_operator::backplane::Target::Session(session_id.clone()),
208        )
209        .await;
210    state
211        .backplane
212        .associate(
213            conn_id,
214            smooth_operator::backplane::Target::Agent(agent_id.clone()),
215        )
216        .await;
217
218    let conversation = Conversation {
219        id: conversation_id.clone(),
220        platform: Platform::Web,
221        name: format!("Session {session_id}"),
222        organization_id: org_id.clone(),
223        idempotency_key: session_id.clone(),
224        metadata_json: parsed.get("metadata").cloned(),
225        analytics_json: None,
226        created_at: now,
227        updated_at: now,
228    };
229
230    let user_participant = Participant {
231        id: user_participant_id.clone(),
232        conversation_id: conversation_id.clone(),
233        organization_id: org_id.clone(),
234        participant_type: ParticipantType::User,
235        external_id: None,
236        internal_id: None,
237        browser_fingerprint,
238        browser_info: None,
239        name: user_name,
240        email: user_email,
241        phone: None,
242        crm_contact_id: None,
243        metadata_json: None,
244        created_at: now,
245        updated_at: now,
246    };
247
248    let agent_participant = Participant {
249        id: agent_participant_id.clone(),
250        conversation_id: conversation_id.clone(),
251        organization_id: org_id.clone(),
252        participant_type: ParticipantType::AiAgent,
253        external_id: None,
254        internal_id: Some(agent_id.clone()),
255        browser_fingerprint: None,
256        browser_info: None,
257        name: AGENT_NAME.to_string(),
258        email: None,
259        phone: None,
260        crm_contact_id: None,
261        metadata_json: None,
262        created_at: now,
263        updated_at: now,
264    };
265
266    let session = Session {
267        session_id: session_id.clone(),
268        conversation_id: conversation_id.clone(),
269        agent_id: agent_id.clone(),
270        agent_name: AGENT_NAME.to_string(),
271        user_participant_id: user_participant_id.clone(),
272        agent_participant_id: agent_participant_id.clone(),
273        // The thread id is the conversation id: per-session memory is carried by
274        // replaying this conversation's persisted message log (see runner.rs).
275        thread_id: conversation_id.clone(),
276        status: Some(SessionStatus::Active),
277        token_count: Some(0),
278        message_count: Some(0),
279        metadata: None,
280        created_at: Some(now),
281        updated_at: Some(now),
282        ended_at: None,
283        last_activity_at: Some(now),
284    };
285
286    // Persist to the storage adapter (best-effort: a failure surfaces as error).
287    let storage = state.storage.clone();
288    let sink_clone = sink.clone();
289    let request_id_owned = request_id.map(str::to_string);
290    let session_for_registry = session.clone();
291    let state_clone = state.clone();
292
293    let data = json!({
294        "sessionId": session_id,
295        "conversationId": conversation_id,
296        "agentId": agent_id,
297        "agentName": AGENT_NAME,
298        "userParticipantId": user_participant_id,
299        "agentParticipantId": agent_participant_id,
300    });
301
302    tokio::spawn(async move {
303        let rid = request_id_owned.as_deref();
304        if let Err(e) = storage.create_conversation(conversation).await {
305            let _ = sink_clone.send(protocol::error(
306                rid,
307                "INTERNAL_ERROR",
308                &format!("create conversation failed: {e}"),
309            ));
310            return;
311        }
312        if let Err(e) = storage.add_participant(user_participant).await {
313            let _ = sink_clone.send(protocol::error(
314                rid,
315                "INTERNAL_ERROR",
316                &format!("add user participant failed: {e}"),
317            ));
318            return;
319        }
320        if let Err(e) = storage.add_participant(agent_participant).await {
321            let _ = sink_clone.send(protocol::error(
322                rid,
323                "INTERNAL_ERROR",
324                &format!("add agent participant failed: {e}"),
325            ));
326            return;
327        }
328        if let Err(e) = storage.create_session(session).await {
329            let _ = sink_clone.send(protocol::error(
330                rid,
331                "INTERNAL_ERROR",
332                &format!("create session failed: {e}"),
333            ));
334            return;
335        }
336        state_clone.insert_session(session_for_registry);
337        let _ = sink_clone.send(protocol::immediate_response(
338            rid,
339            200,
340            "Session created",
341            data,
342        ));
343    });
344}
345
346/// `get_session` — return the session snapshot (per `get-session.schema.json`).
347fn handle_get_session(
348    state: &AppState,
349    parsed: &Value,
350    request_id: Option<&str>,
351    sink: &UnboundedSender<Value>,
352) {
353    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
354        let _ = sink.send(protocol::error(
355            request_id,
356            "VALIDATION_ERROR",
357            "missing 'sessionId'",
358        ));
359        return;
360    };
361
362    match state.get_session(session_id) {
363        Some(s) => {
364            let data = json!({
365                "sessionId": s.session_id,
366                "conversationId": s.conversation_id,
367                "agentId": s.agent_id,
368                "agentName": s.agent_name,
369                "userParticipantId": s.user_participant_id,
370                "agentParticipantId": s.agent_participant_id,
371                "threadId": s.thread_id,
372                "status": s.status.map_or("active", |st| match st {
373                    SessionStatus::Active => "active",
374                    SessionStatus::Idle => "idle",
375                    SessionStatus::Ended => "ended",
376                }),
377            });
378            let _ = sink.send(protocol::immediate_response(
379                request_id, 200, "Session", data,
380            ));
381        }
382        None => {
383            let _ = sink.send(protocol::error(
384                request_id,
385                "SESSION_NOT_FOUND",
386                &format!("session '{session_id}' not found"),
387            ));
388        }
389    }
390}
391
392/// `send_message` — ack with `immediate_response` (202), run a streaming
393/// knowledge-grounded turn, emit `stream_token` / `stream_chunk` as it goes, and
394/// finish with `eventual_response` (200). Errors (no gateway key, unknown
395/// session, agent failure) surface as clean `error` events.
396async fn handle_send_message(
397    state: &AppState,
398    access: &AccessContext,
399    parsed: &Value,
400    request_id: Option<&str>,
401    sink: &UnboundedSender<Value>,
402) {
403    // requestId is load-bearing for streaming correlation; require it.
404    let Some(request_id) = request_id else {
405        let _ = sink.send(protocol::error(
406            None,
407            "VALIDATION_ERROR",
408            "send_message requires a 'requestId'",
409        ));
410        return;
411    };
412
413    let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
414        let _ = sink.send(protocol::error(
415            Some(request_id),
416            "VALIDATION_ERROR",
417            "missing 'sessionId'",
418        ));
419        return;
420    };
421
422    let message = match parsed.get("message").and_then(Value::as_str) {
423        Some(m) if !m.trim().is_empty() => m.to_string(),
424        _ => {
425            let _ = sink.send(protocol::error(
426                Some(request_id),
427                "VALIDATION_ERROR",
428                "missing or empty 'message'",
429            ));
430            return;
431        }
432    };
433
434    let Some(session) = state.get_session(session_id) else {
435        let _ = sink.send(protocol::error(
436            Some(request_id),
437            "SESSION_NOT_FOUND",
438            &format!("session '{session_id}' not found"),
439        ));
440        return;
441    };
442
443    // No gateway key → can't run an LLM turn. Return a clean error (the server
444    // stays usable for protocol-only checks).
445    let Some(llm) = state.config.llm_config() else {
446        let _ = sink.send(protocol::error(
447            Some(request_id),
448            "LLM_UNAVAILABLE",
449            "SMOOAI_GATEWAY_KEY is not configured; this server cannot serve LLM turns. \
450             Set the gateway key to enable send_message.",
451        ));
452        return;
453    };
454
455    // Ack: processing started.
456    let _ = sink.send(protocol::immediate_response(
457        Some(request_id),
458        202,
459        "Processing your request...",
460        json!({}),
461    ));
462
463    let result = runner::run_streaming_turn(
464        TurnRequest {
465            storage: state.storage.clone(),
466            llm,
467            max_iterations: state.config.max_iterations,
468            conversation_id: &session.conversation_id,
469            request_id,
470            user_message: &message,
471            // The connection's resolved document-level entitlement: retrieval is
472            // filtered to what this requester may read (org-public only when the
473            // connection is anonymous).
474            access: access.clone(),
475            llm_provider: None,
476            // Opt-in rerank stage (feature gap G8): `None` unless the operator
477            // enabled it via `SMOOTH_AGENT_RERANK` (gateway/lexical). Default-off
478            // keeps retrieval behavior unchanged.
479            reranker: crate::reranker::build_reranker(
480                &crate::reranker::RerankerConfig::from_server_config(&state.config),
481            ),
482        },
483        sink,
484    )
485    .await;
486
487    match result {
488        Ok(turn) => {
489            let response = runner::general_agent_response(&turn.reply);
490            let _ = sink.send(protocol::eventual_response(
491                request_id,
492                200,
493                &turn.message_id,
494                response,
495                false,
496                &turn.citations,
497            ));
498        }
499        Err(e) => {
500            let _ = sink.send(protocol::error(
501                Some(request_id),
502                "AGENT_ERROR",
503                &format!("agent turn failed: {e}"),
504            ));
505        }
506    }
507}