1use std::sync::Arc;
11
12use serde_json::{json, Value};
13use tokio::sync::mpsc::UnboundedSender;
14
15use smooth_operator::access_control::AccessContext;
16use smooth_operator::domain::{
17 Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
18};
19
20use crate::protocol;
21use crate::runner;
22use crate::runner::TurnRequest;
23use crate::state::AppState;
24
/// Display name used for the agent side of every session created in this file:
/// it is stored as the agent participant's `name`, as the session's
/// `agent_name`, and returned as `agentName` in the create-session response.
const AGENT_NAME: &str = "smooth-agent";
27
28pub async fn handle_frame(
32 state: &AppState,
33 access: &AccessContext,
34 conn_id: &str,
35 origin: Option<&str>,
36 auth_org: Option<&str>,
37 raw: &str,
38 sink: &UnboundedSender<Value>,
39) {
40 let parsed: Value = match serde_json::from_str(raw) {
41 Ok(v) => v,
42 Err(e) => {
43 let _ = sink.send(protocol::error(
44 None,
45 "VALIDATION_ERROR",
46 &format!("invalid JSON frame: {e}"),
47 ));
48 return;
49 }
50 };
51
52 let action = parsed.get("action").and_then(Value::as_str);
53 let request_id = parsed.get("requestId").and_then(Value::as_str);
54
55 match action {
56 Some("ping") => {
57 let _ = sink.send(protocol::pong(request_id));
58 }
59 Some("create_conversation_session") => {
60 handle_create_session(state, conn_id, origin, auth_org, &parsed, request_id, sink)
61 .await;
62 }
63 Some("get_session") => {
64 handle_get_session(state, &parsed, request_id, sink);
65 }
66 Some("get_conversation_messages") => {
67 handle_get_conversation_messages(state, &parsed, request_id, sink).await;
68 }
69 Some("send_message") => {
70 handle_send_message(state, access, &parsed, request_id, sink).await;
71 }
72 Some("confirm_tool_action") => {
73 handle_confirm_tool_action(state, &parsed, request_id, sink);
74 }
75 Some(other) => {
76 let _ = sink.send(protocol::error(
77 request_id,
78 "UNSUPPORTED_ACTION",
79 &format!("action '{other}' is not supported by this server"),
80 ));
81 }
82 None => {
83 let _ = sink.send(protocol::error(
84 request_id,
85 "VALIDATION_ERROR",
86 "missing 'action' field",
87 ));
88 }
89 }
90}
91
/// Result of the widget embedding checks performed by `enforce_widget_auth`.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so outcomes can be logged
/// and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
enum WidgetAuthOutcome {
    /// The request was rejected. `enforce_widget_auth` sends the error frame
    /// before returning this, so the caller only needs to stop.
    Denied,
    /// The request may proceed. `org_id` is the organization taken from the
    /// agent's widget policy, or `None` when no policy was found for the agent.
    Allowed { org_id: Option<String> },
}
103
104async fn enforce_widget_auth(
111 state: &AppState,
112 origin: Option<&str>,
113 agent_id: &str,
114 parsed: &Value,
115 request_id: Option<&str>,
116 sink: &UnboundedSender<Value>,
117) -> WidgetAuthOutcome {
118 let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
119 if state.config.widget_auth_strict {
120 let _ = sink.send(protocol::error(
121 request_id,
122 "AGENT_NOT_AUTHORIZED",
123 "this agent is not registered for embedding",
124 ));
125 return WidgetAuthOutcome::Denied;
126 }
127 return WidgetAuthOutcome::Allowed { org_id: None };
128 };
129
130 if !smooth_operator::widget_auth::origin_allowed(
132 &policy.allowed_origins,
133 origin.unwrap_or_default(),
134 ) {
135 let _ = sink.send(protocol::error(
136 request_id,
137 "ORIGIN_NOT_ALLOWED",
138 "this origin is not allowed to embed this agent",
139 ));
140 return WidgetAuthOutcome::Denied;
141 }
142
143 if let Some(ac) = parsed.get("authContext") {
145 if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
146 let _ = sink.send(protocol::error(
147 request_id,
148 "AUTH_CONTEXT_INVALID",
149 "authContext signature failed verification",
150 ));
151 return WidgetAuthOutcome::Denied;
152 }
153 }
154 WidgetAuthOutcome::Allowed {
155 org_id: policy.organization_id,
156 }
157}
158
159fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
163 let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
164 public_key,
165 ac.get("userId").and_then(Value::as_str),
166 ac.get("signature").and_then(Value::as_str),
167 ac.get("timestamp").and_then(Value::as_i64),
168 ) else {
169 return false;
170 };
171 let now = chrono::Utc::now().timestamp();
172 smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
173}
174
175async fn handle_create_session(
179 state: &AppState,
180 conn_id: &str,
181 origin: Option<&str>,
182 auth_org: Option<&str>,
183 parsed: &Value,
184 request_id: Option<&str>,
185 sink: &UnboundedSender<Value>,
186) {
187 let agent_id = parsed
188 .get("agentId")
189 .and_then(Value::as_str)
190 .map(str::to_string)
191 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
192
193 let widget_org =
198 match enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
199 WidgetAuthOutcome::Denied => return,
200 WidgetAuthOutcome::Allowed { org_id } => org_id,
201 };
202
203 let user_name = parsed
204 .get("userName")
205 .and_then(Value::as_str)
206 .unwrap_or("Visitor")
207 .to_string();
208 let user_email = parsed
209 .get("userEmail")
210 .and_then(Value::as_str)
211 .map(str::to_string);
212 let browser_fingerprint = parsed
213 .get("browserFingerprint")
214 .and_then(Value::as_str)
215 .map(str::to_string);
216
217 let now = chrono::Utc::now();
218 let org_id = widget_org
230 .or_else(|| auth_org.map(str::to_string))
231 .unwrap_or_else(|| crate::server::SEED_ORG_ID.to_string());
232
233 let conversation_id = uuid::Uuid::new_v4().to_string();
234 let session_id = uuid::Uuid::new_v4().to_string();
235 let user_participant_id = uuid::Uuid::new_v4().to_string();
236 let agent_participant_id = uuid::Uuid::new_v4().to_string();
237
238 state
243 .backplane
244 .associate(
245 conn_id,
246 smooth_operator::backplane::Target::Session(session_id.clone()),
247 )
248 .await;
249 state
250 .backplane
251 .associate(
252 conn_id,
253 smooth_operator::backplane::Target::Agent(agent_id.clone()),
254 )
255 .await;
256
257 let conversation = Conversation {
258 id: conversation_id.clone(),
259 platform: Platform::Web,
260 name: format!("Session {session_id}"),
261 organization_id: org_id.clone(),
262 idempotency_key: session_id.clone(),
263 metadata_json: parsed.get("metadata").cloned(),
264 analytics_json: None,
265 created_at: now,
266 updated_at: now,
267 };
268
269 let user_participant = Participant {
270 id: user_participant_id.clone(),
271 conversation_id: conversation_id.clone(),
272 organization_id: org_id.clone(),
273 participant_type: ParticipantType::User,
274 external_id: None,
275 internal_id: None,
276 browser_fingerprint,
277 browser_info: None,
278 name: user_name,
279 email: user_email,
280 phone: None,
281 crm_contact_id: None,
282 metadata_json: None,
283 created_at: now,
284 updated_at: now,
285 };
286
287 let agent_participant = Participant {
288 id: agent_participant_id.clone(),
289 conversation_id: conversation_id.clone(),
290 organization_id: org_id.clone(),
291 participant_type: ParticipantType::AiAgent,
292 external_id: None,
293 internal_id: Some(agent_id.clone()),
294 browser_fingerprint: None,
295 browser_info: None,
296 name: AGENT_NAME.to_string(),
297 email: None,
298 phone: None,
299 crm_contact_id: None,
300 metadata_json: None,
301 created_at: now,
302 updated_at: now,
303 };
304
305 let session = Session {
306 session_id: session_id.clone(),
307 conversation_id: conversation_id.clone(),
308 organization_id: org_id.clone(),
309 agent_id: agent_id.clone(),
310 agent_name: AGENT_NAME.to_string(),
311 user_participant_id: user_participant_id.clone(),
312 agent_participant_id: agent_participant_id.clone(),
313 thread_id: conversation_id.clone(),
316 status: Some(SessionStatus::Active),
317 token_count: Some(0),
318 message_count: Some(0),
319 metadata: None,
320 created_at: Some(now),
321 updated_at: Some(now),
322 ended_at: None,
323 last_activity_at: Some(now),
324 };
325
326 let storage = state.storage.clone();
328 let sink_clone = sink.clone();
329 let request_id_owned = request_id.map(str::to_string);
330 let session_for_registry = session.clone();
331 let state_clone = state.clone();
332
333 let data = json!({
334 "sessionId": session_id,
335 "conversationId": conversation_id,
336 "agentId": agent_id,
337 "agentName": AGENT_NAME,
338 "userParticipantId": user_participant_id,
339 "agentParticipantId": agent_participant_id,
340 });
341
342 tokio::spawn(async move {
343 let rid = request_id_owned.as_deref();
344 if let Err(e) = storage.create_conversation(conversation).await {
345 let _ = sink_clone.send(protocol::error(
346 rid,
347 "INTERNAL_ERROR",
348 &format!("create conversation failed: {e}"),
349 ));
350 return;
351 }
352 if let Err(e) = storage.add_participant(user_participant).await {
353 let _ = sink_clone.send(protocol::error(
354 rid,
355 "INTERNAL_ERROR",
356 &format!("add user participant failed: {e}"),
357 ));
358 return;
359 }
360 if let Err(e) = storage.add_participant(agent_participant).await {
361 let _ = sink_clone.send(protocol::error(
362 rid,
363 "INTERNAL_ERROR",
364 &format!("add agent participant failed: {e}"),
365 ));
366 return;
367 }
368 if let Err(e) = storage.create_session(session).await {
369 let _ = sink_clone.send(protocol::error(
370 rid,
371 "INTERNAL_ERROR",
372 &format!("create session failed: {e}"),
373 ));
374 return;
375 }
376 state_clone.insert_session(session_for_registry);
377 let _ = sink_clone.send(protocol::immediate_response(
378 rid,
379 200,
380 "Session created",
381 data,
382 ));
383 });
384}
385
386fn handle_get_session(
388 state: &AppState,
389 parsed: &Value,
390 request_id: Option<&str>,
391 sink: &UnboundedSender<Value>,
392) {
393 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
394 let _ = sink.send(protocol::error(
395 request_id,
396 "VALIDATION_ERROR",
397 "missing 'sessionId'",
398 ));
399 return;
400 };
401
402 match state.get_session(session_id) {
403 Some(s) => {
404 let data = json!({
405 "sessionId": s.session_id,
406 "conversationId": s.conversation_id,
407 "agentId": s.agent_id,
408 "agentName": s.agent_name,
409 "userParticipantId": s.user_participant_id,
410 "agentParticipantId": s.agent_participant_id,
411 "threadId": s.thread_id,
412 "status": s.status.map_or("active", |st| match st {
413 SessionStatus::Active => "active",
414 SessionStatus::Idle => "idle",
415 SessionStatus::Ended => "ended",
416 }),
417 });
418 let _ = sink.send(protocol::immediate_response(
419 request_id, 200, "Session", data,
420 ));
421 }
422 None => {
423 let _ = sink.send(protocol::error(
424 request_id,
425 "SESSION_NOT_FOUND",
426 &format!("session '{session_id}' not found"),
427 ));
428 }
429 }
430}
431
432async fn handle_get_conversation_messages(
440 state: &AppState,
441 parsed: &Value,
442 request_id: Option<&str>,
443 sink: &UnboundedSender<Value>,
444) {
445 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
446 let _ = sink.send(protocol::error(
447 request_id,
448 "VALIDATION_ERROR",
449 "missing 'sessionId'",
450 ));
451 return;
452 };
453 let Some(session) = state.get_session(session_id) else {
454 let _ = sink.send(protocol::error(
455 request_id,
456 "SESSION_NOT_FOUND",
457 &format!("session '{session_id}' not found"),
458 ));
459 return;
460 };
461
462 const DEFAULT_LIMIT: usize = 50;
463 let limit = parsed
464 .get("limit")
465 .and_then(Value::as_u64)
466 .map(|n| n as usize)
467 .filter(|n| *n > 0)
468 .unwrap_or(DEFAULT_LIMIT);
469 let cursor = parsed
470 .get("cursor")
471 .and_then(Value::as_str)
472 .map(str::to_string);
473
474 let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
475 query.cursor = cursor;
476 query.descending = true;
477
478 match state.storage.list_messages_by_conversation(query).await {
479 Ok(page) => {
480 let data = json!({
481 "conversationId": session.conversation_id,
482 "messages": page.messages,
483 "nextCursor": page.next_cursor,
484 "hasMore": page.next_cursor.is_some(),
485 });
486 let _ = sink.send(protocol::immediate_response(
487 request_id,
488 200,
489 "ConversationMessages",
490 data,
491 ));
492 }
493 Err(e) => {
494 let _ = sink.send(protocol::error(
495 request_id,
496 "STORAGE_ERROR",
497 &format!("failed to list messages: {e}"),
498 ));
499 }
500 }
501}
502
503async fn handle_send_message(
508 state: &AppState,
509 access: &AccessContext,
510 parsed: &Value,
511 request_id: Option<&str>,
512 sink: &UnboundedSender<Value>,
513) {
514 let Some(request_id) = request_id else {
516 let _ = sink.send(protocol::error(
517 None,
518 "VALIDATION_ERROR",
519 "send_message requires a 'requestId'",
520 ));
521 return;
522 };
523
524 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
525 let _ = sink.send(protocol::error(
526 Some(request_id),
527 "VALIDATION_ERROR",
528 "missing 'sessionId'",
529 ));
530 return;
531 };
532
533 let message = match parsed.get("message").and_then(Value::as_str) {
534 Some(m) if !m.trim().is_empty() => m.to_string(),
535 _ => {
536 let _ = sink.send(protocol::error(
537 Some(request_id),
538 "VALIDATION_ERROR",
539 "missing or empty 'message'",
540 ));
541 return;
542 }
543 };
544
545 let Some(session) = state.get_session(session_id) else {
546 let _ = sink.send(protocol::error(
547 Some(request_id),
548 "SESSION_NOT_FOUND",
549 &format!("session '{session_id}' not found"),
550 ));
551 return;
552 };
553
554 let chat_provider = state.chat_provider.clone();
560
561 let org_id = match state
568 .storage
569 .get_conversation(&session.conversation_id)
570 .await
571 {
572 Ok(Some(conversation)) => conversation.organization_id,
573 Ok(None) | Err(_) => String::new(),
576 };
577 let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
578 &state.gateway_key_resolver,
579 &org_id,
580 state.config.gateway_key.as_deref(),
581 )
582 .await;
583
584 let turn_gateway_key = resolved_key.clone();
592 let llm = match resolved_key {
593 Some(key) => state.config.llm_config_with_key(key),
594 None if chat_provider.is_some() => state.config.placeholder_llm_config(),
595 None => {
596 let _ = sink.send(protocol::error(
597 Some(request_id),
598 "LLM_UNAVAILABLE",
599 "No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
600 per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
601 key to enable send_message.",
602 ));
603 return;
604 }
605 };
606
607 let _ = sink.send(protocol::immediate_response(
609 Some(request_id),
610 202,
611 "Processing your request...",
612 json!({}),
613 ));
614
615 let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
623 crate::runner::ConfirmationConfig {
624 tool_patterns: patterns,
625 session_id: session.session_id.clone(),
626 register: {
627 let state = state.clone();
628 Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
629 },
630 clear: {
631 let state = state.clone();
632 Arc::new(move |sid: &str| state.clear_confirmation(sid))
633 },
634 }
635 });
636
637 let org_id = crate::server::SEED_ORG_ID.to_string();
641 let system_prompt = state.settings.get(&org_id).persona;
645 let tool_provider = state.tool_provider.clone();
647
648 let state_for_turn = state.clone();
649 let access_owned = access.clone();
650 let sink_owned = sink.clone();
651 let request_id_owned = request_id.to_string();
652 let conversation_id = session.conversation_id.clone();
653
654 tokio::spawn(async move {
655 let result = runner::run_streaming_turn(
656 TurnRequest {
657 storage: state_for_turn.storage.clone(),
658 llm,
659 max_iterations: state_for_turn.config.max_iterations,
660 conversation_id: &conversation_id,
661 request_id: &request_id_owned,
662 user_message: &message,
663 access: access_owned,
667 llm_provider: chat_provider,
670 reranker: crate::reranker::build_reranker(
674 &crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
675 ),
676 confirmation,
677 tool_provider,
679 system_prompt,
681 org_id: Some(org_id),
682 gateway_key: turn_gateway_key,
685 },
686 &sink_owned,
687 )
688 .await;
689
690 match result {
691 Ok(turn) => {
692 let response = runner::general_agent_response(&turn.reply);
693 let _ = sink_owned.send(protocol::eventual_response(
694 &request_id_owned,
695 200,
696 &turn.message_id,
697 response,
698 false,
699 &turn.citations,
700 ));
701 }
702 Err(e) => {
703 let _ = sink_owned.send(protocol::error(
704 Some(&request_id_owned),
705 "AGENT_ERROR",
706 &format!("agent turn failed: {e}"),
707 ));
708 }
709 }
710 });
711}
712
713fn handle_confirm_tool_action(
727 state: &AppState,
728 parsed: &Value,
729 request_id: Option<&str>,
730 sink: &UnboundedSender<Value>,
731) {
732 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
733 let _ = sink.send(protocol::error(
734 request_id,
735 "VALIDATION_ERROR",
736 "confirm_tool_action requires a 'sessionId'",
737 ));
738 return;
739 };
740
741 let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
744 let _ = sink.send(protocol::error(
745 request_id,
746 "VALIDATION_ERROR",
747 "confirm_tool_action requires a boolean 'approved'",
748 ));
749 return;
750 };
751
752 let Some(responder) = state.take_confirmation(session_id) else {
753 let _ = sink.send(protocol::error(
754 request_id,
755 "NO_PENDING_CONFIRMATION",
756 &format!("no tool action is awaiting confirmation for session '{session_id}'"),
757 ));
758 return;
759 };
760
761 let verdict = if approved {
762 smooth_operator_core::HumanResponse::Approved
763 } else {
764 smooth_operator_core::HumanResponse::Denied {
765 reason: "user rejected the action".to_string(),
766 }
767 };
768
769 if responder.send(verdict).is_err() {
770 let _ = sink.send(protocol::error(
772 request_id,
773 "NO_PENDING_CONFIRMATION",
774 &format!(
775 "the turn awaiting confirmation for session '{session_id}' is no longer active"
776 ),
777 ));
778 return;
779 }
780
781 let _ = sink.send(protocol::immediate_response(
783 request_id,
784 200,
785 if approved {
786 "Tool action approved"
787 } else {
788 "Tool action rejected"
789 },
790 json!({ "sessionId": session_id, "approved": approved }),
791 ));
792}