smooth_operator_server/server.rs
1//! The axum WebSocket server: one `/ws` endpoint, one task per connection.
2//!
3//! Per connection we split the socket and run two tasks joined by an
4//! `UnboundedSender<serde_json::Value>` outbound sink:
5//!
6//! - a **writer** that drains the sink and writes each event as a JSON text
7//! frame, and
8//! - a **reader** that reads inbound frames and dispatches them via
9//! [`crate::handler::handle_frame`], passing the sink so handlers (including
10//! the streaming `send_message`) can emit events as they happen.
11//!
12//! Using a sink channel (instead of writing directly from the handler) is what
13//! lets a streaming turn fire many `stream_token` events from inside the agent
14//! loop while the connection is still reading.
15
16use std::net::SocketAddr;
17use std::sync::Arc;
18
19use anyhow::{Context, Result};
20use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
21use axum::extract::{Query, State};
22use axum::response::{Html, IntoResponse, Response};
23use axum::routing::get;
24use axum::Router;
25
26use futures_util::{SinkExt, StreamExt};
27use smooth_operator::access_control::AccessContext;
28use tokio::net::TcpListener;
29use tokio_util::sync::CancellationToken;
30
31use smooth_operator_adapter_memory::InMemoryStorageAdapter;
32use smooth_operator_core::{Document, DocumentType};
33
34use crate::config::ServerConfig;
35use crate::handler;
36use crate::state::AppState;
37
38/// Build the axum [`Router`] for the given application state. Exposed so tests
39/// can boot the server in-process. Serves the WebSocket `/ws` endpoint plus the
40/// auth-gated admin HTTP API under `/admin` (see [`crate::admin`]).
41pub fn router(state: AppState) -> Router {
42 let mut router = Router::new()
43 .route("/ws", get(ws_upgrade))
44 // Unauthenticated liveness/readiness probe. A WebSocket `/ws` upgrade is
45 // not a plain GET, so HTTP load balancers (AWS ALB / nginx ingress) need a
46 // real HTTP route that answers 200 to confirm the listener is up. Cheap
47 // and dependency-free — it does not touch storage/LLM, so it stays Ready
48 // even when an optional backend (gateway key, DB) is degraded.
49 .route("/health", get(health))
50 .merge(crate::admin::router());
51 // The local deployment flavor opts into serving the official widget; other
52 // flavors never mount these routes.
53 if state.serve_widget {
54 router = router
55 .route("/", get(widget_index))
56 .route("/chat-widget.iife.js", get(widget_bundle));
57 }
58 router.with_state(state)
59}
60
61/// Serve the local-flavor widget host page at `/`, injecting the auth token
62/// (same-origin, loopback) so the embedded `<smooth-agent-chat>` connects to
63/// this server's own `/ws?token=…`. The token is JSON-encoded into the page so
64/// quoting is always safe.
65async fn widget_index(State(state): State<AppState>) -> Html<String> {
66 let token_json = serde_json::to_string(state.widget_token.as_deref().unwrap_or(""))
67 .unwrap_or_else(|_| "\"\"".to_string());
68 let html =
69 include_str!("../assets/widget-index.html").replace("__SMOOTH_LOCAL_TOKEN__", &token_json);
70 Html(html)
71}
72
73/// Serve the vendored official widget IIFE bundle.
74async fn widget_bundle() -> impl IntoResponse {
75 (
76 [(
77 axum::http::header::CONTENT_TYPE,
78 "application/javascript; charset=utf-8",
79 )],
80 include_str!("../assets/chat-widget.iife.js"),
81 )
82}
83
/// `GET /health` — answers `200 OK` with the body `ok`.
///
/// This is the minimal HTTP health check for container orchestrators and HTTP
/// load balancers, which cannot probe the WebSocket `/ws` route with a plain
/// GET. It does no work beyond returning a static string.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
90
/// Name of the document set every seeded demo doc is tagged into, so that
/// `GET /admin/document-sets` has something to list on a seeded server.
const SEED_DOCUMENT_SET: &str = "policies";

/// Org that owns the seeded demo docs and their document-set registry entries.
///
/// Mirrors the org `handler::handle_create_session` stamps onto reference
/// conversations, so the seeded sets are visible to the reference org's admin
/// caller — and, because document sets are scoped per org, to no other org.
pub const SEED_ORG_ID: &str = "reference-org";
100
101/// Build an [`AppState`] over a fresh in-memory adapter, seeding the knowledge
102/// base when `config.seed_kb` is set. Exposed for tests + the binary.
103///
104/// The auth verifier defaults to [`NoAuthVerifier`](smooth_operator::auth::NoAuthVerifier)
105/// here (the protocol-only path needs no auth); the **binary** path
106/// ([`build_state_from_env`]) installs the env-configured, secure-by-default
107/// verifier instead.
108#[must_use]
109pub fn build_state(config: ServerConfig) -> AppState {
110 let seed = config.seed_kb;
111 let storage = Arc::new(InMemoryStorageAdapter::new());
112 let state = AppState::new(storage.clone(), config);
113 if seed {
114 seed_knowledge(storage.as_ref());
115 // Record the seeded docs' document-set membership for the admin API
116 // (the in-memory backend drops document metadata, so the registry is the
117 // source of truth for set names + counts).
118 state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
119 state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
120 }
121 state
122}
123
124/// Build an [`AppState`] with the **env-configured** auth verifier (secure by
125/// default — see [`smooth_operator::auth::AuthConfig`]). Used by the binary.
126///
127/// # Errors
128/// Returns an error if the auth configuration is invalid (e.g. `AUTH_MODE=jwt`
129/// with no key) — the server refuses to start rather than fall back to no-auth.
130pub fn build_state_from_env(config: ServerConfig) -> Result<AppState> {
131 let verifier = smooth_operator::auth::AuthConfig::from_env()
132 .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
133 let state = install_widget_auth_from_env(build_state(config));
134 Ok(state.with_auth(Arc::from(verifier)))
135}
136
137/// Install an [`HttpWidgetAuth`](smooth_operator::widget_auth::HttpWidgetAuth)
138/// provider from `WIDGET_AUTH_URL` (optionally `WIDGET_AUTH_BEARER` +
139/// `WIDGET_AUTH_TTL_SECS`); otherwise leave the permissive default. This lets a
140/// host enforce embeddable-widget auth against its own policy service by setting
141/// env vars — no custom binary needed. (A host wanting bespoke logic still
142/// installs its own provider via [`AppState::with_widget_auth`].)
143fn install_widget_auth_from_env(state: AppState) -> AppState {
144 let Ok(url) = std::env::var("WIDGET_AUTH_URL") else {
145 return state;
146 };
147 let url = url.trim();
148 if url.is_empty() {
149 return state;
150 }
151 let mut provider = smooth_operator::widget_auth::HttpWidgetAuth::new(url);
152 if let Ok(bearer) = std::env::var("WIDGET_AUTH_BEARER") {
153 let bearer = bearer.trim();
154 if !bearer.is_empty() {
155 provider = provider.with_bearer(bearer);
156 }
157 }
158 if let Some(secs) = std::env::var("WIDGET_AUTH_TTL_SECS")
159 .ok()
160 .and_then(|s| s.trim().parse::<u64>().ok())
161 {
162 provider = provider.with_ttl(std::time::Duration::from_secs(secs));
163 }
164 state.with_widget_auth(Arc::new(provider))
165}
166
167/// Build an [`AppState`] selecting the **storage backend** (and the matching
168/// durable **admin stores**) from `config.storage`, then installing the
169/// env-configured auth verifier.
170///
171/// - [`StorageBackend::Memory`](crate::config::StorageBackend::Memory) — the
172/// in-memory adapter + in-memory admin stores (the [`build_state`] path; lost
173/// on restart). The default.
174/// - [`StorageBackend::Postgres`](crate::config::StorageBackend::Postgres) —
175/// the Postgres + pgvector adapter; the admin stores persist to the **same
176/// database** (`connector_configs` / `agent_settings` / `indexing_runs`).
177/// Connection string from `SMOOTH_AGENT_DATABASE_URL` / `DATABASE_URL`.
178/// - [`StorageBackend::Dynamodb`](crate::config::StorageBackend::Dynamodb) — the
179/// DynamoDB single-table adapter; the admin stores persist to the **same
180/// table**. Table from `SMOOTH_AGENT_DDB_TABLE`; the table is created if
181/// absent.
182///
183/// The admin store backend always matches the storage backend so a connector
184/// config / settings / indexing run survives a restart wherever the
185/// conversations and knowledge live.
186///
187/// # Errors
188/// Returns an error if the auth configuration is invalid, or if the selected
189/// persistent backend fails to connect / migrate.
190pub async fn build_state_from_env_async(config: ServerConfig) -> Result<AppState> {
191 use crate::config::StorageBackend;
192 // Only the Postgres / DynamoDB arms name `StorageAdapter` (for the
193 // `Arc<dyn StorageAdapter>` annotation); on a lean build with neither feature
194 // those arms are compiled out, so the import would be unused.
195 #[cfg(any(feature = "postgres", feature = "dynamodb"))]
196 use smooth_operator::adapter::StorageAdapter;
197
198 let verifier = smooth_operator::auth::AuthConfig::from_env()
199 .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
200
201 let state = match config.storage {
202 // The in-memory path is unchanged (synchronous, no external services).
203 StorageBackend::Memory => build_state(config),
204
205 // The Postgres storage backend (and its matching durable admin stores)
206 // is only compiled in on a build with the `postgres` feature (the default
207 // / cloud build). A lean `--no-default-features` build returns a clear
208 // error if `SMOOTH_AGENT_STORAGE=postgres` is requested at runtime.
209 #[cfg(feature = "postgres")]
210 StorageBackend::Postgres => {
211 use smooth_operator_adapter_postgres::PostgresAdapter;
212 // The pgvector column width MUST match the embedder the `/index`
213 // path uses (1536 keyed / 1024 offline). Build the embedder from
214 // config and create the adapter with it so document vectors (at
215 // ingest) and query vectors agree — no silent 1024/1536 mismatch.
216 let embedder = crate::embedder::build_embedder(
217 &crate::embedder::EmbedderConfig::from_server_config(&config),
218 );
219 let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
220 .or_else(|_| std::env::var("DATABASE_URL"))
221 .map_err(|_| {
222 anyhow::anyhow!(
223 "Postgres backend selected but neither SMOOTH_AGENT_DATABASE_URL \
224 nor DATABASE_URL is set"
225 )
226 })?;
227 let adapter = Arc::new(
228 PostgresAdapter::connect_with_embedder(&conn_str, embedder)
229 .await
230 .map_err(|e| anyhow::anyhow!("connecting Postgres storage backend: {e}"))?,
231 );
232 // Admin stores against the SAME database — durable.
233 let connectors = Arc::new(adapter.connector_config_store());
234 let settings = Arc::new(adapter.settings_store());
235 let indexing = Arc::new(adapter.indexing_store());
236 // Per-agent behavior config from the monorepo `agents` table on the
237 // same pool — so an agent's `instructions` / `conversation_workflow`
238 // drive its conversations. Degrades to the org default when the table
239 // is absent (a standalone deploy) or a row is malformed.
240 let agent_config = Arc::new(adapter.agent_config_resolver());
241 let storage: Arc<dyn StorageAdapter> = adapter;
242 AppState::new(storage, config)
243 .with_connector_configs(connectors)
244 .with_settings(settings)
245 .with_indexing(indexing)
246 .with_agent_config(agent_config)
247 }
248
249 // The DynamoDB storage backend is only compiled in on a build with the
250 // `dynamodb` feature (the default / cloud build). A lean build returns a
251 // clear error if `SMOOTH_AGENT_STORAGE=dynamodb` is requested at runtime.
252 #[cfg(feature = "dynamodb")]
253 StorageBackend::Dynamodb => {
254 use smooth_operator_adapter_dynamodb::DynamoDbAdapter;
255 let adapter = Arc::new(
256 DynamoDbAdapter::from_env(None)
257 .await
258 .map_err(|e| anyhow::anyhow!("connecting DynamoDB storage backend: {e}"))?,
259 );
260 adapter
261 .create_table()
262 .await
263 .map_err(|e| anyhow::anyhow!("creating DynamoDB table: {e}"))?;
264 // Admin stores against the SAME table — durable.
265 let connectors = Arc::new(adapter.connector_config_store());
266 let settings = Arc::new(adapter.settings_store());
267 let indexing = Arc::new(adapter.indexing_store());
268 let storage: Arc<dyn StorageAdapter> = adapter;
269 AppState::new(storage, config)
270 .with_connector_configs(connectors)
271 .with_settings(settings)
272 .with_indexing(indexing)
273 }
274
275 // Lean build: a persistent backend was requested but its feature wasn't
276 // compiled in. Fail loud with an actionable message rather than silently
277 // running in-memory (which would lose data on restart).
278 #[cfg(not(feature = "postgres"))]
279 StorageBackend::Postgres => {
280 anyhow::bail!(
281 "SMOOTH_AGENT_STORAGE=postgres requires building with --features postgres \
282 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
283 with the 'cloud'/'postgres' feature"
284 );
285 }
286 #[cfg(not(feature = "dynamodb"))]
287 StorageBackend::Dynamodb => {
288 anyhow::bail!(
289 "SMOOTH_AGENT_STORAGE=dynamodb requires building with --features dynamodb \
290 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
291 with the 'cloud'/'dynamodb' feature"
292 );
293 }
294 };
295
296 let state = install_backplane_from_env(state).await?;
297 let state = install_widget_auth_from_env(state);
298
299 Ok(state.with_auth(Arc::from(verifier)))
300}
301
302/// Select the connection [`Backplane`](smooth_operator::backplane::Backplane)
303/// from `SMOOTH_AGENT_BACKPLANE`, installing it via
304/// [`AppState::with_backplane`](crate::state::AppState::with_backplane).
305///
306/// | value | backend | url env |
307/// |---|---|---|
308/// | unset / `memory` / `inmemory` | single-process (default) | — |
309/// | `redis` / `valkey` | [`RedisBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_REDIS_URL` |
310/// | `nats` | [`NatsBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_NATS_URL` |
311///
312/// A distributed backend is required for >1 replica (otherwise an event produced
313/// on one pod can't reach a socket on another) and to let non-AI publishers push
314/// realtime events via `Backplane::publish`.
315///
316/// # Errors
317/// Returns an error for an unknown backend value, a missing url, or a failed
318/// connection — fail loud at boot rather than silently run single-process.
319async fn install_backplane_from_env(state: AppState) -> Result<AppState> {
320 let kind = std::env::var("SMOOTH_AGENT_BACKPLANE")
321 .unwrap_or_default()
322 .trim()
323 .to_lowercase();
324
325 let url = |specific: &str| -> Result<String> {
326 std::env::var("SMOOTH_AGENT_BACKPLANE_URL")
327 .or_else(|_| std::env::var(specific))
328 .map_err(|_| {
329 anyhow::anyhow!(
330 "{kind} backplane selected but neither SMOOTH_AGENT_BACKPLANE_URL nor {specific} is set"
331 )
332 })
333 };
334
335 match kind.as_str() {
336 "" | "memory" | "inmemory" => Ok(state), // default InMemoryBackplane already installed
337 // The Redis backplane is only compiled in on a build with the `redis`
338 // feature (the default / cloud build). A lean `--no-default-features`
339 // build returns a clear error rather than silently running single-process.
340 "redis" | "valkey" => {
341 #[cfg(feature = "redis")]
342 {
343 use smooth_operator_adapter_backplane_redis::RedisBackplane;
344 let backplane = RedisBackplane::connect(&url("SMOOTH_AGENT_REDIS_URL")?)
345 .await
346 .map_err(|e| anyhow::anyhow!("connecting Redis backplane: {e}"))?;
347 Ok(state.with_backplane(Arc::new(backplane)))
348 }
349 #[cfg(not(feature = "redis"))]
350 {
351 let _ = url; // silence unused-closure warning on the lean build
352 anyhow::bail!(
353 "SMOOTH_AGENT_BACKPLANE={kind} requires building with --features redis \
354 (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
355 with the 'cloud'/'redis' feature"
356 )
357 }
358 }
359 // The NATS backplane is only compiled in on a build with the `nats`
360 // feature (the default / cloud build). A lean build returns a clear error.
361 "nats" => {
362 #[cfg(feature = "nats")]
363 {
364 use smooth_operator_adapter_backplane_nats::NatsBackplane;
365 let backplane = NatsBackplane::connect(&url("SMOOTH_AGENT_NATS_URL")?)
366 .await
367 .map_err(|e| anyhow::anyhow!("connecting NATS backplane: {e}"))?;
368 Ok(state.with_backplane(Arc::new(backplane)))
369 }
370 #[cfg(not(feature = "nats"))]
371 {
372 let _ = url; // silence unused-closure warning on the lean build
373 anyhow::bail!(
374 "SMOOTH_AGENT_BACKPLANE=nats requires building with --features nats \
375 (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
376 with the 'cloud'/'nats' feature"
377 )
378 }
379 }
380 other => Err(anyhow::anyhow!(
381 "unknown SMOOTH_AGENT_BACKPLANE '{other}' (expected: memory | redis | valkey | nats)"
382 )),
383 }
384}
385
386/// Seed a couple of distinctive demo docs so knowledge-grounded E2E is
387/// deterministic. The 17-day return window is deliberately unusual so an
388/// ungrounded answer can't accidentally match it. Both docs are tagged into the
389/// `policies` document set so the admin API can report it.
390pub fn seed_knowledge(storage: &InMemoryStorageAdapter) {
391 let kb = smooth_operator::adapter::StorageAdapter::knowledge(storage);
392 let _ = kb.ingest(smooth_operator::with_document_set(
393 Document::new(
394 "SmooAI's return window is exactly 17 days from delivery. Returns after 17 days are not accepted.",
395 "policies/returns.md",
396 DocumentType::Documentation,
397 ),
398 [SEED_DOCUMENT_SET],
399 ));
400 let _ = kb.ingest(smooth_operator::with_document_set(
401 Document::new(
402 "SmooAI standard shipping takes 5 to 7 business days. Expedited shipping takes 2 business days.",
403 "policies/shipping.md",
404 DocumentType::Documentation,
405 ),
406 [SEED_DOCUMENT_SET],
407 ));
408}
409
410/// Bind on `<SMOOTH_AGENT_BIND>:<port>` (default loopback) and serve until the
411/// process is killed. Returns the bound [`TcpListener`] + the router, used by
412/// both the binary and tests (tests bind port 0 for an ephemeral port).
413///
414/// Uses the **env-configured, secure-by-default** auth verifier
415/// ([`build_state_from_env`]) — the binary refuses to start if auth is
416/// misconfigured rather than silently serving the admin API unauthenticated.
417///
418/// # Errors
419/// Returns an error if the auth configuration is invalid or the TCP bind fails.
420pub async fn bind(config: ServerConfig) -> Result<(TcpListener, Router, CancellationToken)> {
421 let ip: std::net::IpAddr = config
422 .bind
423 .parse()
424 .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
425 let addr = SocketAddr::new(ip, config.port);
426 // Async so a Postgres / DynamoDB storage backend (and its matching durable
427 // admin stores) can be wired; in-memory stays synchronous inside.
428 let state = build_state_from_env_async(config).await?;
429 // Clone the shutdown token BEFORE the state is consumed into the router, so
430 // `run` can cancel it (which fans out to every per-connection clone) when a
431 // SIGTERM/ctrl_c arrives.
432 let shutdown = state.shutdown.clone();
433 let app = router(state);
434 let listener = TcpListener::bind(addr)
435 .await
436 .with_context(|| format!("binding WebSocket server on {addr}"))?;
437 Ok((listener, app, shutdown))
438}
439
440/// Serve a **pre-built** [`AppState`] to completion (blocks), binding on
441/// `state.config.bind:state.config.port`.
442///
443/// This is the library entry point for callers that assemble their own
444/// `AppState` — e.g. the `dev-support` example, which ingests a GitHub repo into
445/// a storage adapter, wires the env-configured [`AuthVerifier`], and then serves
446/// that exact state so the chat-widget queries the ingested knowledge. It does
447/// **not** rebuild the state or touch the ACL/auth/embedder/reranker selection —
448/// those are baked into the `state` the caller passes in. The WS loop, router,
449/// and listening log are identical to [`run`] (which builds its state from env);
450/// `run` is unchanged.
451///
452/// # Errors
453/// Returns an error if the TCP bind fails or serving fails.
454pub async fn serve_state(state: AppState) -> Result<()> {
455 let ip: std::net::IpAddr = state
456 .config
457 .bind
458 .parse()
459 .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
460 let addr = SocketAddr::new(ip, state.config.port);
461 let listener = TcpListener::bind(addr)
462 .await
463 .with_context(|| format!("binding WebSocket server on {addr}"))?;
464 serve_state_on(state, listener).await
465}
466
467/// Serve a pre-built [`AppState`] on an already-bound [`TcpListener`] (blocks).
468///
469/// Splitting the bind from the serve lets a caller (or a test) bind an ephemeral
470/// port, read [`TcpListener::local_addr`] for the real port, then hand the
471/// listener here. Logs the same "listening" line [`run`] does.
472///
473/// # Errors
474/// Returns an error if serving fails.
475pub async fn serve_state_on(state: AppState, listener: TcpListener) -> Result<()> {
476 let has_llm = state.config.has_llm();
477 let model = state.config.model.clone();
478 let gateway = state.config.gateway_url.clone();
479 let local = listener.local_addr().context("local addr")?;
480 let app = router(state);
481
482 tracing::info!(
483 %local,
484 endpoint = "/ws",
485 %model,
486 %gateway,
487 llm_enabled = has_llm,
488 "smooth-operator-server listening"
489 );
490 println!(
491 "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
492 );
493
494 axum::serve(listener, app)
495 .await
496 .context("serving WebSocket connections")?;
497 Ok(())
498}
499
500/// Run the server to completion (blocks). Logs a single listening line.
501///
502/// # Errors
503/// Returns an error if binding or serving fails.
504pub async fn run(config: ServerConfig) -> Result<()> {
505 let has_llm = config.has_llm();
506 let model = config.model.clone();
507 let gateway = config.gateway_url.clone();
508 let (listener, app, shutdown) = bind(config).await?;
509 let local = listener.local_addr().context("local addr")?;
510
511 tracing::info!(
512 %local,
513 endpoint = "/ws",
514 %model,
515 %gateway,
516 llm_enabled = has_llm,
517 "smooth-operator-server listening"
518 );
519 // Also print to stdout so the run-confirmation check is unambiguous without
520 // a tracing subscriber filter.
521 println!(
522 "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
523 );
524
525 // Graceful drain: stop accepting new connections AND cancel the shared
526 // shutdown token on SIGTERM (k8s pod termination) / ctrl_c. Cancelling fans
527 // out to every per-connection reader loop so each finishes its in-flight turn
528 // and detaches from the backplane before the process exits — within the
529 // chart's `terminationGracePeriodSeconds` window.
530 axum::serve(listener, app)
531 .with_graceful_shutdown(async move {
532 wait_for_shutdown_signal().await;
533 tracing::info!("shutdown signal received; draining in-flight WebSocket turns");
534 shutdown.cancel();
535 })
536 .await
537 .context("serving WebSocket connections")?;
538 Ok(())
539}
540
541/// Resolve when the process receives a termination request: SIGTERM (how
542/// Kubernetes asks a pod to stop on scale-down / rollout) **or** ctrl_c
543/// (SIGINT — interactive `cargo run`), whichever comes first.
544///
545/// Unix-only signal handling (the server targets Linux/k8s); on a non-unix host
546/// it falls back to ctrl_c alone so the binary still stops cleanly.
547async fn wait_for_shutdown_signal() {
548 #[cfg(unix)]
549 {
550 use tokio::signal::unix::{signal, SignalKind};
551 // If installing the SIGTERM handler somehow fails, fall back to ctrl_c
552 // only rather than panicking the serve task.
553 let mut sigterm = match signal(SignalKind::terminate()) {
554 Ok(s) => s,
555 Err(e) => {
556 tracing::warn!(error = %e, "failed to install SIGTERM handler; ctrl_c only");
557 let _ = tokio::signal::ctrl_c().await;
558 return;
559 }
560 };
561 tokio::select! {
562 _ = sigterm.recv() => {}
563 _ = tokio::signal::ctrl_c() => {}
564 }
565 }
566 #[cfg(not(unix))]
567 {
568 let _ = tokio::signal::ctrl_c().await;
569 }
570}
571
572/// Query parameters accepted on the `/ws` upgrade. `token` carries the bearer
573/// JWT used to authenticate the connection (browsers can't set custom headers on
574/// a WebSocket handshake, so the token rides on the query string — the standard
575/// pattern for WS auth).
576#[derive(Debug, serde::Deserialize, Default)]
577struct WsQuery {
578 /// The bearer token (raw JWT, no `Bearer ` prefix), if provided.
579 #[serde(default)]
580 token: Option<String>,
581}
582
583/// Resolve the connection's [`AccessContext`] from the `?token=` query param.
584///
585/// **Fail closed for ACL'd content**: when no token is presented, or the auth
586/// verifier is the no-key [`AdminDisabledVerifier`] (admin/auth unconfigured —
587/// dev/no-auth), or the token fails to verify, the connection runs as
588/// [`AccessContext::anonymous`] — which sees **only org-public** knowledge, not
589/// every document. A valid token yields the principal's full
590/// [`AccessContext`](smooth_operator::auth::Principal::access_context) (user id +
591/// groups), so it can read documents scoped to it. Verification failures are
592/// logged (never the token) and degrade to anonymous rather than dropping the
593/// connection, so the dev/no-auth case still serves org-public knowledge.
594/// The connection's resolved auth identity: its document-level
595/// [`AccessContext`] (used to filter retrieval) plus, when the token verified to
596/// a JWT principal, that principal's `org_id` (used to create sessions under the
597/// authenticated org). `org_id` is `None` for the anonymous/dev/no-auth case, so
598/// create-session falls through to the agent's widget-policy org, then the seed
599/// org — leaving the no-auth flavor unchanged.
600struct ConnectionAuth {
601 access: AccessContext,
602 org_id: Option<String>,
603}
604
605/// Resolve the connection's access from `?token=`.
606///
607/// **Lenient (default):** a missing/invalid token degrades to
608/// [`AccessContext::anonymous`] (org-public only) — keeps the dev/no-auth `/ws`
609/// path and the embeddable widget's anonymous flow working while failing closed
610/// for ACL'd content. **Strict ([`AppState::strict_auth`]):** a missing/invalid
611/// token is **rejected** (the connection is refused, not degraded) — what a
612/// single-tenant local/tailnet deployment wants so a tokenless peer can't drive
613/// the agent. Returns `Err(())` to signal "reject the upgrade".
614fn resolve_ws_access(state: &AppState, query: &WsQuery) -> Result<ConnectionAuth, ()> {
615 let Some(token) = query
616 .token
617 .as_deref()
618 .map(str::trim)
619 .filter(|t| !t.is_empty())
620 else {
621 if state.strict_auth {
622 tracing::warn!("strict auth: rejecting tokenless /ws connection");
623 return Err(());
624 }
625 // No token → anonymous (org-public only).
626 return Ok(ConnectionAuth {
627 access: AccessContext::anonymous(),
628 org_id: None,
629 });
630 };
631 match state.auth.verify(token) {
632 Ok(principal) => Ok(ConnectionAuth {
633 access: principal.access_context(),
634 org_id: Some(principal.org_id),
635 }),
636 Err(e) => {
637 // Don't leak the token; log only the mode + a generic reason.
638 tracing::warn!(
639 auth_mode = state.auth.mode(),
640 error = %e,
641 strict = state.strict_auth,
642 "ws token failed verification"
643 );
644 if state.strict_auth {
645 return Err(());
646 }
647 Ok(ConnectionAuth {
648 access: AccessContext::anonymous(),
649 org_id: None,
650 })
651 }
652 }
653}
654
655/// Axum handler: upgrade an HTTP request on `/ws` to a WebSocket. The bearer
656/// token (if any) is taken from the `?token=` query param, resolved to an
657/// [`AccessContext`] at connect time, and threaded into every turn so retrieval
658/// is access-controlled per connection.
659async fn ws_upgrade(
660 ws: WebSocketUpgrade,
661 State(state): State<AppState>,
662 Query(query): Query<WsQuery>,
663 headers: axum::http::HeaderMap,
664) -> Response {
665 let ConnectionAuth { access, org_id } = match resolve_ws_access(&state, &query) {
666 Ok(auth) => auth,
667 // Strict auth refused the connection (missing/invalid token).
668 Err(()) => return (axum::http::StatusCode::UNAUTHORIZED, "unauthorized").into_response(),
669 };
670 // Capture the browser's `Origin` at the handshake (browsers always send it,
671 // and can't be made to forge another site's). It's enforced per-agent at
672 // session creation against the agent's embed allowlist (widget_auth).
673 let origin = headers
674 .get(axum::http::header::ORIGIN)
675 .and_then(|v| v.to_str().ok())
676 .map(str::to_string);
677 ws.on_upgrade(move |socket| connection_loop(socket, state, access, org_id, origin))
678}
679
680/// Drive one WebSocket connection: split into reader + writer, joined by an
681/// outbound event sink. `access` is the connection's resolved document-level
682/// entitlement, threaded into every `send_message` turn. `auth_org` is the
683/// authenticated JWT principal's org (when the token verified), used to create
684/// sessions under the authenticated org. `origin` is the handshake `Origin`
685/// header, enforced against an agent's embed allowlist.
686async fn connection_loop(
687 socket: WebSocket,
688 state: AppState,
689 access: AccessContext,
690 auth_org: Option<String>,
691 origin: Option<String>,
692) {
693 let (mut ws_tx, mut ws_rx) = socket.split();
694 let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<serde_json::Value>();
695
696 // Register this connection's outbound sink with the backplane so events
697 // published from anywhere (this pod or, with a Redis/NATS impl, another) can
698 // reach it. `conn_id` is associated with its session at create-session time.
699 let conn_id = uuid::Uuid::new_v4().to_string();
700 let sink_for_backplane = sink_tx.clone();
701 state
702 .backplane
703 .attach(
704 &conn_id,
705 std::sync::Arc::new(move |event| {
706 let _ = sink_for_backplane.send(event);
707 }),
708 )
709 .await;
710
711 // Writer: drain the sink and write each event as a JSON text frame.
712 let writer = tokio::spawn(async move {
713 while let Some(event) = sink_rx.recv().await {
714 let text = match serde_json::to_string(&event) {
715 Ok(t) => t,
716 Err(_) => continue,
717 };
718 if ws_tx.send(Message::Text(text.into())).await.is_err() {
719 break;
720 }
721 }
722 });
723
724 // Reader: dispatch inbound frames. Handlers emit events via `sink_tx`.
725 //
726 // The `select!` lets a graceful shutdown (SIGTERM/ctrl_c → `state.shutdown`
727 // cancelled by the serve loop) break this loop so the connection drains: it
728 // stops reading new frames, falls out, and detaches below. `biased` so the
729 // shutdown branch wins a tie. Crucially, `handle_frame(...).await` stays
730 // INSIDE the frame arm (not a `select!` condition), so a turn already in
731 // flight when the cancel fires runs to completion before the next loop
732 // iteration observes the cancellation — that is the in-flight drain.
733 loop {
734 tokio::select! {
735 biased;
736
737 () = state.shutdown.cancelled() => {
738 // Pod is terminating: stop accepting frames on this connection.
739 // Returning closes the socket (the writer task ends when
740 // `sink_tx` drops below); any turn that was mid-flight already
741 // finished in the frame arm before we got here.
742 break;
743 }
744
745 frame = ws_rx.next() => {
746 match frame {
747 Some(Ok(Message::Text(text))) => {
748 handler::handle_frame(
749 &state,
750 &access,
751 &conn_id,
752 origin.as_deref(),
753 auth_org.as_deref(),
754 text.as_str(),
755 &sink_tx,
756 )
757 .await;
758 }
759 Some(Ok(Message::Binary(_))) => {
760 let _ = sink_tx.send(crate::protocol::error(
761 None,
762 "VALIDATION_ERROR",
763 "binary frames are not supported; send JSON text frames",
764 ));
765 }
766 Some(Ok(Message::Close(_))) => break,
767 // Ping/Pong control frames are handled by axum automatically.
768 Some(Ok(_)) => {}
769 Some(Err(_)) => break,
770 // Stream ended (peer hung up).
771 None => break,
772 }
773 }
774 }
775 }
776
777 // Reader finished (peer closed, error, or graceful shutdown) → detach from
778 // the backplane so no stale registry entry is left behind, then drop the
779 // sink so the writer task exits.
780 state.backplane.detach(&conn_id).await;
781 drop(sink_tx);
782 let _ = writer.await;
783}
784
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator::adapter::StorageAdapter;

    /// A seeded in-memory KB must surface the deliberately unusual 17-day
    /// return window for a return-policy query.
    #[test]
    fn seeded_kb_returns_17_day_fact() {
        let storage = InMemoryStorageAdapter::new();
        seed_knowledge(&storage);

        let kb = storage.knowledge();
        let results = kb.query("return window policy", 3).expect("query");
        let mentions_17 = results.iter().any(|r| r.chunk.contains("17"));

        assert!(mentions_17, "expected seeded 17-day fact, got: {results:?}");
    }

    /// With no gateway key configured, the built state reports the LLM as
    /// disabled.
    #[tokio::test]
    async fn build_state_without_key_has_no_llm() {
        let state = build_state(ServerConfig {
            bind: "127.0.0.1".into(),
            port: 0,
            gateway_url: "https://example.test/v1".into(),
            gateway_key: None,
            model: "m".into(),
            seed_kb: true,
            max_iterations: 4,
            max_tokens: 128,
            storage: crate::config::StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: Vec::new(),
            judge_model: "claude-haiku-4-5".to_string(),
        });

        assert!(!state.config.has_llm());
    }
}
823}