use std::sync::Arc;
use serde_json::{json, Value};
use tokio::sync::mpsc::UnboundedSender;
use smooth_operator::access_control::AccessContext;
use smooth_operator::domain::{
Conversation, Participant, ParticipantType, Platform, Session, SessionStatus,
};
use crate::protocol;
use crate::runner;
use crate::runner::TurnRequest;
use crate::state::AppState;
const AGENT_NAME: &str = "smooth-agent";
pub async fn handle_frame(
state: &AppState,
access: &AccessContext,
conn_id: &str,
origin: Option<&str>,
auth_org: Option<&str>,
raw: &str,
sink: &UnboundedSender<Value>,
) {
let parsed: Value = match serde_json::from_str(raw) {
Ok(v) => v,
Err(e) => {
let _ = sink.send(protocol::error(
None,
"VALIDATION_ERROR",
&format!("invalid JSON frame: {e}"),
));
return;
}
};
let action = parsed.get("action").and_then(Value::as_str);
let request_id = parsed.get("requestId").and_then(Value::as_str);
match action {
Some("ping") => {
let _ = sink.send(protocol::pong(request_id));
}
Some("create_conversation_session") => {
handle_create_session(state, conn_id, origin, auth_org, &parsed, request_id, sink)
.await;
}
Some("get_session") => {
handle_get_session(state, &parsed, request_id, sink);
}
Some("get_conversation_messages") => {
handle_get_conversation_messages(state, &parsed, request_id, sink).await;
}
Some("send_message") => {
handle_send_message(state, access, &parsed, request_id, sink).await;
}
Some("confirm_tool_action") => {
handle_confirm_tool_action(state, &parsed, request_id, sink);
}
Some(other) => {
let _ = sink.send(protocol::error(
request_id,
"UNSUPPORTED_ACTION",
&format!("action '{other}' is not supported by this server"),
));
}
None => {
let _ = sink.send(protocol::error(
request_id,
"VALIDATION_ERROR",
"missing 'action' field",
));
}
}
}
enum WidgetAuthOutcome {
Denied,
Allowed { org_id: Option<String> },
}
async fn enforce_widget_auth(
state: &AppState,
origin: Option<&str>,
agent_id: &str,
parsed: &Value,
request_id: Option<&str>,
sink: &UnboundedSender<Value>,
) -> WidgetAuthOutcome {
let Some(policy) = state.widget_auth.agent_widget_auth(agent_id).await else {
if state.config.widget_auth_strict {
let _ = sink.send(protocol::error(
request_id,
"AGENT_NOT_AUTHORIZED",
"this agent is not registered for embedding",
));
return WidgetAuthOutcome::Denied;
}
return WidgetAuthOutcome::Allowed { org_id: None };
};
if !smooth_operator::widget_auth::origin_allowed(
&policy.allowed_origins,
origin.unwrap_or_default(),
) {
let _ = sink.send(protocol::error(
request_id,
"ORIGIN_NOT_ALLOWED",
"this origin is not allowed to embed this agent",
));
return WidgetAuthOutcome::Denied;
}
if let Some(ac) = parsed.get("authContext") {
if !verify_auth_context_value(policy.public_key.as_deref(), ac) {
let _ = sink.send(protocol::error(
request_id,
"AUTH_CONTEXT_INVALID",
"authContext signature failed verification",
));
return WidgetAuthOutcome::Denied;
}
}
WidgetAuthOutcome::Allowed {
org_id: policy.organization_id,
}
}
fn verify_auth_context_value(public_key: Option<&str>, ac: &Value) -> bool {
let (Some(pk), Some(user_id), Some(signature), Some(timestamp)) = (
public_key,
ac.get("userId").and_then(Value::as_str),
ac.get("signature").and_then(Value::as_str),
ac.get("timestamp").and_then(Value::as_i64),
) else {
return false;
};
let now = chrono::Utc::now().timestamp();
smooth_operator::widget_auth::verify_auth_context(pk, user_id, signature, timestamp, now, 60)
}
async fn handle_create_session(
state: &AppState,
conn_id: &str,
origin: Option<&str>,
auth_org: Option<&str>,
parsed: &Value,
request_id: Option<&str>,
sink: &UnboundedSender<Value>,
) {
let agent_id = parsed
.get("agentId")
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let widget_org =
match enforce_widget_auth(state, origin, &agent_id, parsed, request_id, sink).await {
WidgetAuthOutcome::Denied => return,
WidgetAuthOutcome::Allowed { org_id } => org_id,
};
let user_name = parsed
.get("userName")
.and_then(Value::as_str)
.unwrap_or("Visitor")
.to_string();
let user_email = parsed
.get("userEmail")
.and_then(Value::as_str)
.map(str::to_string);
let browser_fingerprint = parsed
.get("browserFingerprint")
.and_then(Value::as_str)
.map(str::to_string);
let now = chrono::Utc::now();
let org_id = widget_org
.or_else(|| auth_org.map(str::to_string))
.unwrap_or_else(|| crate::server::SEED_ORG_ID.to_string());
let conversation_id = uuid::Uuid::new_v4().to_string();
let session_id = uuid::Uuid::new_v4().to_string();
let user_participant_id = uuid::Uuid::new_v4().to_string();
let agent_participant_id = uuid::Uuid::new_v4().to_string();
state
.backplane
.associate(
conn_id,
smooth_operator::backplane::Target::Session(session_id.clone()),
)
.await;
state
.backplane
.associate(
conn_id,
smooth_operator::backplane::Target::Agent(agent_id.clone()),
)
.await;
let conversation = Conversation {
id: conversation_id.clone(),
platform: Platform::Web,
name: format!("Session {session_id}"),
organization_id: org_id.clone(),
idempotency_key: session_id.clone(),
metadata_json: parsed.get("metadata").cloned(),
analytics_json: None,
created_at: now,
updated_at: now,
};
let user_participant = Participant {
id: user_participant_id.clone(),
conversation_id: conversation_id.clone(),
organization_id: org_id.clone(),
participant_type: ParticipantType::User,
external_id: None,
internal_id: None,
browser_fingerprint,
browser_info: None,
name: user_name,
email: user_email,
phone: None,
crm_contact_id: None,
metadata_json: None,
created_at: now,
updated_at: now,
};
let agent_participant = Participant {
id: agent_participant_id.clone(),
conversation_id: conversation_id.clone(),
organization_id: org_id.clone(),
participant_type: ParticipantType::AiAgent,
external_id: None,
internal_id: Some(agent_id.clone()),
browser_fingerprint: None,
browser_info: None,
name: AGENT_NAME.to_string(),
email: None,
phone: None,
crm_contact_id: None,
metadata_json: None,
created_at: now,
updated_at: now,
};
let session = Session {
session_id: session_id.clone(),
conversation_id: conversation_id.clone(),
organization_id: org_id.clone(),
agent_id: agent_id.clone(),
agent_name: AGENT_NAME.to_string(),
user_participant_id: user_participant_id.clone(),
agent_participant_id: agent_participant_id.clone(),
thread_id: conversation_id.clone(),
status: Some(SessionStatus::Active),
token_count: Some(0),
message_count: Some(0),
metadata: None,
created_at: Some(now),
updated_at: Some(now),
ended_at: None,
last_activity_at: Some(now),
};
let storage = state.storage.clone();
let sink_clone = sink.clone();
let request_id_owned = request_id.map(str::to_string);
let session_for_registry = session.clone();
let state_clone = state.clone();
let data = json!({
"sessionId": session_id,
"conversationId": conversation_id,
"agentId": agent_id,
"agentName": AGENT_NAME,
"userParticipantId": user_participant_id,
"agentParticipantId": agent_participant_id,
});
tokio::spawn(async move {
let rid = request_id_owned.as_deref();
if let Err(e) = storage.create_conversation(conversation).await {
let _ = sink_clone.send(protocol::error(
rid,
"INTERNAL_ERROR",
&format!("create conversation failed: {e}"),
));
return;
}
if let Err(e) = storage.add_participant(user_participant).await {
let _ = sink_clone.send(protocol::error(
rid,
"INTERNAL_ERROR",
&format!("add user participant failed: {e}"),
));
return;
}
if let Err(e) = storage.add_participant(agent_participant).await {
let _ = sink_clone.send(protocol::error(
rid,
"INTERNAL_ERROR",
&format!("add agent participant failed: {e}"),
));
return;
}
if let Err(e) = storage.create_session(session).await {
let _ = sink_clone.send(protocol::error(
rid,
"INTERNAL_ERROR",
&format!("create session failed: {e}"),
));
return;
}
state_clone.insert_session(session_for_registry);
let _ = sink_clone.send(protocol::immediate_response(
rid,
200,
"Session created",
data,
));
});
}
fn handle_get_session(
state: &AppState,
parsed: &Value,
request_id: Option<&str>,
sink: &UnboundedSender<Value>,
) {
let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
let _ = sink.send(protocol::error(
request_id,
"VALIDATION_ERROR",
"missing 'sessionId'",
));
return;
};
match state.get_session(session_id) {
Some(s) => {
let data = json!({
"sessionId": s.session_id,
"conversationId": s.conversation_id,
"agentId": s.agent_id,
"agentName": s.agent_name,
"userParticipantId": s.user_participant_id,
"agentParticipantId": s.agent_participant_id,
"threadId": s.thread_id,
"status": s.status.map_or("active", |st| match st {
SessionStatus::Active => "active",
SessionStatus::Idle => "idle",
SessionStatus::Ended => "ended",
}),
});
let _ = sink.send(protocol::immediate_response(
request_id, 200, "Session", data,
));
}
None => {
let _ = sink.send(protocol::error(
request_id,
"SESSION_NOT_FOUND",
&format!("session '{session_id}' not found"),
));
}
}
}
async fn handle_get_conversation_messages(
state: &AppState,
parsed: &Value,
request_id: Option<&str>,
sink: &UnboundedSender<Value>,
) {
let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
let _ = sink.send(protocol::error(
request_id,
"VALIDATION_ERROR",
"missing 'sessionId'",
));
return;
};
let Some(session) = state.get_session(session_id) else {
let _ = sink.send(protocol::error(
request_id,
"SESSION_NOT_FOUND",
&format!("session '{session_id}' not found"),
));
return;
};
const DEFAULT_LIMIT: usize = 50;
let limit = parsed
.get("limit")
.and_then(Value::as_u64)
.map(|n| n as usize)
.filter(|n| *n > 0)
.unwrap_or(DEFAULT_LIMIT);
let cursor = parsed
.get("cursor")
.and_then(Value::as_str)
.map(str::to_string);
let mut query = smooth_operator::adapter::MessageQuery::new(&session.conversation_id, limit);
query.cursor = cursor;
query.descending = true;
match state.storage.list_messages_by_conversation(query).await {
Ok(page) => {
let data = json!({
"conversationId": session.conversation_id,
"messages": page.messages,
"nextCursor": page.next_cursor,
"hasMore": page.next_cursor.is_some(),
});
let _ = sink.send(protocol::immediate_response(
request_id,
200,
"ConversationMessages",
data,
));
}
Err(e) => {
let _ = sink.send(protocol::error(
request_id,
"STORAGE_ERROR",
&format!("failed to list messages: {e}"),
));
}
}
}
async fn handle_send_message(
state: &AppState,
access: &AccessContext,
parsed: &Value,
request_id: Option<&str>,
sink: &UnboundedSender<Value>,
) {
let Some(request_id) = request_id else {
let _ = sink.send(protocol::error(
None,
"VALIDATION_ERROR",
"send_message requires a 'requestId'",
));
return;
};
let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
let _ = sink.send(protocol::error(
Some(request_id),
"VALIDATION_ERROR",
"missing 'sessionId'",
));
return;
};
let message = match parsed.get("message").and_then(Value::as_str) {
Some(m) if !m.trim().is_empty() => m.to_string(),
_ => {
let _ = sink.send(protocol::error(
Some(request_id),
"VALIDATION_ERROR",
"missing or empty 'message'",
));
return;
}
};
let Some(session) = state.get_session(session_id) else {
let _ = sink.send(protocol::error(
Some(request_id),
"SESSION_NOT_FOUND",
&format!("session '{session_id}' not found"),
));
return;
};
let chat_provider = state.chat_provider.clone();
let org_id = match state
.storage
.get_conversation(&session.conversation_id)
.await
{
Ok(Some(conversation)) => conversation.organization_id,
Ok(None) | Err(_) => String::new(),
};
let resolved_key = smooth_operator::gateway_key::resolve_gateway_key(
&state.gateway_key_resolver,
&org_id,
state.config.gateway_key.as_deref(),
)
.await;
let turn_gateway_key = resolved_key.clone();
let llm = match resolved_key {
Some(key) => state.config.llm_config_with_key(key),
None if chat_provider.is_some() => state.config.placeholder_llm_config(),
None => {
let _ = sink.send(protocol::error(
Some(request_id),
"LLM_UNAVAILABLE",
"No LLM gateway key is available for this turn (SMOOAI_GATEWAY_KEY is unset and no \
per-org key resolved); this server cannot serve LLM turns. Configure the gateway \
key to enable send_message.",
));
return;
}
};
let _ = sink.send(protocol::immediate_response(
Some(request_id),
202,
"Processing your request...",
json!({}),
));
let confirmation = state.config.confirmation_tool_patterns().map(|patterns| {
crate::runner::ConfirmationConfig {
tool_patterns: patterns,
session_id: session.session_id.clone(),
register: {
let state = state.clone();
Arc::new(move |sid: &str, responder| state.register_confirmation(sid, responder))
},
clear: {
let state = state.clone();
Arc::new(move |sid: &str| state.clear_confirmation(sid))
},
}
});
let org_id = crate::server::SEED_ORG_ID.to_string();
let system_prompt = state.settings.get(&org_id).persona;
let tool_provider = state.tool_provider.clone();
let state_for_turn = state.clone();
let access_owned = if access.organization_id.is_some() {
access.clone()
} else {
access
.clone()
.with_organization_id(session.organization_id.clone())
};
let sink_owned = sink.clone();
let request_id_owned = request_id.to_string();
let conversation_id = session.conversation_id.clone();
tokio::spawn(async move {
let result = runner::run_streaming_turn(
TurnRequest {
storage: state_for_turn.storage.clone(),
llm,
max_iterations: state_for_turn.config.max_iterations,
conversation_id: &conversation_id,
request_id: &request_id_owned,
user_message: &message,
access: access_owned,
llm_provider: chat_provider,
reranker: crate::reranker::build_reranker(
&crate::reranker::RerankerConfig::from_server_config(&state_for_turn.config),
),
confirmation,
tool_provider,
system_prompt,
org_id: Some(org_id),
gateway_key: turn_gateway_key,
},
&sink_owned,
)
.await;
match result {
Ok(turn) => {
let response = runner::general_agent_response(&turn.reply);
let _ = sink_owned.send(protocol::eventual_response(
&request_id_owned,
200,
&turn.message_id,
response,
false,
&turn.citations,
));
}
Err(e) => {
let _ = sink_owned.send(protocol::error(
Some(&request_id_owned),
"AGENT_ERROR",
&format!("agent turn failed: {e}"),
));
}
}
});
}
fn handle_confirm_tool_action(
state: &AppState,
parsed: &Value,
request_id: Option<&str>,
sink: &UnboundedSender<Value>,
) {
let Some(session_id) = parsed.get("sessionId").and_then(Value::as_str) else {
let _ = sink.send(protocol::error(
request_id,
"VALIDATION_ERROR",
"confirm_tool_action requires a 'sessionId'",
));
return;
};
let Some(approved) = parsed.get("approved").and_then(Value::as_bool) else {
let _ = sink.send(protocol::error(
request_id,
"VALIDATION_ERROR",
"confirm_tool_action requires a boolean 'approved'",
));
return;
};
let Some(responder) = state.take_confirmation(session_id) else {
let _ = sink.send(protocol::error(
request_id,
"NO_PENDING_CONFIRMATION",
&format!("no tool action is awaiting confirmation for session '{session_id}'"),
));
return;
};
let verdict = if approved {
smooth_operator_core::HumanResponse::Approved
} else {
smooth_operator_core::HumanResponse::Denied {
reason: "user rejected the action".to_string(),
}
};
if responder.send(verdict).is_err() {
let _ = sink.send(protocol::error(
request_id,
"NO_PENDING_CONFIRMATION",
&format!(
"the turn awaiting confirmation for session '{session_id}' is no longer active"
),
));
return;
}
let _ = sink.send(protocol::immediate_response(
request_id,
200,
if approved {
"Tool action approved"
} else {
"Tool action rejected"
},
json!({ "sessionId": session_id, "approved": approved }),
));
}