smooth_operator_server/server.rs
//! The axum server: the WebSocket `/ws` endpoint (plus the `/health` probe and the `/admin` HTTP API), with one connection loop per socket.
2//!
3//! Per connection we split the socket and run two tasks joined by an
4//! `UnboundedSender<serde_json::Value>` outbound sink:
5//!
6//! - a **writer** that drains the sink and writes each event as a JSON text
7//! frame, and
8//! - a **reader** that reads inbound frames and dispatches them via
9//! [`crate::handler::handle_frame`], passing the sink so handlers (including
10//! the streaming `send_message`) can emit events as they happen.
11//!
12//! Using a sink channel (instead of writing directly from the handler) is what
13//! lets a streaming turn fire many `stream_token` events from inside the agent
14//! loop while the connection is still reading.
15
16use std::net::SocketAddr;
17use std::sync::Arc;
18
19use anyhow::{Context, Result};
20use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
21use axum::extract::{Query, State};
22use axum::response::Response;
23use axum::routing::get;
24use axum::Router;
25
26use futures_util::{SinkExt, StreamExt};
27use smooth_operator::access_control::AccessContext;
28use tokio::net::TcpListener;
29use tokio_util::sync::CancellationToken;
30
31use smooth_operator_adapter_memory::InMemoryStorageAdapter;
32use smooth_operator_core::{Document, DocumentType};
33
34use crate::config::ServerConfig;
35use crate::handler;
36use crate::state::AppState;
37
38/// Build the axum [`Router`] for the given application state. Exposed so tests
39/// can boot the server in-process. Serves the WebSocket `/ws` endpoint plus the
40/// auth-gated admin HTTP API under `/admin` (see [`crate::admin`]).
41pub fn router(state: AppState) -> Router {
42 Router::new()
43 .route("/ws", get(ws_upgrade))
44 // Unauthenticated liveness/readiness probe. A WebSocket `/ws` upgrade is
45 // not a plain GET, so HTTP load balancers (AWS ALB / nginx ingress) need a
46 // real HTTP route that answers 200 to confirm the listener is up. Cheap
47 // and dependency-free — it does not touch storage/LLM, so it stays Ready
48 // even when an optional backend (gateway key, DB) is degraded.
49 .route("/health", get(health))
50 .merge(crate::admin::router())
51 .with_state(state)
52}
53
/// Handler for `GET /health`: always answers with the static body `ok`
/// (axum turns a `&'static str` into a `200 OK` text response).
///
/// This is the minimal HTTP health endpoint for container orchestrators and
/// HTTP load balancers, which cannot use the WebSocket `/ws` route as a plain
/// GET healthcheck.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
60
/// Name of the document set that the seeded demo documents are tagged into.
/// [`build_state`] records it so `GET /admin/document-sets` has something to
/// report on a seeded server.
const SEED_DOCUMENT_SET: &str = "policies";

/// Org that owns the seeded demo documents and their document-set registry
/// entries.
///
/// Mirrors the org that `handler::handle_create_session` stamps onto reference
/// conversations, so the seeded sets are visible to the reference org's admin
/// caller — and only to that org (cross-org scoping).
pub const SEED_ORG_ID: &str = "reference-org";
70
71/// Build an [`AppState`] over a fresh in-memory adapter, seeding the knowledge
72/// base when `config.seed_kb` is set. Exposed for tests + the binary.
73///
74/// The auth verifier defaults to [`NoAuthVerifier`](smooth_operator::auth::NoAuthVerifier)
75/// here (the protocol-only path needs no auth); the **binary** path
76/// ([`build_state_from_env`]) installs the env-configured, secure-by-default
77/// verifier instead.
78#[must_use]
79pub fn build_state(config: ServerConfig) -> AppState {
80 let seed = config.seed_kb;
81 let storage = Arc::new(InMemoryStorageAdapter::new());
82 let state = AppState::new(storage.clone(), config);
83 if seed {
84 seed_knowledge(storage.as_ref());
85 // Record the seeded docs' document-set membership for the admin API
86 // (the in-memory backend drops document metadata, so the registry is the
87 // source of truth for set names + counts).
88 state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
89 state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
90 }
91 state
92}
93
94/// Build an [`AppState`] with the **env-configured** auth verifier (secure by
95/// default — see [`smooth_operator::auth::AuthConfig`]). Used by the binary.
96///
97/// # Errors
98/// Returns an error if the auth configuration is invalid (e.g. `AUTH_MODE=jwt`
99/// with no key) — the server refuses to start rather than fall back to no-auth.
100pub fn build_state_from_env(config: ServerConfig) -> Result<AppState> {
101 let verifier = smooth_operator::auth::AuthConfig::from_env()
102 .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
103 let state = install_widget_auth_from_env(build_state(config));
104 Ok(state.with_auth(Arc::from(verifier)))
105}
106
107/// Install an [`HttpWidgetAuth`](smooth_operator::widget_auth::HttpWidgetAuth)
108/// provider from `WIDGET_AUTH_URL` (optionally `WIDGET_AUTH_BEARER` +
109/// `WIDGET_AUTH_TTL_SECS`); otherwise leave the permissive default. This lets a
110/// host enforce embeddable-widget auth against its own policy service by setting
111/// env vars — no custom binary needed. (A host wanting bespoke logic still
112/// installs its own provider via [`AppState::with_widget_auth`].)
113fn install_widget_auth_from_env(state: AppState) -> AppState {
114 let Ok(url) = std::env::var("WIDGET_AUTH_URL") else {
115 return state;
116 };
117 let url = url.trim();
118 if url.is_empty() {
119 return state;
120 }
121 let mut provider = smooth_operator::widget_auth::HttpWidgetAuth::new(url);
122 if let Ok(bearer) = std::env::var("WIDGET_AUTH_BEARER") {
123 let bearer = bearer.trim();
124 if !bearer.is_empty() {
125 provider = provider.with_bearer(bearer);
126 }
127 }
128 if let Some(secs) = std::env::var("WIDGET_AUTH_TTL_SECS")
129 .ok()
130 .and_then(|s| s.trim().parse::<u64>().ok())
131 {
132 provider = provider.with_ttl(std::time::Duration::from_secs(secs));
133 }
134 state.with_widget_auth(Arc::new(provider))
135}
136
137/// Build an [`AppState`] selecting the **storage backend** (and the matching
138/// durable **admin stores**) from `config.storage`, then installing the
139/// env-configured auth verifier.
140///
141/// - [`StorageBackend::Memory`](crate::config::StorageBackend::Memory) — the
142/// in-memory adapter + in-memory admin stores (the [`build_state`] path; lost
143/// on restart). The default.
144/// - [`StorageBackend::Postgres`](crate::config::StorageBackend::Postgres) —
145/// the Postgres + pgvector adapter; the admin stores persist to the **same
146/// database** (`connector_configs` / `agent_settings` / `indexing_runs`).
147/// Connection string from `SMOOTH_AGENT_DATABASE_URL` / `DATABASE_URL`.
148/// - [`StorageBackend::Dynamodb`](crate::config::StorageBackend::Dynamodb) — the
149/// DynamoDB single-table adapter; the admin stores persist to the **same
150/// table**. Table from `SMOOTH_AGENT_DDB_TABLE`; the table is created if
151/// absent.
152///
153/// The admin store backend always matches the storage backend so a connector
154/// config / settings / indexing run survives a restart wherever the
155/// conversations and knowledge live.
156///
157/// # Errors
158/// Returns an error if the auth configuration is invalid, or if the selected
159/// persistent backend fails to connect / migrate.
160pub async fn build_state_from_env_async(config: ServerConfig) -> Result<AppState> {
161 use crate::config::StorageBackend;
162 // Only the Postgres / DynamoDB arms name `StorageAdapter` (for the
163 // `Arc<dyn StorageAdapter>` annotation); on a lean build with neither feature
164 // those arms are compiled out, so the import would be unused.
165 #[cfg(any(feature = "postgres", feature = "dynamodb"))]
166 use smooth_operator::adapter::StorageAdapter;
167
168 let verifier = smooth_operator::auth::AuthConfig::from_env()
169 .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
170
171 let state = match config.storage {
172 // The in-memory path is unchanged (synchronous, no external services).
173 StorageBackend::Memory => build_state(config),
174
175 // The Postgres storage backend (and its matching durable admin stores)
176 // is only compiled in on a build with the `postgres` feature (the default
177 // / cloud build). A lean `--no-default-features` build returns a clear
178 // error if `SMOOTH_AGENT_STORAGE=postgres` is requested at runtime.
179 #[cfg(feature = "postgres")]
180 StorageBackend::Postgres => {
181 use smooth_operator_adapter_postgres::PostgresAdapter;
182 // The pgvector column width MUST match the embedder the `/index`
183 // path uses (1536 keyed / 1024 offline). Build the embedder from
184 // config and create the adapter with it so document vectors (at
185 // ingest) and query vectors agree — no silent 1024/1536 mismatch.
186 let embedder = crate::embedder::build_embedder(
187 &crate::embedder::EmbedderConfig::from_server_config(&config),
188 );
189 let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
190 .or_else(|_| std::env::var("DATABASE_URL"))
191 .map_err(|_| {
192 anyhow::anyhow!(
193 "Postgres backend selected but neither SMOOTH_AGENT_DATABASE_URL \
194 nor DATABASE_URL is set"
195 )
196 })?;
197 let adapter = Arc::new(
198 PostgresAdapter::connect_with_embedder(&conn_str, embedder)
199 .await
200 .map_err(|e| anyhow::anyhow!("connecting Postgres storage backend: {e}"))?,
201 );
202 // Admin stores against the SAME database — durable.
203 let connectors = Arc::new(adapter.connector_config_store());
204 let settings = Arc::new(adapter.settings_store());
205 let indexing = Arc::new(adapter.indexing_store());
206 let storage: Arc<dyn StorageAdapter> = adapter;
207 AppState::new(storage, config)
208 .with_connector_configs(connectors)
209 .with_settings(settings)
210 .with_indexing(indexing)
211 }
212
213 // The DynamoDB storage backend is only compiled in on a build with the
214 // `dynamodb` feature (the default / cloud build). A lean build returns a
215 // clear error if `SMOOTH_AGENT_STORAGE=dynamodb` is requested at runtime.
216 #[cfg(feature = "dynamodb")]
217 StorageBackend::Dynamodb => {
218 use smooth_operator_adapter_dynamodb::DynamoDbAdapter;
219 let adapter = Arc::new(
220 DynamoDbAdapter::from_env(None)
221 .await
222 .map_err(|e| anyhow::anyhow!("connecting DynamoDB storage backend: {e}"))?,
223 );
224 adapter
225 .create_table()
226 .await
227 .map_err(|e| anyhow::anyhow!("creating DynamoDB table: {e}"))?;
228 // Admin stores against the SAME table — durable.
229 let connectors = Arc::new(adapter.connector_config_store());
230 let settings = Arc::new(adapter.settings_store());
231 let indexing = Arc::new(adapter.indexing_store());
232 let storage: Arc<dyn StorageAdapter> = adapter;
233 AppState::new(storage, config)
234 .with_connector_configs(connectors)
235 .with_settings(settings)
236 .with_indexing(indexing)
237 }
238
239 // Lean build: a persistent backend was requested but its feature wasn't
240 // compiled in. Fail loud with an actionable message rather than silently
241 // running in-memory (which would lose data on restart).
242 #[cfg(not(feature = "postgres"))]
243 StorageBackend::Postgres => {
244 anyhow::bail!(
245 "SMOOTH_AGENT_STORAGE=postgres requires building with --features postgres \
246 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
247 with the 'cloud'/'postgres' feature"
248 );
249 }
250 #[cfg(not(feature = "dynamodb"))]
251 StorageBackend::Dynamodb => {
252 anyhow::bail!(
253 "SMOOTH_AGENT_STORAGE=dynamodb requires building with --features dynamodb \
254 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
255 with the 'cloud'/'dynamodb' feature"
256 );
257 }
258 };
259
260 let state = install_backplane_from_env(state).await?;
261 let state = install_widget_auth_from_env(state);
262
263 Ok(state.with_auth(Arc::from(verifier)))
264}
265
266/// Select the connection [`Backplane`](smooth_operator::backplane::Backplane)
267/// from `SMOOTH_AGENT_BACKPLANE`, installing it via
268/// [`AppState::with_backplane`](crate::state::AppState::with_backplane).
269///
270/// | value | backend | url env |
271/// |---|---|---|
272/// | unset / `memory` / `inmemory` | single-process (default) | — |
273/// | `redis` / `valkey` | [`RedisBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_REDIS_URL` |
274/// | `nats` | [`NatsBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_NATS_URL` |
275///
276/// A distributed backend is required for >1 replica (otherwise an event produced
277/// on one pod can't reach a socket on another) and to let non-AI publishers push
278/// realtime events via `Backplane::publish`.
279///
280/// # Errors
281/// Returns an error for an unknown backend value, a missing url, or a failed
282/// connection — fail loud at boot rather than silently run single-process.
283async fn install_backplane_from_env(state: AppState) -> Result<AppState> {
284 let kind = std::env::var("SMOOTH_AGENT_BACKPLANE")
285 .unwrap_or_default()
286 .trim()
287 .to_lowercase();
288
289 let url = |specific: &str| -> Result<String> {
290 std::env::var("SMOOTH_AGENT_BACKPLANE_URL")
291 .or_else(|_| std::env::var(specific))
292 .map_err(|_| {
293 anyhow::anyhow!(
294 "{kind} backplane selected but neither SMOOTH_AGENT_BACKPLANE_URL nor {specific} is set"
295 )
296 })
297 };
298
299 match kind.as_str() {
300 "" | "memory" | "inmemory" => Ok(state), // default InMemoryBackplane already installed
301 // The Redis backplane is only compiled in on a build with the `redis`
302 // feature (the default / cloud build). A lean `--no-default-features`
303 // build returns a clear error rather than silently running single-process.
304 "redis" | "valkey" => {
305 #[cfg(feature = "redis")]
306 {
307 use smooth_operator_adapter_backplane_redis::RedisBackplane;
308 let backplane = RedisBackplane::connect(&url("SMOOTH_AGENT_REDIS_URL")?)
309 .await
310 .map_err(|e| anyhow::anyhow!("connecting Redis backplane: {e}"))?;
311 Ok(state.with_backplane(Arc::new(backplane)))
312 }
313 #[cfg(not(feature = "redis"))]
314 {
315 let _ = url; // silence unused-closure warning on the lean build
316 anyhow::bail!(
317 "SMOOTH_AGENT_BACKPLANE={kind} requires building with --features redis \
318 (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
319 with the 'cloud'/'redis' feature"
320 )
321 }
322 }
323 // The NATS backplane is only compiled in on a build with the `nats`
324 // feature (the default / cloud build). A lean build returns a clear error.
325 "nats" => {
326 #[cfg(feature = "nats")]
327 {
328 use smooth_operator_adapter_backplane_nats::NatsBackplane;
329 let backplane = NatsBackplane::connect(&url("SMOOTH_AGENT_NATS_URL")?)
330 .await
331 .map_err(|e| anyhow::anyhow!("connecting NATS backplane: {e}"))?;
332 Ok(state.with_backplane(Arc::new(backplane)))
333 }
334 #[cfg(not(feature = "nats"))]
335 {
336 let _ = url; // silence unused-closure warning on the lean build
337 anyhow::bail!(
338 "SMOOTH_AGENT_BACKPLANE=nats requires building with --features nats \
339 (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
340 with the 'cloud'/'nats' feature"
341 )
342 }
343 }
344 other => Err(anyhow::anyhow!(
345 "unknown SMOOTH_AGENT_BACKPLANE '{other}' (expected: memory | redis | valkey | nats)"
346 )),
347 }
348}
349
350/// Seed a couple of distinctive demo docs so knowledge-grounded E2E is
351/// deterministic. The 17-day return window is deliberately unusual so an
352/// ungrounded answer can't accidentally match it. Both docs are tagged into the
353/// `policies` document set so the admin API can report it.
354pub fn seed_knowledge(storage: &InMemoryStorageAdapter) {
355 let kb = smooth_operator::adapter::StorageAdapter::knowledge(storage);
356 let _ = kb.ingest(smooth_operator::with_document_set(
357 Document::new(
358 "SmooAI's return window is exactly 17 days from delivery. Returns after 17 days are not accepted.",
359 "policies/returns.md",
360 DocumentType::Documentation,
361 ),
362 [SEED_DOCUMENT_SET],
363 ));
364 let _ = kb.ingest(smooth_operator::with_document_set(
365 Document::new(
366 "SmooAI standard shipping takes 5 to 7 business days. Expedited shipping takes 2 business days.",
367 "policies/shipping.md",
368 DocumentType::Documentation,
369 ),
370 [SEED_DOCUMENT_SET],
371 ));
372}
373
374/// Bind on `<SMOOTH_AGENT_BIND>:<port>` (default loopback) and serve until the
375/// process is killed. Returns the bound [`TcpListener`] + the router, used by
376/// both the binary and tests (tests bind port 0 for an ephemeral port).
377///
378/// Uses the **env-configured, secure-by-default** auth verifier
379/// ([`build_state_from_env`]) — the binary refuses to start if auth is
380/// misconfigured rather than silently serving the admin API unauthenticated.
381///
382/// # Errors
383/// Returns an error if the auth configuration is invalid or the TCP bind fails.
384pub async fn bind(config: ServerConfig) -> Result<(TcpListener, Router, CancellationToken)> {
385 let ip: std::net::IpAddr = config
386 .bind
387 .parse()
388 .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
389 let addr = SocketAddr::new(ip, config.port);
390 // Async so a Postgres / DynamoDB storage backend (and its matching durable
391 // admin stores) can be wired; in-memory stays synchronous inside.
392 let state = build_state_from_env_async(config).await?;
393 // Clone the shutdown token BEFORE the state is consumed into the router, so
394 // `run` can cancel it (which fans out to every per-connection clone) when a
395 // SIGTERM/ctrl_c arrives.
396 let shutdown = state.shutdown.clone();
397 let app = router(state);
398 let listener = TcpListener::bind(addr)
399 .await
400 .with_context(|| format!("binding WebSocket server on {addr}"))?;
401 Ok((listener, app, shutdown))
402}
403
404/// Serve a **pre-built** [`AppState`] to completion (blocks), binding on
405/// `state.config.bind:state.config.port`.
406///
407/// This is the library entry point for callers that assemble their own
408/// `AppState` — e.g. the `dev-support` example, which ingests a GitHub repo into
409/// a storage adapter, wires the env-configured [`AuthVerifier`], and then serves
410/// that exact state so the chat-widget queries the ingested knowledge. It does
411/// **not** rebuild the state or touch the ACL/auth/embedder/reranker selection —
412/// those are baked into the `state` the caller passes in. The WS loop, router,
413/// and listening log are identical to [`run`] (which builds its state from env);
414/// `run` is unchanged.
415///
416/// # Errors
417/// Returns an error if the TCP bind fails or serving fails.
418pub async fn serve_state(state: AppState) -> Result<()> {
419 let ip: std::net::IpAddr = state
420 .config
421 .bind
422 .parse()
423 .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
424 let addr = SocketAddr::new(ip, state.config.port);
425 let listener = TcpListener::bind(addr)
426 .await
427 .with_context(|| format!("binding WebSocket server on {addr}"))?;
428 serve_state_on(state, listener).await
429}
430
431/// Serve a pre-built [`AppState`] on an already-bound [`TcpListener`] (blocks).
432///
433/// Splitting the bind from the serve lets a caller (or a test) bind an ephemeral
434/// port, read [`TcpListener::local_addr`] for the real port, then hand the
435/// listener here. Logs the same "listening" line [`run`] does.
436///
437/// # Errors
438/// Returns an error if serving fails.
439pub async fn serve_state_on(state: AppState, listener: TcpListener) -> Result<()> {
440 let has_llm = state.config.has_llm();
441 let model = state.config.model.clone();
442 let gateway = state.config.gateway_url.clone();
443 let local = listener.local_addr().context("local addr")?;
444 let app = router(state);
445
446 tracing::info!(
447 %local,
448 endpoint = "/ws",
449 %model,
450 %gateway,
451 llm_enabled = has_llm,
452 "smooth-operator-server listening"
453 );
454 println!(
455 "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
456 );
457
458 axum::serve(listener, app)
459 .await
460 .context("serving WebSocket connections")?;
461 Ok(())
462}
463
464/// Run the server to completion (blocks). Logs a single listening line.
465///
466/// # Errors
467/// Returns an error if binding or serving fails.
468pub async fn run(config: ServerConfig) -> Result<()> {
469 let has_llm = config.has_llm();
470 let model = config.model.clone();
471 let gateway = config.gateway_url.clone();
472 let (listener, app, shutdown) = bind(config).await?;
473 let local = listener.local_addr().context("local addr")?;
474
475 tracing::info!(
476 %local,
477 endpoint = "/ws",
478 %model,
479 %gateway,
480 llm_enabled = has_llm,
481 "smooth-operator-server listening"
482 );
483 // Also print to stdout so the run-confirmation check is unambiguous without
484 // a tracing subscriber filter.
485 println!(
486 "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
487 );
488
489 // Graceful drain: stop accepting new connections AND cancel the shared
490 // shutdown token on SIGTERM (k8s pod termination) / ctrl_c. Cancelling fans
491 // out to every per-connection reader loop so each finishes its in-flight turn
492 // and detaches from the backplane before the process exits — within the
493 // chart's `terminationGracePeriodSeconds` window.
494 axum::serve(listener, app)
495 .with_graceful_shutdown(async move {
496 wait_for_shutdown_signal().await;
497 tracing::info!("shutdown signal received; draining in-flight WebSocket turns");
498 shutdown.cancel();
499 })
500 .await
501 .context("serving WebSocket connections")?;
502 Ok(())
503}
504
505/// Resolve when the process receives a termination request: SIGTERM (how
506/// Kubernetes asks a pod to stop on scale-down / rollout) **or** ctrl_c
507/// (SIGINT — interactive `cargo run`), whichever comes first.
508///
509/// Unix-only signal handling (the server targets Linux/k8s); on a non-unix host
510/// it falls back to ctrl_c alone so the binary still stops cleanly.
511async fn wait_for_shutdown_signal() {
512 #[cfg(unix)]
513 {
514 use tokio::signal::unix::{signal, SignalKind};
515 // If installing the SIGTERM handler somehow fails, fall back to ctrl_c
516 // only rather than panicking the serve task.
517 let mut sigterm = match signal(SignalKind::terminate()) {
518 Ok(s) => s,
519 Err(e) => {
520 tracing::warn!(error = %e, "failed to install SIGTERM handler; ctrl_c only");
521 let _ = tokio::signal::ctrl_c().await;
522 return;
523 }
524 };
525 tokio::select! {
526 _ = sigterm.recv() => {}
527 _ = tokio::signal::ctrl_c() => {}
528 }
529 }
530 #[cfg(not(unix))]
531 {
532 let _ = tokio::signal::ctrl_c().await;
533 }
534}
535
536/// Query parameters accepted on the `/ws` upgrade. `token` carries the bearer
537/// JWT used to authenticate the connection (browsers can't set custom headers on
538/// a WebSocket handshake, so the token rides on the query string — the standard
539/// pattern for WS auth).
540#[derive(Debug, serde::Deserialize, Default)]
541struct WsQuery {
542 /// The bearer token (raw JWT, no `Bearer ` prefix), if provided.
543 #[serde(default)]
544 token: Option<String>,
545}
546
547/// Resolve the connection's [`AccessContext`] from the `?token=` query param.
548///
549/// **Fail closed for ACL'd content**: when no token is presented, or the auth
550/// verifier is the no-key [`AdminDisabledVerifier`] (admin/auth unconfigured —
551/// dev/no-auth), or the token fails to verify, the connection runs as
552/// [`AccessContext::anonymous`] — which sees **only org-public** knowledge, not
553/// every document. A valid token yields the principal's full
554/// [`AccessContext`](smooth_operator::auth::Principal::access_context) (user id +
555/// groups), so it can read documents scoped to it. Verification failures are
556/// logged (never the token) and degrade to anonymous rather than dropping the
557/// connection, so the dev/no-auth case still serves org-public knowledge.
558fn resolve_ws_access(state: &AppState, query: &WsQuery) -> AccessContext {
559 let Some(token) = query
560 .token
561 .as_deref()
562 .map(str::trim)
563 .filter(|t| !t.is_empty())
564 else {
565 // No token → anonymous (org-public only). Keeps the dev/no-auth `/ws`
566 // path working while failing closed for ACL'd content.
567 return AccessContext::anonymous();
568 };
569 match state.auth.verify(token) {
570 Ok(principal) => principal.access_context(),
571 Err(e) => {
572 // Don't leak the token; log only the mode + a generic reason.
573 tracing::warn!(
574 auth_mode = state.auth.mode(),
575 error = %e,
576 "ws token failed verification; serving org-public knowledge only (anonymous)"
577 );
578 AccessContext::anonymous()
579 }
580 }
581}
582
583/// Axum handler: upgrade an HTTP request on `/ws` to a WebSocket. The bearer
584/// token (if any) is taken from the `?token=` query param, resolved to an
585/// [`AccessContext`] at connect time, and threaded into every turn so retrieval
586/// is access-controlled per connection.
587async fn ws_upgrade(
588 ws: WebSocketUpgrade,
589 State(state): State<AppState>,
590 Query(query): Query<WsQuery>,
591 headers: axum::http::HeaderMap,
592) -> Response {
593 let access = resolve_ws_access(&state, &query);
594 // Capture the browser's `Origin` at the handshake (browsers always send it,
595 // and can't be made to forge another site's). It's enforced per-agent at
596 // session creation against the agent's embed allowlist (widget_auth).
597 let origin = headers
598 .get(axum::http::header::ORIGIN)
599 .and_then(|v| v.to_str().ok())
600 .map(str::to_string);
601 ws.on_upgrade(move |socket| connection_loop(socket, state, access, origin))
602}
603
604/// Drive one WebSocket connection: split into reader + writer, joined by an
605/// outbound event sink. `access` is the connection's resolved document-level
606/// entitlement, threaded into every `send_message` turn. `origin` is the
607/// handshake `Origin` header, enforced against an agent's embed allowlist.
608async fn connection_loop(
609 socket: WebSocket,
610 state: AppState,
611 access: AccessContext,
612 origin: Option<String>,
613) {
614 let (mut ws_tx, mut ws_rx) = socket.split();
615 let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<serde_json::Value>();
616
617 // Register this connection's outbound sink with the backplane so events
618 // published from anywhere (this pod or, with a Redis/NATS impl, another) can
619 // reach it. `conn_id` is associated with its session at create-session time.
620 let conn_id = uuid::Uuid::new_v4().to_string();
621 let sink_for_backplane = sink_tx.clone();
622 state
623 .backplane
624 .attach(
625 &conn_id,
626 std::sync::Arc::new(move |event| {
627 let _ = sink_for_backplane.send(event);
628 }),
629 )
630 .await;
631
632 // Writer: drain the sink and write each event as a JSON text frame.
633 let writer = tokio::spawn(async move {
634 while let Some(event) = sink_rx.recv().await {
635 let text = match serde_json::to_string(&event) {
636 Ok(t) => t,
637 Err(_) => continue,
638 };
639 if ws_tx.send(Message::Text(text.into())).await.is_err() {
640 break;
641 }
642 }
643 });
644
645 // Reader: dispatch inbound frames. Handlers emit events via `sink_tx`.
646 //
647 // The `select!` lets a graceful shutdown (SIGTERM/ctrl_c → `state.shutdown`
648 // cancelled by the serve loop) break this loop so the connection drains: it
649 // stops reading new frames, falls out, and detaches below. `biased` so the
650 // shutdown branch wins a tie. Crucially, `handle_frame(...).await` stays
651 // INSIDE the frame arm (not a `select!` condition), so a turn already in
652 // flight when the cancel fires runs to completion before the next loop
653 // iteration observes the cancellation — that is the in-flight drain.
654 loop {
655 tokio::select! {
656 biased;
657
658 () = state.shutdown.cancelled() => {
659 // Pod is terminating: stop accepting frames on this connection.
660 // Returning closes the socket (the writer task ends when
661 // `sink_tx` drops below); any turn that was mid-flight already
662 // finished in the frame arm before we got here.
663 break;
664 }
665
666 frame = ws_rx.next() => {
667 match frame {
668 Some(Ok(Message::Text(text))) => {
669 handler::handle_frame(
670 &state,
671 &access,
672 &conn_id,
673 origin.as_deref(),
674 text.as_str(),
675 &sink_tx,
676 )
677 .await;
678 }
679 Some(Ok(Message::Binary(_))) => {
680 let _ = sink_tx.send(crate::protocol::error(
681 None,
682 "VALIDATION_ERROR",
683 "binary frames are not supported; send JSON text frames",
684 ));
685 }
686 Some(Ok(Message::Close(_))) => break,
687 // Ping/Pong control frames are handled by axum automatically.
688 Some(Ok(_)) => {}
689 Some(Err(_)) => break,
690 // Stream ended (peer hung up).
691 None => break,
692 }
693 }
694 }
695 }
696
697 // Reader finished (peer closed, error, or graceful shutdown) → detach from
698 // the backplane so no stale registry entry is left behind, then drop the
699 // sink so the writer task exits.
700 state.backplane.detach(&conn_id).await;
701 drop(sink_tx);
702 let _ = writer.await;
703}
704
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator::adapter::StorageAdapter;

    /// The seeded knowledge base must surface the deliberately unusual 17-day
    /// return window for a returns-related query.
    #[test]
    fn seeded_kb_returns_17_day_fact() {
        let adapter = InMemoryStorageAdapter::new();
        seed_knowledge(&adapter);

        let results = adapter
            .knowledge()
            .query("return window policy", 3)
            .expect("query");

        let mentions_17 = results.iter().any(|hit| hit.chunk.contains("17"));
        assert!(
            mentions_17,
            "expected seeded 17-day fact, got: {results:?}"
        );
    }

    /// A config without a gateway key must report that no LLM is available.
    #[tokio::test]
    async fn build_state_without_key_has_no_llm() {
        let config = ServerConfig {
            bind: "127.0.0.1".into(),
            port: 0,
            gateway_url: "https://example.test/v1".into(),
            gateway_key: None,
            model: "m".into(),
            seed_kb: true,
            max_iterations: 4,
            max_tokens: 128,
            storage: crate::config::StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: Vec::new(),
        };

        let state = build_state(config);
        let llm_available = state.config.has_llm();
        assert!(!llm_available);
    }
}