1use serde_json::{json, Value};
11use tokio::sync::mpsc::UnboundedSender;
12
13use smooth_operator::access_control::AccessContext;
14use smooth_operator::domain::{
15 Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
16};
17
18use crate::protocol;
19use crate::runner;
20use crate::runner::TurnRequest;
21use crate::state::AppState;
22
23const AGENT_NAME: &str = "smooth-agent";
25
26pub async fn handle_frame(
30 state: &AppState,
31 access: &AccessContext,
32 conn_id: &str,
33 origin: Option<&str>,
34 raw: &str,
35 sink: &UnboundedSender<Value>,
36) {
37 let parsed: Value = match serde_json::from_str(raw) {
38 Ok(v) => v,
39 Err(e) => {
40 let _ = sink.send(protocol::error(
41 None,
42 "VALIDATION_ERROR",
43 &format!("invalid JSON frame: {e}"),
44 ));
45 return;
46 }
47 };
48
49 let action = parsed.get("action").and_then(Value::as_str);
50 let request_id = parsed.get("requestId").and_then(Value::as_str);
51
52 match action {
53 Some("ping") => {
54 let _ = sink.send(protocol::pong(request_id));
55 }
56 Some("create_conversation_session") => {
57 handle_create_session(state, conn_id, origin, &parsed, request_id, sink).await;
58 }
59 Some("get_session") => {
60 handle_get_session(state, &parsed, request_id, sink);
61 }
62 Some("send_message") => {
63 handle_send_message(state, access, &parsed, request_id, sink).await;
64 }
65 Some(other) => {
66 let _ = sink.send(protocol::error(
67 request_id,
68 "UNSUPPORTED_ACTION",
69 &format!("action '{other}' is not supported by this server"),
70 ));
71 }
72 None => {
73 let _ = sink.send(protocol::error(
74 request_id,
75 "VALIDATION_ERROR",
76 "missing 'action' field",
77 ));
78 }
79 }
80}
81
82async fn enforce_widget_auth(
88 state: &AppState,
89 origin: Option<&str>,
90 agent_id: &str,
91 parsed: &Value,
92 request_id: Option<&str>,
93 sink: &UnboundedSender<Value>,
94) -> bool {
95 let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
96 if state.config.widget_auth_strict {
97 let _ = sink.send(protocol::error(
98 request_id,
99 "AGENT_NOT_AUTHORIZED",
100 "this agent is not registered for embedding",
101 ));
102 return false;
103 }
104 return true;
105 };
106
107 if !smooth_operator::widget_auth::origin_allowed(
109 &policy.allowed_origins,
110 origin.unwrap_or_default(),
111 ) {
112 let _ = sink.send(protocol::error(
113 request_id,
114 "ORIGIN_NOT_ALLOWED",
115 "this origin is not allowed to embed this agent",
116 ));
117 return false;
118 }
119
120 if let Some(ac) = parsed.get("authContext") {
122 if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
123 let _ = sink.send(protocol::error(
124 request_id,
125 "AUTH_CONTEXT_INVALID",
126 "authContext signature failed verification",
127 ));
128 return false;
129 }
130 }
131 true
132}
133
134fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
138 let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
139 public_key,
140 ac.get("userId").and_then(Value::as_str),
141 ac.get("signature").and_then(Value::as_str),
142 ac.get("timestamp").and_then(Value::as_i64),
143 ) else {
144 return false;
145 };
146 let now = chrono::Utc::now().timestamp();
147 smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
148}
149
150async fn handle_create_session(
154 state: &AppState,
155 conn_id: &str,
156 origin: Option<&str>,
157 parsed: &Value,
158 request_id: Option<&str>,
159 sink: &UnboundedSender<Value>,
160) {
161 let agent_id = parsed
162 .get("agentId")
163 .and_then(Value::as_str)
164 .map(str::to_string)
165 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
166
167 if !enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
171 return;
172 }
173
174 let user_name = parsed
175 .get("userName")
176 .and_then(Value::as_str)
177 .unwrap_or("Visitor")
178 .to_string();
179 let user_email = parsed
180 .get("userEmail")
181 .and_then(Value::as_str)
182 .map(str::to_string);
183 let browser_fingerprint = parsed
184 .get("browserFingerprint")
185 .and_then(Value::as_str)
186 .map(str::to_string);
187
188 let now = chrono::Utc::now();
189 let org_id = crate::server::SEED_ORG_ID.to_string();
193
194 let conversation_id = uuid::Uuid::new_v4().to_string();
195 let session_id = uuid::Uuid::new_v4().to_string();
196 let user_participant_id = uuid::Uuid::new_v4().to_string();
197 let agent_participant_id = uuid::Uuid::new_v4().to_string();
198
199 state
204 .backplane
205 .associate(
206 conn_id,
207 smooth_operator::backplane::Target::Session(session_id.clone()),
208 )
209 .await;
210 state
211 .backplane
212 .associate(
213 conn_id,
214 smooth_operator::backplane::Target::Agent(agent_id.clone()),
215 )
216 .await;
217
218 let conversation = Conversation {
219 id: conversation_id.clone(),
220 platform: Platform::Web,
221 name: format!("Session {session_id}"),
222 organization_id: org_id.clone(),
223 idempotency_key: session_id.clone(),
224 metadata_json: parsed.get("metadata").cloned(),
225 analytics_json: None,
226 created_at: now,
227 updated_at: now,
228 };
229
230 let user_participant = Participant {
231 id: user_participant_id.clone(),
232 conversation_id: conversation_id.clone(),
233 organization_id: org_id.clone(),
234 participant_type: ParticipantType::User,
235 external_id: None,
236 internal_id: None,
237 browser_fingerprint,
238 browser_info: None,
239 name: user_name,
240 email: user_email,
241 phone: None,
242 crm_contact_id: None,
243 metadata_json: None,
244 created_at: now,
245 updated_at: now,
246 };
247
248 let agent_participant = Participant {
249 id: agent_participant_id.clone(),
250 conversation_id: conversation_id.clone(),
251 organization_id: org_id.clone(),
252 participant_type: ParticipantType::AiAgent,
253 external_id: None,
254 internal_id: Some(agent_id.clone()),
255 browser_fingerprint: None,
256 browser_info: None,
257 name: AGENT_NAME.to_string(),
258 email: None,
259 phone: None,
260 crm_contact_id: None,
261 metadata_json: None,
262 created_at: now,
263 updated_at: now,
264 };
265
266 let session = Session {
267 session_id: session_id.clone(),
268 conversation_id: conversation_id.clone(),
269 agent_id: agent_id.clone(),
270 agent_name: AGENT_NAME.to_string(),
271 user_participant_id: user_participant_id.clone(),
272 agent_participant_id: agent_participant_id.clone(),
273 thread_id: conversation_id.clone(),
276 status: Some(SessionStatus::Active),
277 token_count: Some(0),
278 message_count: Some(0),
279 metadata: None,
280 created_at: Some(now),
281 updated_at: Some(now),
282 ended_at: None,
283 last_activity_at: Some(now),
284 };
285
286 let storage = state.storage.clone();
288 let sink_clone = sink.clone();
289 let request_id_owned = request_id.map(str::to_string);
290 let session_for_registry = session.clone();
291 let state_clone = state.clone();
292
293 let data = json!({
294 "sessionId": session_id,
295 "conversationId": conversation_id,
296 "agentId": agent_id,
297 "agentName": AGENT_NAME,
298 "userParticipantId": user_participant_id,
299 "agentParticipantId": agent_participant_id,
300 });
301
302 tokio::spawn(async move {
303 let rid = request_id_owned.as_deref();
304 if let Err(e) = storage.create_conversation(conversation).await {
305 let _ = sink_clone.send(protocol::error(
306 rid,
307 "INTERNAL_ERROR",
308 &format!("create conversation failed: {e}"),
309 ));
310 return;
311 }
312 if let Err(e) = storage.add_participant(user_participant).await {
313 let _ = sink_clone.send(protocol::error(
314 rid,
315 "INTERNAL_ERROR",
316 &format!("add user participant failed: {e}"),
317 ));
318 return;
319 }
320 if let Err(e) = storage.add_participant(agent_participant).await {
321 let _ = sink_clone.send(protocol::error(
322 rid,
323 "INTERNAL_ERROR",
324 &format!("add agent participant failed: {e}"),
325 ));
326 return;
327 }
328 if let Err(e) = storage.create_session(session).await {
329 let _ = sink_clone.send(protocol::error(
330 rid,
331 "INTERNAL_ERROR",
332 &format!("create session failed: {e}"),
333 ));
334 return;
335 }
336 state_clone.insert_session(session_for_registry);
337 let _ = sink_clone.send(protocol::immediate_response(
338 rid,
339 200,
340 "Session created",
341 data,
342 ));
343 });
344}
345
346fn handle_get_session(
348 state: &AppState,
349 parsed: &Value,
350 request_id: Option<&str>,
351 sink: &UnboundedSender<Value>,
352) {
353 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
354 let _ = sink.send(protocol::error(
355 request_id,
356 "VALIDATION_ERROR",
357 "missing 'sessionId'",
358 ));
359 return;
360 };
361
362 match state.get_session(session_id) {
363 Some(s) => {
364 let data = json!({
365 "sessionId": s.session_id,
366 "conversationId": s.conversation_id,
367 "agentId": s.agent_id,
368 "agentName": s.agent_name,
369 "userParticipantId": s.user_participant_id,
370 "agentParticipantId": s.agent_participant_id,
371 "threadId": s.thread_id,
372 "status": s.status.map_or("active", |st| match st {
373 SessionStatus::Active => "active",
374 SessionStatus::Idle => "idle",
375 SessionStatus::Ended => "ended",
376 }),
377 });
378 let _ = sink.send(protocol::immediate_response(
379 request_id, 200, "Session", data,
380 ));
381 }
382 None => {
383 let _ = sink.send(protocol::error(
384 request_id,
385 "SESSION_NOT_FOUND",
386 &format!("session '{session_id}' not found"),
387 ));
388 }
389 }
390}
391
392async fn handle_send_message(
397 state: &AppState,
398 access: &AccessContext,
399 parsed: &Value,
400 request_id: Option<&str>,
401 sink: &UnboundedSender<Value>,
402) {
403 let Some(request_id) = request_id else {
405 let _ = sink.send(protocol::error(
406 None,
407 "VALIDATION_ERROR",
408 "send_message requires a 'requestId'",
409 ));
410 return;
411 };
412
413 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
414 let _ = sink.send(protocol::error(
415 Some(request_id),
416 "VALIDATION_ERROR",
417 "missing 'sessionId'",
418 ));
419 return;
420 };
421
422 let message = match parsed.get("message").and_then(Value::as_str) {
423 Some(m) if !m.trim().is_empty() => m.to_string(),
424 _ => {
425 let _ = sink.send(protocol::error(
426 Some(request_id),
427 "VALIDATION_ERROR",
428 "missing or empty 'message'",
429 ));
430 return;
431 }
432 };
433
434 let Some(session) = state.get_session(session_id) else {
435 let _ = sink.send(protocol::error(
436 Some(request_id),
437 "SESSION_NOT_FOUND",
438 &format!("session '{session_id}' not found"),
439 ));
440 return;
441 };
442
443 let Some(llm) = state.config.llm_config() else {
446 let _ = sink.send(protocol::error(
447 Some(request_id),
448 "LLM_UNAVAILABLE",
449 "SMOOAI_GATEWAY_KEY is not configured; this server cannot serve LLM turns. \
450 Set the gateway key to enable send_message.",
451 ));
452 return;
453 };
454
455 let _ = sink.send(protocol::immediate_response(
457 Some(request_id),
458 202,
459 "Processing your request...",
460 json!({}),
461 ));
462
463 let result = runner::run_streaming_turn(
464 TurnRequest {
465 storage: state.storage.clone(),
466 llm,
467 max_iterations: state.config.max_iterations,
468 conversation_id: &session.conversation_id,
469 request_id,
470 user_message: &message,
471 access: access.clone(),
475 llm_provider: None,
476 reranker: crate::reranker::build_reranker(
480 &crate::reranker::RerankerConfig::from_server_config(&state.config),
481 ),
482 },
483 sink,
484 )
485 .await;
486
487 match result {
488 Ok(turn) => {
489 let response = runner::general_agent_response(&turn.reply);
490 let _ = sink.send(protocol::eventual_response(
491 request_id,
492 200,
493 &turn.message_id,
494 response,
495 false,
496 &turn.citations,
497 ));
498 }
499 Err(e) => {
500 let _ = sink.send(protocol::error(
501 Some(request_id),
502 "AGENT_ERROR",
503 &format!("agent turn failed: {e}"),
504 ));
505 }
506 }
507}