1use std::sync::Arc;
11
12use serde_json::{json, Value};
13use tokio::sync::mpsc::UnboundedSender;
14
15use smooth_operator::access_control::AccessContext;
16use smooth_operator::agent_config::{AgentBehaviorConfig, AuthGateHook, AuthLevel};
17use smooth_operator::domain::{
18 Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
19};
20use smooth_operator_core::llm_provider::LlmProvider;
21use smooth_operator_core::{LlmClient, LlmConfig};
22
23use crate::protocol;
24use crate::runner;
25use crate::runner::TurnRequest;
26use crate::state::AppState;
27
28const AGENT_NAME: &str = "smooth-agent";
30
31pub async fn handle_frame(
35 state: &AppState,
36 access: &AccessContext,
37 conn_id: &str,
38 origin: Option<&str>,
39 auth_org: Option<&str>,
40 raw: &str,
41 sink: &UnboundedSender<Value>,
42) {
43 let parsed: Value = match serde_json::from_str(raw) {
44 Ok(v) => v,
45 Err(e) => {
46 let _ = sink.send(protocol::error(
47 None,
48 "VALIDATION_ERROR",
49 &format!("invalid JSON frame: {e}"),
50 ));
51 return;
52 }
53 };
54
55 let action = parsed.get("action").and_then(Value::as_str);
56 let request_id = parsed.get("requestId").and_then(Value::as_str);
57
58 match action {
59 Some("ping") => {
60 let _ = sink.send(protocol::pong(request_id));
61 }
62 Some("create_conversation_session") => {
63 handle_create_session(state, conn_id, origin, auth_org, &parsed, request_id, sink)
64 .await;
65 }
66 Some("get_session") => {
67 handle_get_session(state, &parsed, request_id, sink);
68 }
69 Some("get_conversation_messages") => {
70 handle_get_conversation_messages(state, &parsed, request_id, sink).await;
71 }
72 Some("send_message") => {
73 handle_send_message(state, access, &parsed, request_id, sink).await;
74 }
75 Some("confirm_tool_action") => {
76 handle_confirm_tool_action(state, &parsed, request_id, sink);
77 }
78 Some("verify_otp") => {
79 handle_verify_otp(state, &parsed, request_id, sink).await;
80 }
81 Some(other) => {
82 let _ = sink.send(protocol::error(
83 request_id,
84 "UNSUPPORTED_ACTION",
85 &format!("action '{other}' is not supported by this server"),
86 ));
87 }
88 None => {
89 let _ = sink.send(protocol::error(
90 request_id,
91 "VALIDATION_ERROR",
92 "missing 'action' field",
93 ));
94 }
95 }
96}
97
98enum WidgetAuthOutcome {
101 Denied,
103 Allowed { org_id: Option<String> },
108}
109
110async fn enforce_widget_auth(
117 state: &AppState,
118 origin: Option<&str>,
119 agent_id: &str,
120 parsed: &Value,
121 request_id: Option<&str>,
122 sink: &UnboundedSender<Value>,
123) -> WidgetAuthOutcome {
124 let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
125 if state.config.widget_auth_strict {
126 let _ = sink.send(protocol::error(
127 request_id,
128 "AGENT_NOT_AUTHORIZED",
129 "this agent is not registered for embedding",
130 ));
131 return WidgetAuthOutcome::Denied;
132 }
133 return WidgetAuthOutcome::Allowed { org_id: None };
134 };
135
136 if !smooth_operator::widget_auth::origin_allowed(
138 &policy.allowed_origins,
139 origin.unwrap_or_default(),
140 ) {
141 let _ = sink.send(protocol::error(
142 request_id,
143 "ORIGIN_NOT_ALLOWED",
144 "this origin is not allowed to embed this agent",
145 ));
146 return WidgetAuthOutcome::Denied;
147 }
148
149 if let Some(ac) = parsed.get("authContext") {
151 if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
152 let _ = sink.send(protocol::error(
153 request_id,
154 "AUTH_CONTEXT_INVALID",
155 "authContext signature failed verification",
156 ));
157 return WidgetAuthOutcome::Denied;
158 }
159 }
160 WidgetAuthOutcome::Allowed {
161 org_id: policy.organization_id,
162 }
163}
164
165fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
169 let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
170 public_key,
171 ac.get("userId").and_then(Value::as_str),
172 ac.get("signature").and_then(Value::as_str),
173 ac.get("timestamp").and_then(Value::as_i64),
174 ) else {
175 return false;
176 };
177 let now = chrono::Utc::now().timestamp();
178 smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
179}
180
181async fn handle_create_session(
185 state: &AppState,
186 conn_id: &str,
187 origin: Option<&str>,
188 auth_org: Option<&str>,
189 parsed: &Value,
190 request_id: Option<&str>,
191 sink: &UnboundedSender<Value>,
192) {
193 let agent_id = parsed
194 .get("agentId")
195 .and_then(Value::as_str)
196 .map(str::to_string)
197 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
198
199 let widget_org =
204 match enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
205 WidgetAuthOutcome::Denied => return,
206 WidgetAuthOutcome::Allowed { org_id } => org_id,
207 };
208
209 let user_name = parsed
210 .get("userName")
211 .and_then(Value::as_str)
212 .unwrap_or("Visitor")
213 .to_string();
214 let user_email = parsed
215 .get("userEmail")
216 .and_then(Value::as_str)
217 .map(str::to_string);
218 let browser_fingerprint = parsed
219 .get("browserFingerprint")
220 .and_then(Value::as_str)
221 .map(str::to_string);
222
223 let now = chrono::Utc::now();
224 let org_id = widget_org
236 .or_else(|| auth_org.map(str::to_string))
237 .unwrap_or_else(|| crate::server::SEED_ORG_ID.to_string());
238
239 let conversation_id = uuid::Uuid::new_v4().to_string();
240 let session_id = uuid::Uuid::new_v4().to_string();
241 let user_participant_id = uuid::Uuid::new_v4().to_string();
242 let agent_participant_id = uuid::Uuid::new_v4().to_string();
243
244 state
249 .backplane
250 .associate(
251 conn_id,
252 smooth_operator::backplane::Target::Session(session_id.clone()),
253 )
254 .await;
255 state
256 .backplane
257 .associate(
258 conn_id,
259 smooth_operator::backplane::Target::Agent(agent_id.clone()),
260 )
261 .await;
262
263 let conversation = Conversation {
264 id: conversation_id.clone(),
265 platform: Platform::Web,
266 name: format!("Session {session_id}"),
267 organization_id: org_id.clone(),
268 idempotency_key: session_id.clone(),
269 metadata_json: parsed.get("metadata").cloned(),
270 analytics_json: None,
271 created_at: now,
272 updated_at: now,
273 };
274
275 let user_participant = Participant {
276 id: user_participant_id.clone(),
277 conversation_id: conversation_id.clone(),
278 organization_id: org_id.clone(),
279 participant_type: ParticipantType::User,
280 external_id: None,
281 internal_id: None,
282 browser_fingerprint,
283 browser_info: None,
284 name: user_name,
285 email: user_email.clone(),
286 phone: None,
287 crm_contact_id: None,
288 metadata_json: None,
289 created_at: now,
290 updated_at: now,
291 };
292
293 let agent_participant = Participant {
294 id: agent_participant_id.clone(),
295 conversation_id: conversation_id.clone(),
296 organization_id: org_id.clone(),
297 participant_type: ParticipantType::AiAgent,
298 external_id: None,
299 internal_id: Some(agent_id.clone()),
300 browser_fingerprint: None,
301 browser_info: None,
302 name: AGENT_NAME.to_string(),
303 email: None,
304 phone: None,
305 crm_contact_id: None,
306 metadata_json: None,
307 created_at: now,
308 updated_at: now,
309 };
310
311 let session_metadata = user_email.as_ref().map(|email| {
317 let mut meta = std::collections::HashMap::new();
318 meta.insert("contactEmail".to_string(), Value::from(email.clone()));
319 meta
320 });
321
322 let session = Session {
323 session_id: session_id.clone(),
324 conversation_id: conversation_id.clone(),
325 organization_id: org_id.clone(),
326 agent_id: agent_id.clone(),
327 agent_name: AGENT_NAME.to_string(),
328 user_participant_id: user_participant_id.clone(),
329 agent_participant_id: agent_participant_id.clone(),
330 thread_id: conversation_id.clone(),
333 status: Some(SessionStatus::Active),
334 token_count: Some(0),
335 message_count: Some(0),
336 metadata: session_metadata,
337 created_at: Some(now),
338 updated_at: Some(now),
339 ended_at: None,
340 last_activity_at: Some(now),
341 };
342
343 let storage = state.storage.clone();
345 let sink_clone = sink.clone();
346 let request_id_owned = request_id.map(str::to_string);
347 let session_for_registry = session.clone();
348 let state_clone = state.clone();
349
350 let data = json!({
351 "sessionId": session_id,
352 "conversationId": conversation_id,
353 "agentId": agent_id,
354 "agentName": AGENT_NAME,
355 "userParticipantId": user_participant_id,
356 "agentParticipantId": agent_participant_id,
357 });
358
359 tokio::spawn(async move {
360 let rid = request_id_owned.as_deref();
361 if let Err(e) = storage.create_conversation(conversation).await {
362 let _ = sink_clone.send(protocol::error(
363 rid,
364 "INTERNAL_ERROR",
365 &format!("create conversation failed: {e}"),
366 ));
367 return;
368 }
369 if let Err(e) = storage.add_participant(user_participant).await {
370 let _ = sink_clone.send(protocol::error(
371 rid,
372 "INTERNAL_ERROR",
373 &format!("add user participant failed: {e}"),
374 ));
375 return;
376 }
377 if let Err(e) = storage.add_participant(agent_participant).await {
378 let _ = sink_clone.send(protocol::error(
379 rid,
380 "INTERNAL_ERROR",
381 &format!("add agent participant failed: {e}"),
382 ));
383 return;
384 }
385 if let Err(e) = storage.create_session(session).await {
386 let _ = sink_clone.send(protocol::error(
387 rid,
388 "INTERNAL_ERROR",
389 &format!("create session failed: {e}"),
390 ));
391 return;
392 }
393 state_clone.insert_session(session_for_registry);
394 let _ = sink_clone.send(protocol::immediate_response(
395 rid,
396 200,
397 "Session created",
398 data,
399 ));
400 });
401}
402
403fn handle_get_session(
405 state: &AppState,
406 parsed: &Value,
407 request_id: Option<&str>,
408 sink: &UnboundedSender<Value>,
409) {
410 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
411 let _ = sink.send(protocol::error(
412 request_id,
413 "VALIDATION_ERROR",
414 "missing 'sessionId'",
415 ));
416 return;
417 };
418
419 match state.get_session(session_id) {
420 Some(s) => {
421 let data = json!({
422 "sessionId": s.session_id,
423 "conversationId": s.conversation_id,
424 "agentId": s.agent_id,
425 "agentName": s.agent_name,
426 "userParticipantId": s.user_participant_id,
427 "agentParticipantId": s.agent_participant_id,
428 "threadId": s.thread_id,
429 "status": s.status.map_or("active", |st| match st {
430 SessionStatus::Active => "active",
431 SessionStatus::Idle => "idle",
432 SessionStatus::Ended => "ended",
433 }),
434 });
435 let _ = sink.send(protocol::immediate_response(
436 request_id, 200, "Session", data,
437 ));
438 }
439 None => {
440 let _ = sink.send(protocol::error(
441 request_id,
442 "SESSION_NOT_FOUND",
443 &format!("session '{session_id}' not found"),
444 ));
445 }
446 }
447}
448
449async fn handle_get_conversation_messages(
457 state: &AppState,
458 parsed: &Value,
459 request_id: Option<&str>,
460 sink: &UnboundedSender<Value>,
461) {
462 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
463 let _ = sink.send(protocol::error(
464 request_id,
465 "VALIDATION_ERROR",
466 "missing 'sessionId'",
467 ));
468 return;
469 };
470 let Some(session) = state.get_session(session_id) else {
471 let _ = sink.send(protocol::error(
472 request_id,
473 "SESSION_NOT_FOUND",
474 &format!("session '{session_id}' not found"),
475 ));
476 return;
477 };
478
479 const DEFAULT_LIMIT: usize = 50;
480 let limit = parsed
481 .get("limit")
482 .and_then(Value::as_u64)
483 .map(|n| n as usize)
484 .filter(|n| *n > 0)
485 .unwrap_or(DEFAULT_LIMIT);
486 let cursor = parsed
487 .get("cursor")
488 .and_then(Value::as_str)
489 .map(str::to_string);
490
491 let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
492 query.cursor = cursor;
493 query.descending = true;
494
495 match state.storage.list_messages_by_conversation(query).await {
496 Ok(page) => {
497 let data = json!({
498 "conversationId": session.conversation_id,
499 "messages": page.messages,
500 "nextCursor": page.next_cursor,
501 "hasMore": page.next_cursor.is_some(),
502 });
503 let _ = sink.send(protocol::immediate_response(
504 request_id,
505 200,
506 "ConversationMessages",
507 data,
508 ));
509 }
510 Err(e) => {
511 let _ = sink.send(protocol::error(
512 request_id,
513 "STORAGE_ERROR",
514 &format!("failed to list messages: {e}"),
515 ));
516 }
517 }
518}
519
520async fn handle_send_message(
525 state: &AppState,
526 access: &AccessContext,
527 parsed: &Value,
528 request_id: Option<&str>,
529 sink: &UnboundedSender<Value>,
530) {
531 let Some(request_id) = request_id else {
533 let _ = sink.send(protocol::error(
534 None,
535 "VALIDATION_ERROR",
536 "send_message requires a 'requestId'",
537 ));
538 return;
539 };
540
541 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
542 let _ = sink.send(protocol::error(
543 Some(request_id),
544 "VALIDATION_ERROR",
545 "missing 'sessionId'",
546 ));
547 return;
548 };
549
550 let message = match parsed.get("message").and_then(Value::as_str) {
551 Some(m) if !m.trim().is_empty() => m.to_string(),
552 _ => {
553 let _ = sink.send(protocol::error(
554 Some(request_id),
555 "VALIDATION_ERROR",
556 "missing or empty 'message'",
557 ));
558 return;
559 }
560 };
561
562 let Some(session) = state.get_session(session_id) else {
563 let _ = sink.send(protocol::error(
564 Some(request_id),
565 "SESSION_NOT_FOUND",
566 &format!("session '{session_id}' not found"),
567 ));
568 return;
569 };
570
571 let chat_provider = state.chat_provider.clone();
577
578 let org_id = match state
585 .storage
586 .get_conversation(&session.conversation_id)
587 .await
588 {
589 Ok(Some(conversation)) => conversation.organization_id,
590 Ok(None) | Err(_) => String::new(),
593 };
594 let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
595 &state.gateway_key_resolver,
596 &org_id,
597 state.config.gateway_key.as_deref(),
598 )
599 .await;
600
601 let turn_gateway_key = resolved_key.clone();
609 let llm = match resolved_key {
610 Some(key) => state.config.llm_config_with_key(key),
611 None if chat_provider.is_some() => state.config.placeholder_llm_config(),
612 None => {
613 let _ = sink.send(protocol::error(
614 Some(request_id),
615 "LLM_UNAVAILABLE",
616 "No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
617 per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
618 key to enable send_message.",
619 ));
620 return;
621 }
622 };
623
624 let llm = apply_model_override(llm, parsed);
629
630 let _ = sink.send(protocol::immediate_response(
632 Some(request_id),
633 202,
634 "Processing your request...",
635 json!({}),
636 ));
637
638 let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
646 crate::runner::ConfirmationConfig {
647 tool_patterns: patterns,
648 session_id: session.session_id.clone(),
649 register: {
650 let state = state.clone();
651 Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
652 },
653 clear: {
654 let state = state.clone();
655 Arc::new(move |sid: &str| state.clear_confirmation(sid))
656 },
657 }
658 });
659
660 let org_id = crate::server::SEED_ORG_ID.to_string();
664
665 let agent_cfg: Option<AgentBehaviorConfig> =
670 state.agent_config.resolve(&session.agent_id).await;
671
672 let system_prompt = agent_cfg
679 .as_ref()
680 .and_then(AgentBehaviorConfig::system_prompt)
681 .or_else(|| state.settings.get(&org_id).persona)
682 .or_else(|| state.default_persona.clone());
683
684 let greeting_section = agent_cfg
688 .as_ref()
689 .and_then(AgentBehaviorConfig::greeting_section);
690 let enabled_tools = agent_cfg
691 .as_ref()
692 .and_then(AgentBehaviorConfig::enabled_tool_ids);
693
694 let tool_configs = agent_cfg
696 .as_ref()
697 .map(AgentBehaviorConfig::tool_configs)
698 .filter(|m| !m.is_empty());
699 let session_authed = state.session_authenticated(session_id);
702 let auth_gate = agent_cfg
703 .as_ref()
704 .and_then(|cfg| build_auth_gate(state, cfg, session_authed));
705 let otp_gate = auth_gate.clone();
710
711 let workflow = agent_cfg
713 .as_ref()
714 .and_then(|c| c.conversation_workflow.clone())
715 .map(|wf| runner::WorkflowTurn {
716 workflow: wf,
717 current_step_id: state.session_current_step(session_id),
718 });
719
720 let judge: Option<Arc<dyn LlmProvider>> = if workflow.is_some() {
726 Some(build_judge_provider(state, &llm))
727 } else {
728 None
729 };
730
731 let tool_provider = state.tool_provider.clone();
733 let session_id_owned = session_id.to_string();
734
735 let state_for_turn = state.clone();
736 let access_owned = if access.organization_id.is_some() {
744 access.clone()
745 } else {
746 access
747 .clone()
748 .with_organization_id(session.organization_id.clone())
749 };
750 let sink_owned = sink.clone();
751 let request_id_owned = request_id.to_string();
752 let conversation_id = session.conversation_id.clone();
753
754 tokio::spawn(async move {
755 let result = runner::run_streaming_turn(
756 TurnRequest {
757 storage: state_for_turn.storage.clone(),
758 llm,
759 max_iterations: state_for_turn.config.max_iterations,
760 conversation_id: &conversation_id,
761 request_id: &request_id_owned,
762 user_message: &message,
763 access: access_owned,
767 llm_provider: chat_provider,
770 reranker: crate::reranker::build_reranker(
774 &crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
775 ),
776 confirmation,
777 tool_provider,
779 system_prompt,
781 org_id: Some(org_id),
782 gateway_key: turn_gateway_key,
785 workflow,
788 judge,
789 greeting_section,
791 enabled_tools,
792 auth_gate,
794 tool_configs,
795 },
796 &sink_owned,
797 )
798 .await;
799
800 match result {
801 Ok(turn) => {
802 if let Some(step) = turn.next_step_id.as_deref() {
806 state_for_turn.set_session_current_step(&session_id_owned, Some(step));
807 }
808 if let (Some(gate), Some(otp)) =
815 (otp_gate.as_ref(), state_for_turn.otp_service.clone())
816 {
817 if let Some(tool) = gate.otp_refused_tool() {
818 let contact = state_for_turn.session_contact(&session_id_owned);
819 if !contact.is_empty() {
820 offer_otp(
821 otp.as_ref(),
822 &session_id_owned,
823 &tool,
824 &contact,
825 &request_id_owned,
826 &sink_owned,
827 )
828 .await;
829 }
830 }
831 }
832 let response = runner::general_agent_response(&turn.reply);
833 let _ = sink_owned.send(protocol::eventual_response(
834 &request_id_owned,
835 200,
836 &turn.message_id,
837 response,
838 false,
839 &turn.citations,
840 turn.usage,
841 ));
842 }
843 Err(e) => {
844 let _ = sink_owned.send(protocol::error(
845 Some(&request_id_owned),
846 "AGENT_ERROR",
847 &format!("agent turn failed: {e}"),
848 ));
849 }
850 }
851 });
852}
853
854fn handle_confirm_tool_action(
868 state: &AppState,
869 parsed: &Value,
870 request_id: Option<&str>,
871 sink: &UnboundedSender<Value>,
872) {
873 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
874 let _ = sink.send(protocol::error(
875 request_id,
876 "VALIDATION_ERROR",
877 "confirm_tool_action requires a 'sessionId'",
878 ));
879 return;
880 };
881
882 let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
885 let _ = sink.send(protocol::error(
886 request_id,
887 "VALIDATION_ERROR",
888 "confirm_tool_action requires a boolean 'approved'",
889 ));
890 return;
891 };
892
893 let Some(responder) = state.take_confirmation(session_id) else {
894 let _ = sink.send(protocol::error(
895 request_id,
896 "NO_PENDING_CONFIRMATION",
897 &format!("no tool action is awaiting confirmation for session '{session_id}'"),
898 ));
899 return;
900 };
901
902 let verdict = if approved {
903 smooth_operator_core::HumanResponse::Approved
904 } else {
905 smooth_operator_core::HumanResponse::Denied {
906 reason: "user rejected the action".to_string(),
907 }
908 };
909
910 if responder.send(verdict).is_err() {
911 let _ = sink.send(protocol::error(
913 request_id,
914 "NO_PENDING_CONFIRMATION",
915 &format!(
916 "the turn awaiting confirmation for session '{session_id}' is no longer active"
917 ),
918 ));
919 return;
920 }
921
922 let _ = sink.send(protocol::immediate_response(
924 request_id,
925 200,
926 if approved {
927 "Tool action approved"
928 } else {
929 "Tool action rejected"
930 },
931 json!({ "sessionId": session_id, "approved": approved }),
932 ));
933}
934
935fn apply_model_override(mut llm: LlmConfig, body: &Value) -> LlmConfig {
943 if let Some(model) = body.get("model").and_then(Value::as_str) {
944 let model = model.trim();
945 if !model.is_empty() {
946 llm.model = model.to_string();
947 }
948 }
949 llm
950}
951
952const JUDGE_MAX_TOKENS: u32 = 16;
955
956fn build_auth_gate(
966 state: &AppState,
967 cfg: &AgentBehaviorConfig,
968 session_authenticated: bool,
969) -> Option<AuthGateHook> {
970 let supporting: std::collections::HashSet<String> = std::env::var("SMOOTH_AGENT_AUTH_TOOLS")
971 .ok()
972 .into_iter()
973 .flat_map(|s| {
974 s.split(',')
975 .map(str::trim)
976 .filter(|s| !s.is_empty())
977 .map(str::to_string)
978 .collect::<Vec<_>>()
979 })
980 .collect();
981 if supporting.is_empty() {
982 let _ = state; return None;
984 }
985 let levels = cfg
986 .enabled_tools
987 .iter()
988 .map(|t| (t.tool_id.clone(), AuthLevel::parse(&t.auth_level)))
989 .collect();
990 let hook = AuthGateHook::new(levels, cfg.visibility, session_authenticated, supporting);
991 hook.is_active().then_some(hook)
992}
993
994async fn offer_otp(
1001 otp: &dyn smooth_operator::otp::OtpService,
1002 session_id: &str,
1003 tool: &str,
1004 contact: &smooth_operator::otp::OtpContact,
1005 request_id: &str,
1006 sink: &UnboundedSender<Value>,
1007) {
1008 let channels: Vec<&str> = contact
1009 .available_channels()
1010 .iter()
1011 .map(|c| c.as_str())
1012 .collect();
1013 let _ = sink.send(protocol::otp_verification_required(
1014 request_id,
1015 tool,
1016 &format!("Verify your identity to continue using '{tool}'."),
1017 &channels,
1018 "end_user",
1019 ));
1020 match otp.send_otp(session_id, contact).await {
1021 Ok(delivery) => {
1022 let _ = sink.send(protocol::otp_sent(
1023 request_id,
1024 delivery.channel.as_str(),
1025 &delivery.masked_destination,
1026 ));
1027 }
1028 Err(e) => {
1029 let _ = sink.send(protocol::error(
1030 Some(request_id),
1031 "OTP_SEND_FAILED",
1032 &format!("failed to send verification code: {e}"),
1033 ));
1034 }
1035 }
1036}
1037
1038async fn handle_verify_otp(
1049 state: &AppState,
1050 parsed: &Value,
1051 request_id: Option<&str>,
1052 sink: &UnboundedSender<Value>,
1053) {
1054 let Some(request_id) = request_id else {
1057 let _ = sink.send(protocol::error(
1058 None,
1059 "VALIDATION_ERROR",
1060 "verify_otp requires a 'requestId'",
1061 ));
1062 return;
1063 };
1064
1065 let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
1066 let _ = sink.send(protocol::error(
1067 Some(request_id),
1068 "VALIDATION_ERROR",
1069 "verify_otp requires a 'sessionId'",
1070 ));
1071 return;
1072 };
1073
1074 let Some(code) = parsed.get("code").and_then(Value::as_str) else {
1075 let _ = sink.send(protocol::error(
1076 Some(request_id),
1077 "VALIDATION_ERROR",
1078 "verify_otp requires a 'code'",
1079 ));
1080 return;
1081 };
1082
1083 if state.get_session(session_id).is_none() {
1085 let _ = sink.send(protocol::error(
1086 Some(request_id),
1087 "SESSION_NOT_FOUND",
1088 &format!("session '{session_id}' not found"),
1089 ));
1090 return;
1091 }
1092
1093 let Some(otp) = state.otp_service.clone() else {
1097 let _ = sink.send(protocol::otp_invalid(
1098 request_id,
1099 Some("NOT_FOUND"),
1100 0,
1101 "No verification is in progress for this session.",
1102 ));
1103 return;
1104 };
1105
1106 match otp.verify_otp(session_id, code).await {
1107 smooth_operator::otp::OtpVerifyOutcome::Verified => {
1108 state.set_session_authenticated(session_id, true);
1109 let _ = sink.send(protocol::otp_verified(
1110 request_id,
1111 "Identity verified successfully.",
1112 ));
1113 }
1114 smooth_operator::otp::OtpVerifyOutcome::Invalid {
1115 attempts_remaining,
1116 error,
1117 message,
1118 } => {
1119 let _ = sink.send(protocol::otp_invalid(
1120 request_id,
1121 error.map(smooth_operator::otp::OtpError::as_str),
1122 attempts_remaining,
1123 &message,
1124 ));
1125 }
1126 }
1127}
1128
1129fn build_judge_provider(state: &AppState, turn_llm: &LlmConfig) -> Arc<dyn LlmProvider> {
1135 if let Some(mock) = state.chat_provider.clone() {
1136 return mock;
1137 }
1138 let mut cfg = turn_llm.clone();
1139 cfg.model = state.config.judge_model.clone();
1140 cfg.max_tokens = JUDGE_MAX_TOKENS;
1141 Arc::new(LlmClient::new(cfg))
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146 use super::*;
1147 use smooth_operator_core::llm::{ApiFormat, RetryPolicy};
1148
1149 fn base_llm() -> LlmConfig {
1152 LlmConfig {
1153 api_url: "https://llm.smoo.ai/v1".to_string(),
1154 api_key: "sk-test".to_string(),
1155 model: "claude-haiku-4-5".to_string(),
1156 max_tokens: 512,
1157 temperature: 0.0,
1158 retry_policy: RetryPolicy::default(),
1159 api_format: ApiFormat::OpenAiCompat,
1160 }
1161 }
1162
1163 #[test]
1164 fn model_override_present_replaces_model() {
1165 let body = json!({ "action": "send_message", "model": "claude-opus-4-8" });
1166 let llm = apply_model_override(base_llm(), &body);
1167 assert_eq!(llm.model, "claude-opus-4-8");
1168 assert_eq!(llm.api_url, "https://llm.smoo.ai/v1");
1170 assert_eq!(llm.api_key, "sk-test");
1171 assert_eq!(llm.max_tokens, 512);
1172 }
1173
1174 #[test]
1175 fn model_override_absent_keeps_default() {
1176 let body = json!({ "action": "send_message", "message": "hi" });
1177 let llm = apply_model_override(base_llm(), &body);
1178 assert_eq!(llm.model, "claude-haiku-4-5");
1179 }
1180
1181 #[test]
1182 fn model_override_blank_or_non_string_keeps_default() {
1183 let blank = json!({ "model": " " });
1185 assert_eq!(
1186 apply_model_override(base_llm(), &blank).model,
1187 "claude-haiku-4-5"
1188 );
1189 let wrong_type = json!({ "model": 42 });
1191 assert_eq!(
1192 apply_model_override(base_llm(), &wrong_type).model,
1193 "claude-haiku-4-5"
1194 );
1195 }
1196}