1use std::sync::Arc;
11
12use serde_json::{json, Value};
13use tokio::sync::mpsc::UnboundedSender;
14
15use smooth_operator::access_control::AccessContext;
16use smooth_operator::domain::{
17 Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
18};
19
20use crate::protocol;
21use crate::runner;
22use crate::runner::TurnRequest;
23use crate::state::AppState;
24
/// Name stored on the AI-agent participant, recorded as the session's
/// `agent_name`, and echoed back as `agentName` when a session is created.
const AGENT_NAME: &str = "smooth-agent";
27
28pub async fn handle_frame(
32 state: &AppState,
33 access: &AccessContext,
34 conn_id: &str,
35 origin: Option<&str>,
36 raw: &str,
37 sink: &UnboundedSender<Value>,
38) {
39 let parsed: Value = match serde_json::from_str(raw) {
40 Ok(v) => v,
41 Err(e) => {
42 let _ = sink.send(protocol::error(
43 None,
44 "VALIDATION_ERROR",
45 &format!("invalid JSON frame: {e}"),
46 ));
47 return;
48 }
49 };
50
51 let action = parsed.get("action").and_then(Value::as_str);
52 let request_id = parsed.get("requestId").and_then(Value::as_str);
53
54 match action {
55 Some("ping") => {
56 let _ = sink.send(protocol::pong(request_id));
57 }
58 Some("create_conversation_session") => {
59 handle_create_session(state, conn_id, origin, &parsed, request_id, sink).await;
60 }
61 Some("get_session") => {
62 handle_get_session(state, &parsed, request_id, sink);
63 }
64 Some("get_conversation_messages") => {
65 handle_get_conversation_messages(state, &parsed, request_id, sink).await;
66 }
67 Some("send_message") => {
68 handle_send_message(state, access, &parsed, request_id, sink).await;
69 }
70 Some("confirm_tool_action") => {
71 handle_confirm_tool_action(state, &parsed, request_id, sink);
72 }
73 Some(other) => {
74 let _ = sink.send(protocol::error(
75 request_id,
76 "UNSUPPORTED_ACTION",
77 &format!("action '{other}' is not supported by this server"),
78 ));
79 }
80 None => {
81 let _ = sink.send(protocol::error(
82 request_id,
83 "VALIDATION_ERROR",
84 "missing 'action' field",
85 ));
86 }
87 }
88}
89
90async fn enforce_widget_auth(
96 state: &AppState,
97 origin: Option<&str>,
98 agent_id: &str,
99 parsed: &Value,
100 request_id: Option<&str>,
101 sink: &UnboundedSender<Value>,
102) -> bool {
103 let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
104 if state.config.widget_auth_strict {
105 let _ = sink.send(protocol::error(
106 request_id,
107 "AGENT_NOT_AUTHORIZED",
108 "this agent is not registered for embedding",
109 ));
110 return false;
111 }
112 return true;
113 };
114
115 if !smooth_operator::widget_auth::origin_allowed(
117 &policy.allowed_origins,
118 origin.unwrap_or_default(),
119 ) {
120 let _ = sink.send(protocol::error(
121 request_id,
122 "ORIGIN_NOT_ALLOWED",
123 "this origin is not allowed to embed this agent",
124 ));
125 return false;
126 }
127
128 if let Some(ac) = parsed.get("authContext") {
130 if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
131 let _ = sink.send(protocol::error(
132 request_id,
133 "AUTH_CONTEXT_INVALID",
134 "authContext signature failed verification",
135 ));
136 return false;
137 }
138 }
139 true
140}
141
142fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
146 let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
147 public_key,
148 ac.get("userId").and_then(Value::as_str),
149 ac.get("signature").and_then(Value::as_str),
150 ac.get("timestamp").and_then(Value::as_i64),
151 ) else {
152 return false;
153 };
154 let now = chrono::Utc::now().timestamp();
155 smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
156}
157
158async fn handle_create_session(
162 state: &AppState,
163 conn_id: &str,
164 origin: Option<&str>,
165 parsed: &Value,
166 request_id: Option<&str>,
167 sink: &UnboundedSender<Value>,
168) {
169 let agent_id = parsed
170 .get("agentId")
171 .and_then(Value::as_str)
172 .map(str::to_string)
173 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
174
175 if !enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
179 return;
180 }
181
182 let user_name = parsed
183 .get("userName")
184 .and_then(Value::as_str)
185 .unwrap_or("Visitor")
186 .to_string();
187 let user_email = parsed
188 .get("userEmail")
189 .and_then(Value::as_str)
190 .map(str::to_string);
191 let browser_fingerprint = parsed
192 .get("browserFingerprint")
193 .and_then(Value::as_str)
194 .map(str::to_string);
195
196 let now = chrono::Utc::now();
197 let org_id = crate::server::SEED_ORG_ID.to_string();
201
202 let conversation_id = uuid::Uuid::new_v4().to_string();
203 let session_id = uuid::Uuid::new_v4().to_string();
204 let user_participant_id = uuid::Uuid::new_v4().to_string();
205 let agent_participant_id = uuid::Uuid::new_v4().to_string();
206
207 state
212 .backplane
213 .associate(
214 conn_id,
215 smooth_operator::backplane::Target::Session(session_id.clone()),
216 )
217 .await;
218 state
219 .backplane
220 .associate(
221 conn_id,
222 smooth_operator::backplane::Target::Agent(agent_id.clone()),
223 )
224 .await;
225
226 let conversation = Conversation {
227 id: conversation_id.clone(),
228 platform: Platform::Web,
229 name: format!("Session {session_id}"),
230 organization_id: org_id.clone(),
231 idempotency_key: session_id.clone(),
232 metadata_json: parsed.get("metadata").cloned(),
233 analytics_json: None,
234 created_at: now,
235 updated_at: now,
236 };
237
238 let user_participant = Participant {
239 id: user_participant_id.clone(),
240 conversation_id: conversation_id.clone(),
241 organization_id: org_id.clone(),
242 participant_type: ParticipantType::User,
243 external_id: None,
244 internal_id: None,
245 browser_fingerprint,
246 browser_info: None,
247 name: user_name,
248 email: user_email,
249 phone: None,
250 crm_contact_id: None,
251 metadata_json: None,
252 created_at: now,
253 updated_at: now,
254 };
255
256 let agent_participant = Participant {
257 id: agent_participant_id.clone(),
258 conversation_id: conversation_id.clone(),
259 organization_id: org_id.clone(),
260 participant_type: ParticipantType::AiAgent,
261 external_id: None,
262 internal_id: Some(agent_id.clone()),
263 browser_fingerprint: None,
264 browser_info: None,
265 name: AGENT_NAME.to_string(),
266 email: None,
267 phone: None,
268 crm_contact_id: None,
269 metadata_json: None,
270 created_at: now,
271 updated_at: now,
272 };
273
274 let session = Session {
275 session_id: session_id.clone(),
276 conversation_id: conversation_id.clone(),
277 agent_id: agent_id.clone(),
278 agent_name: AGENT_NAME.to_string(),
279 user_participant_id: user_participant_id.clone(),
280 agent_participant_id: agent_participant_id.clone(),
281 thread_id: conversation_id.clone(),
284 status: Some(SessionStatus::Active),
285 token_count: Some(0),
286 message_count: Some(0),
287 metadata: None,
288 created_at: Some(now),
289 updated_at: Some(now),
290 ended_at: None,
291 last_activity_at: Some(now),
292 };
293
294 let storage = state.storage.clone();
296 let sink_clone = sink.clone();
297 let request_id_owned = request_id.map(str::to_string);
298 let session_for_registry = session.clone();
299 let state_clone = state.clone();
300
301 let data = json!({
302 "sessionId": session_id,
303 "conversationId": conversation_id,
304 "agentId": agent_id,
305 "agentName": AGENT_NAME,
306 "userParticipantId": user_participant_id,
307 "agentParticipantId": agent_participant_id,
308 });
309
310 tokio::spawn(async move {
311 let rid = request_id_owned.as_deref();
312 if let Err(e) = storage.create_conversation(conversation).await {
313 let _ = sink_clone.send(protocol::error(
314 rid,
315 "INTERNAL_ERROR",
316 &format!("create conversation failed: {e}"),
317 ));
318 return;
319 }
320 if let Err(e) = storage.add_participant(user_participant).await {
321 let _ = sink_clone.send(protocol::error(
322 rid,
323 "INTERNAL_ERROR",
324 &format!("add user participant failed: {e}"),
325 ));
326 return;
327 }
328 if let Err(e) = storage.add_participant(agent_participant).await {
329 let _ = sink_clone.send(protocol::error(
330 rid,
331 "INTERNAL_ERROR",
332 &format!("add agent participant failed: {e}"),
333 ));
334 return;
335 }
336 if let Err(e) = storage.create_session(session).await {
337 let _ = sink_clone.send(protocol::error(
338 rid,
339 "INTERNAL_ERROR",
340 &format!("create session failed: {e}"),
341 ));
342 return;
343 }
344 state_clone.insert_session(session_for_registry);
345 let _ = sink_clone.send(protocol::immediate_response(
346 rid,
347 200,
348 "Session created",
349 data,
350 ));
351 });
352}
353
354fn handle_get_session(
356 state: &AppState,
357 parsed: &Value,
358 request_id: Option<&str>,
359 sink: &UnboundedSender<Value>,
360) {
361 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
362 let _ = sink.send(protocol::error(
363 request_id,
364 "VALIDATION_ERROR",
365 "missing 'sessionId'",
366 ));
367 return;
368 };
369
370 match state.get_session(session_id) {
371 Some(s) => {
372 let data = json!({
373 "sessionId": s.session_id,
374 "conversationId": s.conversation_id,
375 "agentId": s.agent_id,
376 "agentName": s.agent_name,
377 "userParticipantId": s.user_participant_id,
378 "agentParticipantId": s.agent_participant_id,
379 "threadId": s.thread_id,
380 "status": s.status.map_or("active", |st| match st {
381 SessionStatus::Active => "active",
382 SessionStatus::Idle => "idle",
383 SessionStatus::Ended => "ended",
384 }),
385 });
386 let _ = sink.send(protocol::immediate_response(
387 request_id, 200, "Session", data,
388 ));
389 }
390 None => {
391 let _ = sink.send(protocol::error(
392 request_id,
393 "SESSION_NOT_FOUND",
394 &format!("session '{session_id}' not found"),
395 ));
396 }
397 }
398}
399
400async fn handle_get_conversation_messages(
408 state: &AppState,
409 parsed: &Value,
410 request_id: Option<&str>,
411 sink: &UnboundedSender<Value>,
412) {
413 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
414 let _ = sink.send(protocol::error(
415 request_id,
416 "VALIDATION_ERROR",
417 "missing 'sessionId'",
418 ));
419 return;
420 };
421 let Some(session) = state.get_session(session_id) else {
422 let _ = sink.send(protocol::error(
423 request_id,
424 "SESSION_NOT_FOUND",
425 &format!("session '{session_id}' not found"),
426 ));
427 return;
428 };
429
430 const DEFAULT_LIMIT: usize = 50;
431 let limit = parsed
432 .get("limit")
433 .and_then(Value::as_u64)
434 .map(|n| n as usize)
435 .filter(|n| *n > 0)
436 .unwrap_or(DEFAULT_LIMIT);
437 let cursor = parsed
438 .get("cursor")
439 .and_then(Value::as_str)
440 .map(str::to_string);
441
442 let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
443 query.cursor = cursor;
444 query.descending = true;
445
446 match state.storage.list_messages_by_conversation(query).await {
447 Ok(page) => {
448 let data = json!({
449 "conversationId": session.conversation_id,
450 "messages": page.messages,
451 "nextCursor": page.next_cursor,
452 "hasMore": page.next_cursor.is_some(),
453 });
454 let _ = sink.send(protocol::immediate_response(
455 request_id,
456 200,
457 "ConversationMessages",
458 data,
459 ));
460 }
461 Err(e) => {
462 let _ = sink.send(protocol::error(
463 request_id,
464 "STORAGE_ERROR",
465 &format!("failed to list messages: {e}"),
466 ));
467 }
468 }
469}
470
471async fn handle_send_message(
476 state: &AppState,
477 access: &AccessContext,
478 parsed: &Value,
479 request_id: Option<&str>,
480 sink: &UnboundedSender<Value>,
481) {
482 let Some(request_id) = request_id else {
484 let _ = sink.send(protocol::error(
485 None,
486 "VALIDATION_ERROR",
487 "send_message requires a 'requestId'",
488 ));
489 return;
490 };
491
492 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
493 let _ = sink.send(protocol::error(
494 Some(request_id),
495 "VALIDATION_ERROR",
496 "missing 'sessionId'",
497 ));
498 return;
499 };
500
501 let message = match parsed.get("message").and_then(Value::as_str) {
502 Some(m) if !m.trim().is_empty() => m.to_string(),
503 _ => {
504 let _ = sink.send(protocol::error(
505 Some(request_id),
506 "VALIDATION_ERROR",
507 "missing or empty 'message'",
508 ));
509 return;
510 }
511 };
512
513 let Some(session) = state.get_session(session_id) else {
514 let _ = sink.send(protocol::error(
515 Some(request_id),
516 "SESSION_NOT_FOUND",
517 &format!("session '{session_id}' not found"),
518 ));
519 return;
520 };
521
522 let chat_provider = state.chat_provider.clone();
528
529 let org_id = match state
536 .storage
537 .get_conversation(&session.conversation_id)
538 .await
539 {
540 Ok(Some(conversation)) => conversation.organization_id,
541 Ok(None) | Err(_) => String::new(),
544 };
545 let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
546 &state.gateway_key_resolver,
547 &org_id,
548 state.config.gateway_key.as_deref(),
549 )
550 .await;
551
552 let llm = match resolved_key {
557 Some(key) => state.config.llm_config_with_key(key),
558 None if chat_provider.is_some() => state.config.placeholder_llm_config(),
559 None => {
560 let _ = sink.send(protocol::error(
561 Some(request_id),
562 "LLM_UNAVAILABLE",
563 "No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
564 per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
565 key to enable send_message.",
566 ));
567 return;
568 }
569 };
570
571 let _ = sink.send(protocol::immediate_response(
573 Some(request_id),
574 202,
575 "Processing your request...",
576 json!({}),
577 ));
578
579 let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
587 crate::runner::ConfirmationConfig {
588 tool_patterns: patterns,
589 session_id: session.session_id.clone(),
590 register: {
591 let state = state.clone();
592 Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
593 },
594 clear: {
595 let state = state.clone();
596 Arc::new(move |sid: &str| state.clear_confirmation(sid))
597 },
598 }
599 });
600
601 let org_id = crate::server::SEED_ORG_ID.to_string();
605 let system_prompt = state.settings.get(&org_id).persona;
609 let tool_provider = state.tool_provider.clone();
611
612 let state_for_turn = state.clone();
613 let access_owned = access.clone();
614 let sink_owned = sink.clone();
615 let request_id_owned = request_id.to_string();
616 let conversation_id = session.conversation_id.clone();
617
618 tokio::spawn(async move {
619 let result = runner::run_streaming_turn(
620 TurnRequest {
621 storage: state_for_turn.storage.clone(),
622 llm,
623 max_iterations: state_for_turn.config.max_iterations,
624 conversation_id: &conversation_id,
625 request_id: &request_id_owned,
626 user_message: &message,
627 access: access_owned,
631 llm_provider: chat_provider,
634 reranker: crate::reranker::build_reranker(
638 &crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
639 ),
640 confirmation,
641 tool_provider,
643 system_prompt,
645 org_id: Some(org_id),
646 },
647 &sink_owned,
648 )
649 .await;
650
651 match result {
652 Ok(turn) => {
653 let response = runner::general_agent_response(&turn.reply);
654 let _ = sink_owned.send(protocol::eventual_response(
655 &request_id_owned,
656 200,
657 &turn.message_id,
658 response,
659 false,
660 &turn.citations,
661 ));
662 }
663 Err(e) => {
664 let _ = sink_owned.send(protocol::error(
665 Some(&request_id_owned),
666 "AGENT_ERROR",
667 &format!("agent turn failed: {e}"),
668 ));
669 }
670 }
671 });
672}
673
674fn handle_confirm_tool_action(
688 state: &AppState,
689 parsed: &Value,
690 request_id: Option<&str>,
691 sink: &UnboundedSender<Value>,
692) {
693 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
694 let _ = sink.send(protocol::error(
695 request_id,
696 "VALIDATION_ERROR",
697 "confirm_tool_action requires a 'sessionId'",
698 ));
699 return;
700 };
701
702 let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
705 let _ = sink.send(protocol::error(
706 request_id,
707 "VALIDATION_ERROR",
708 "confirm_tool_action requires a boolean 'approved'",
709 ));
710 return;
711 };
712
713 let Some(responder) = state.take_confirmation(session_id) else {
714 let _ = sink.send(protocol::error(
715 request_id,
716 "NO_PENDING_CONFIRMATION",
717 &format!("no tool action is awaiting confirmation for session '{session_id}'"),
718 ));
719 return;
720 };
721
722 let verdict = if approved {
723 smooth_operator_core::HumanResponse::Approved
724 } else {
725 smooth_operator_core::HumanResponse::Denied {
726 reason: "user rejected the action".to_string(),
727 }
728 };
729
730 if responder.send(verdict).is_err() {
731 let _ = sink.send(protocol::error(
733 request_id,
734 "NO_PENDING_CONFIRMATION",
735 &format!(
736 "the turn awaiting confirmation for session '{session_id}' is no longer active"
737 ),
738 ));
739 return;
740 }
741
742 let _ = sink.send(protocol::immediate_response(
744 request_id,
745 200,
746 if approved {
747 "Tool action approved"
748 } else {
749 "Tool action rejected"
750 },
751 json!({ "sessionId": session_id, "approved": approved }),
752 ));
753}