smooth_operator_server/server.rs
//! The axum server: the `/ws` WebSocket endpoint (one task per connection),
//! plus the `/health` probe, the `/admin` API and the optional widget routes.
2//!
3//! Per connection we split the socket and run two tasks joined by an
4//! `UnboundedSender<serde_json::Value>` outbound sink:
5//!
6//! - a **writer** that drains the sink and writes each event as a JSON text
7//! frame, and
8//! - a **reader** that reads inbound frames and dispatches them via
9//! [`crate::handler::handle_frame`], passing the sink so handlers (including
10//! the streaming `send_message`) can emit events as they happen.
11//!
12//! Using a sink channel (instead of writing directly from the handler) is what
13//! lets a streaming turn fire many `stream_token` events from inside the agent
14//! loop while the connection is still reading.
15
16use std::net::SocketAddr;
17use std::sync::Arc;
18
19use anyhow::{Context, Result};
20use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
21use axum::extract::{Query, State};
22use axum::response::{Html, IntoResponse, Response};
23use axum::routing::get;
24use axum::Router;
25
26use futures_util::{SinkExt, StreamExt};
27use smooth_operator::access_control::AccessContext;
28use tokio::net::TcpListener;
29use tokio_util::sync::CancellationToken;
30
31use smooth_operator_adapter_memory::InMemoryStorageAdapter;
32use smooth_operator_core::{Document, DocumentType};
33
34use crate::config::ServerConfig;
35use crate::handler;
36use crate::state::AppState;
37
38/// Build the axum [`Router`] for the given application state. Exposed so tests
39/// can boot the server in-process. Serves the WebSocket `/ws` endpoint plus the
40/// auth-gated admin HTTP API under `/admin` (see [`crate::admin`]).
41pub fn router(state: AppState) -> Router {
42 let mut router = Router::new()
43 .route("/ws", get(ws_upgrade))
44 // Unauthenticated liveness/readiness probe. A WebSocket `/ws` upgrade is
45 // not a plain GET, so HTTP load balancers (AWS ALB / nginx ingress) need a
46 // real HTTP route that answers 200 to confirm the listener is up. Cheap
47 // and dependency-free — it does not touch storage/LLM, so it stays Ready
48 // even when an optional backend (gateway key, DB) is degraded.
49 .route("/health", get(health))
50 .merge(crate::admin::router());
51 // The local deployment flavor opts into serving the official widget; other
52 // flavors never mount these routes.
53 if state.serve_widget {
54 router = router
55 .route("/", get(widget_index))
56 .route("/chat-widget.iife.js", get(widget_bundle));
57 }
58 router.with_state(state)
59}
60
61/// Serve the local-flavor widget host page at `/`, injecting the auth token
62/// (same-origin, loopback) so the embedded `<smooth-agent-chat>` connects to
63/// this server's own `/ws?token=…`. The token is JSON-encoded into the page so
64/// quoting is always safe.
65async fn widget_index(State(state): State<AppState>) -> Html<String> {
66 let token_json = serde_json::to_string(state.widget_token.as_deref().unwrap_or(""))
67 .unwrap_or_else(|_| "\"\"".to_string());
68 let html =
69 include_str!("../assets/widget-index.html").replace("__SMOOTH_LOCAL_TOKEN__", &token_json);
70 Html(html)
71}
72
73/// Serve the vendored official widget IIFE bundle.
74async fn widget_bundle() -> impl IntoResponse {
75 (
76 [(
77 axum::http::header::CONTENT_TYPE,
78 "application/javascript; charset=utf-8",
79 )],
80 include_str!("../assets/chat-widget.iife.js"),
81 )
82}
83
/// `GET /health`: always answers `200 OK` with the body `ok`.
///
/// This is the minimal HTTP health check for container orchestrators and HTTP
/// load balancers; the `/ws` route cannot serve a plain GET health check.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
90
/// The document set the seeded demo docs are tagged into, so
/// `GET /admin/document-sets` has something to report in a seeded server.
/// Used both when ingesting the seed docs (`seed_knowledge`) and when
/// recording set membership in the state's registry (`build_state`).
const SEED_DOCUMENT_SET: &str = "policies";

/// The org the seeded demo docs and their document-set registry entries belong to.
/// It mirrors the org `handler::handle_create_session` stamps onto reference
/// conversations. As a result, the seeded sets show up for the reference org's
/// admin caller, and only that org (cross-org scoping).
pub const SEED_ORG_ID: &str = "reference-org";
100
101/// Build an [`AppState`] over a fresh in-memory adapter, seeding the knowledge
102/// base when `config.seed_kb` is set. Exposed for tests + the binary.
103///
104/// The auth verifier defaults to [`NoAuthVerifier`](smooth_operator::auth::NoAuthVerifier)
105/// here (the protocol-only path needs no auth); the **binary** path
106/// ([`build_state_from_env`]) installs the env-configured, secure-by-default
107/// verifier instead.
108#[must_use]
109pub fn build_state(config: ServerConfig) -> AppState {
110 let seed = config.seed_kb;
111 let storage = Arc::new(InMemoryStorageAdapter::new());
112 let state = AppState::new(storage.clone(), config);
113 if seed {
114 seed_knowledge(storage.as_ref());
115 // Record the seeded docs' document-set membership for the admin API
116 // (the in-memory backend drops document metadata, so the registry is the
117 // source of truth for set names + counts).
118 state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
119 state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
120 }
121 state
122}
123
124/// Build an [`AppState`] with the **env-configured** auth verifier (secure by
125/// default — see [`smooth_operator::auth::AuthConfig`]). Used by the binary.
126///
127/// # Errors
128/// Returns an error if the auth configuration is invalid (e.g. `AUTH_MODE=jwt`
129/// with no key) — the server refuses to start rather than fall back to no-auth.
130pub fn build_state_from_env(config: ServerConfig) -> Result<AppState> {
131 let verifier = smooth_operator::auth::AuthConfig::from_env()
132 .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
133 let state = install_widget_auth_from_env(build_state(config));
134 Ok(state.with_auth(Arc::from(verifier)))
135}
136
137/// Install an [`HttpWidgetAuth`](smooth_operator::widget_auth::HttpWidgetAuth)
138/// provider from `WIDGET_AUTH_URL` (optionally `WIDGET_AUTH_BEARER` +
139/// `WIDGET_AUTH_TTL_SECS`); otherwise leave the permissive default. This lets a
140/// host enforce embeddable-widget auth against its own policy service by setting
141/// env vars — no custom binary needed. (A host wanting bespoke logic still
142/// installs its own provider via [`AppState::with_widget_auth`].)
143fn install_widget_auth_from_env(state: AppState) -> AppState {
144 let Ok(url) = std::env::var("WIDGET_AUTH_URL") else {
145 return state;
146 };
147 let url = url.trim();
148 if url.is_empty() {
149 return state;
150 }
151 let mut provider = smooth_operator::widget_auth::HttpWidgetAuth::new(url);
152 if let Ok(bearer) = std::env::var("WIDGET_AUTH_BEARER") {
153 let bearer = bearer.trim();
154 if !bearer.is_empty() {
155 provider = provider.with_bearer(bearer);
156 }
157 }
158 if let Some(secs) = std::env::var("WIDGET_AUTH_TTL_SECS")
159 .ok()
160 .and_then(|s| s.trim().parse::<u64>().ok())
161 {
162 provider = provider.with_ttl(std::time::Duration::from_secs(secs));
163 }
164 state.with_widget_auth(Arc::new(provider))
165}
166
/// Build an [`AppState`] by selecting the **storage backend** (and the matching
/// durable **admin stores**) from `config.storage`. It then installs the
/// env-selected backplane, the env-configured widget-auth provider, and the
/// env-configured auth verifier.
///
/// - [`StorageBackend::Memory`](crate::config::StorageBackend::Memory): the
///   in-memory adapter + in-memory admin stores. This is the [`build_state`]
///   path, so `config.seed_kb` is honored; data is lost on restart. The default.
/// - [`StorageBackend::Postgres`](crate::config::StorageBackend::Postgres):
///   the Postgres + pgvector adapter. The admin stores persist to the **same
///   database** (`connector_configs` / `agent_settings` / `indexing_runs`).
///   The connection string comes from `SMOOTH_AGENT_DATABASE_URL` or
///   `DATABASE_URL`, in that order.
/// - [`StorageBackend::Dynamodb`](crate::config::StorageBackend::Dynamodb): the
///   DynamoDB single-table adapter. The admin stores persist to the **same
///   table**. The table comes from `SMOOTH_AGENT_DDB_TABLE`, and `create_table`
///   is called at boot.
///
/// The admin store backend always matches the storage backend. That way a
/// connector config, settings, or indexing run survives a restart wherever the
/// conversations and knowledge live.
///
/// A persistent backend whose cargo feature was not compiled in is a hard
/// error, not a silent fallback to memory.
///
/// # Errors
/// Returns an error in any of these cases:
/// - the auth configuration is invalid;
/// - the selected persistent backend is not compiled in, or fails to connect or
///   migrate;
/// - the backplane cannot be installed (see [`install_backplane_from_env`]).
pub async fn build_state_from_env_async(config: ServerConfig) -> Result<AppState> {
    use crate::config::StorageBackend;
    // Only the Postgres / DynamoDB arms name `StorageAdapter` (for the
    // `Arc<dyn StorageAdapter>` annotation); on a lean build with neither feature
    // those arms are compiled out, so the import would be unused.
    #[cfg(any(feature = "postgres", feature = "dynamodb"))]
    use smooth_operator::adapter::StorageAdapter;

    // Resolve the verifier before touching any backend, so a bad auth config
    // fails fast without opening database connections.
    let verifier = smooth_operator::auth::AuthConfig::from_env()
        .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;

    let state = match config.storage {
        // The in-memory path is unchanged (synchronous, no external services).
        StorageBackend::Memory => build_state(config),

        // The Postgres storage backend (and its matching durable admin stores)
        // is only compiled in on a build with the `postgres` feature (the default
        // / cloud build). A lean `--no-default-features` build returns a clear
        // error if `SMOOTH_AGENT_STORAGE=postgres` is requested at runtime.
        #[cfg(feature = "postgres")]
        StorageBackend::Postgres => {
            use smooth_operator_adapter_postgres::PostgresAdapter;
            // The pgvector column width MUST match the embedder the `/index`
            // path uses (1536 keyed / 1024 offline). Build the embedder from
            // config and create the adapter with it so document vectors (at
            // ingest) and query vectors agree — no silent 1024/1536 mismatch.
            let embedder = crate::embedder::build_embedder(
                &crate::embedder::EmbedderConfig::from_server_config(&config),
            );
            // SMOOTH_AGENT_DATABASE_URL wins; DATABASE_URL is the fallback.
            let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
                .or_else(|_| std::env::var("DATABASE_URL"))
                .map_err(|_| {
                    anyhow::anyhow!(
                        "Postgres backend selected but neither SMOOTH_AGENT_DATABASE_URL \
                         nor DATABASE_URL is set"
                    )
                })?;
            let adapter = Arc::new(
                PostgresAdapter::connect_with_embedder(&conn_str, embedder)
                    .await
                    .map_err(|e| anyhow::anyhow!("connecting Postgres storage backend: {e}"))?,
            );
            // Admin stores against the SAME database — durable.
            let connectors = Arc::new(adapter.connector_config_store());
            let settings = Arc::new(adapter.settings_store());
            let indexing = Arc::new(adapter.indexing_store());
            // Upcast the concrete adapter only after the admin stores (which
            // need the concrete type's methods) have been derived from it.
            let storage: Arc<dyn StorageAdapter> = adapter;
            AppState::new(storage, config)
                .with_connector_configs(connectors)
                .with_settings(settings)
                .with_indexing(indexing)
        }

        // The DynamoDB storage backend is only compiled in on a build with the
        // `dynamodb` feature (the default / cloud build). A lean build returns a
        // clear error if `SMOOTH_AGENT_STORAGE=dynamodb` is requested at runtime.
        #[cfg(feature = "dynamodb")]
        StorageBackend::Dynamodb => {
            use smooth_operator_adapter_dynamodb::DynamoDbAdapter;
            let adapter = Arc::new(
                DynamoDbAdapter::from_env(None)
                    .await
                    .map_err(|e| anyhow::anyhow!("connecting DynamoDB storage backend: {e}"))?,
            );
            // Called on every boot; the module docs describe this as creating
            // the table if it is absent.
            adapter
                .create_table()
                .await
                .map_err(|e| anyhow::anyhow!("creating DynamoDB table: {e}"))?;
            // Admin stores against the SAME table — durable.
            let connectors = Arc::new(adapter.connector_config_store());
            let settings = Arc::new(adapter.settings_store());
            let indexing = Arc::new(adapter.indexing_store());
            let storage: Arc<dyn StorageAdapter> = adapter;
            AppState::new(storage, config)
                .with_connector_configs(connectors)
                .with_settings(settings)
                .with_indexing(indexing)
        }

        // Lean build: a persistent backend was requested but its feature wasn't
        // compiled in. Fail loud with an actionable message rather than silently
        // running in-memory (which would lose data on restart).
        #[cfg(not(feature = "postgres"))]
        StorageBackend::Postgres => {
            anyhow::bail!(
                "SMOOTH_AGENT_STORAGE=postgres requires building with --features postgres \
                 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
                 with the 'cloud'/'postgres' feature"
            );
        }
        #[cfg(not(feature = "dynamodb"))]
        StorageBackend::Dynamodb => {
            anyhow::bail!(
                "SMOOTH_AGENT_STORAGE=dynamodb requires building with --features dynamodb \
                 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
                 with the 'cloud'/'dynamodb' feature"
            );
        }
    };

    // Env-driven extras layered on top of whichever backend was chosen.
    let state = install_backplane_from_env(state).await?;
    let state = install_widget_auth_from_env(state);

    Ok(state.with_auth(Arc::from(verifier)))
}
295
296/// Select the connection [`Backplane`](smooth_operator::backplane::Backplane)
297/// from `SMOOTH_AGENT_BACKPLANE`, installing it via
298/// [`AppState::with_backplane`](crate::state::AppState::with_backplane).
299///
300/// | value | backend | url env |
301/// |---|---|---|
302/// | unset / `memory` / `inmemory` | single-process (default) | — |
303/// | `redis` / `valkey` | [`RedisBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_REDIS_URL` |
304/// | `nats` | [`NatsBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_NATS_URL` |
305///
306/// A distributed backend is required for >1 replica (otherwise an event produced
307/// on one pod can't reach a socket on another) and to let non-AI publishers push
308/// realtime events via `Backplane::publish`.
309///
310/// # Errors
311/// Returns an error for an unknown backend value, a missing url, or a failed
312/// connection — fail loud at boot rather than silently run single-process.
313async fn install_backplane_from_env(state: AppState) -> Result<AppState> {
314 let kind = std::env::var("SMOOTH_AGENT_BACKPLANE")
315 .unwrap_or_default()
316 .trim()
317 .to_lowercase();
318
319 let url = |specific: &str| -> Result<String> {
320 std::env::var("SMOOTH_AGENT_BACKPLANE_URL")
321 .or_else(|_| std::env::var(specific))
322 .map_err(|_| {
323 anyhow::anyhow!(
324 "{kind} backplane selected but neither SMOOTH_AGENT_BACKPLANE_URL nor {specific} is set"
325 )
326 })
327 };
328
329 match kind.as_str() {
330 "" | "memory" | "inmemory" => Ok(state), // default InMemoryBackplane already installed
331 // The Redis backplane is only compiled in on a build with the `redis`
332 // feature (the default / cloud build). A lean `--no-default-features`
333 // build returns a clear error rather than silently running single-process.
334 "redis" | "valkey" => {
335 #[cfg(feature = "redis")]
336 {
337 use smooth_operator_adapter_backplane_redis::RedisBackplane;
338 let backplane = RedisBackplane::connect(&url("SMOOTH_AGENT_REDIS_URL")?)
339 .await
340 .map_err(|e| anyhow::anyhow!("connecting Redis backplane: {e}"))?;
341 Ok(state.with_backplane(Arc::new(backplane)))
342 }
343 #[cfg(not(feature = "redis"))]
344 {
345 let _ = url; // silence unused-closure warning on the lean build
346 anyhow::bail!(
347 "SMOOTH_AGENT_BACKPLANE={kind} requires building with --features redis \
348 (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
349 with the 'cloud'/'redis' feature"
350 )
351 }
352 }
353 // The NATS backplane is only compiled in on a build with the `nats`
354 // feature (the default / cloud build). A lean build returns a clear error.
355 "nats" => {
356 #[cfg(feature = "nats")]
357 {
358 use smooth_operator_adapter_backplane_nats::NatsBackplane;
359 let backplane = NatsBackplane::connect(&url("SMOOTH_AGENT_NATS_URL")?)
360 .await
361 .map_err(|e| anyhow::anyhow!("connecting NATS backplane: {e}"))?;
362 Ok(state.with_backplane(Arc::new(backplane)))
363 }
364 #[cfg(not(feature = "nats"))]
365 {
366 let _ = url; // silence unused-closure warning on the lean build
367 anyhow::bail!(
368 "SMOOTH_AGENT_BACKPLANE=nats requires building with --features nats \
369 (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
370 with the 'cloud'/'nats' feature"
371 )
372 }
373 }
374 other => Err(anyhow::anyhow!(
375 "unknown SMOOTH_AGENT_BACKPLANE '{other}' (expected: memory | redis | valkey | nats)"
376 )),
377 }
378}
379
380/// Seed a couple of distinctive demo docs so knowledge-grounded E2E is
381/// deterministic. The 17-day return window is deliberately unusual so an
382/// ungrounded answer can't accidentally match it. Both docs are tagged into the
383/// `policies` document set so the admin API can report it.
384pub fn seed_knowledge(storage: &InMemoryStorageAdapter) {
385 let kb = smooth_operator::adapter::StorageAdapter::knowledge(storage);
386 let _ = kb.ingest(smooth_operator::with_document_set(
387 Document::new(
388 "SmooAI's return window is exactly 17 days from delivery. Returns after 17 days are not accepted.",
389 "policies/returns.md",
390 DocumentType::Documentation,
391 ),
392 [SEED_DOCUMENT_SET],
393 ));
394 let _ = kb.ingest(smooth_operator::with_document_set(
395 Document::new(
396 "SmooAI standard shipping takes 5 to 7 business days. Expedited shipping takes 2 business days.",
397 "policies/shipping.md",
398 DocumentType::Documentation,
399 ),
400 [SEED_DOCUMENT_SET],
401 ));
402}
403
404/// Bind on `<SMOOTH_AGENT_BIND>:<port>` (default loopback) and serve until the
405/// process is killed. Returns the bound [`TcpListener`] + the router, used by
406/// both the binary and tests (tests bind port 0 for an ephemeral port).
407///
408/// Uses the **env-configured, secure-by-default** auth verifier
409/// ([`build_state_from_env`]) — the binary refuses to start if auth is
410/// misconfigured rather than silently serving the admin API unauthenticated.
411///
412/// # Errors
413/// Returns an error if the auth configuration is invalid or the TCP bind fails.
414pub async fn bind(config: ServerConfig) -> Result<(TcpListener, Router, CancellationToken)> {
415 let ip: std::net::IpAddr = config
416 .bind
417 .parse()
418 .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
419 let addr = SocketAddr::new(ip, config.port);
420 // Async so a Postgres / DynamoDB storage backend (and its matching durable
421 // admin stores) can be wired; in-memory stays synchronous inside.
422 let state = build_state_from_env_async(config).await?;
423 // Clone the shutdown token BEFORE the state is consumed into the router, so
424 // `run` can cancel it (which fans out to every per-connection clone) when a
425 // SIGTERM/ctrl_c arrives.
426 let shutdown = state.shutdown.clone();
427 let app = router(state);
428 let listener = TcpListener::bind(addr)
429 .await
430 .with_context(|| format!("binding WebSocket server on {addr}"))?;
431 Ok((listener, app, shutdown))
432}
433
434/// Serve a **pre-built** [`AppState`] to completion (blocks), binding on
435/// `state.config.bind:state.config.port`.
436///
437/// This is the library entry point for callers that assemble their own
438/// `AppState` — e.g. the `dev-support` example, which ingests a GitHub repo into
439/// a storage adapter, wires the env-configured [`AuthVerifier`], and then serves
440/// that exact state so the chat-widget queries the ingested knowledge. It does
441/// **not** rebuild the state or touch the ACL/auth/embedder/reranker selection —
442/// those are baked into the `state` the caller passes in. The WS loop, router,
443/// and listening log are identical to [`run`] (which builds its state from env);
444/// `run` is unchanged.
445///
446/// # Errors
447/// Returns an error if the TCP bind fails or serving fails.
448pub async fn serve_state(state: AppState) -> Result<()> {
449 let ip: std::net::IpAddr = state
450 .config
451 .bind
452 .parse()
453 .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
454 let addr = SocketAddr::new(ip, state.config.port);
455 let listener = TcpListener::bind(addr)
456 .await
457 .with_context(|| format!("binding WebSocket server on {addr}"))?;
458 serve_state_on(state, listener).await
459}
460
461/// Serve a pre-built [`AppState`] on an already-bound [`TcpListener`] (blocks).
462///
463/// Splitting the bind from the serve lets a caller (or a test) bind an ephemeral
464/// port, read [`TcpListener::local_addr`] for the real port, then hand the
465/// listener here. Logs the same "listening" line [`run`] does.
466///
467/// # Errors
468/// Returns an error if serving fails.
469pub async fn serve_state_on(state: AppState, listener: TcpListener) -> Result<()> {
470 let has_llm = state.config.has_llm();
471 let model = state.config.model.clone();
472 let gateway = state.config.gateway_url.clone();
473 let local = listener.local_addr().context("local addr")?;
474 let app = router(state);
475
476 tracing::info!(
477 %local,
478 endpoint = "/ws",
479 %model,
480 %gateway,
481 llm_enabled = has_llm,
482 "smooth-operator-server listening"
483 );
484 println!(
485 "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
486 );
487
488 axum::serve(listener, app)
489 .await
490 .context("serving WebSocket connections")?;
491 Ok(())
492}
493
494/// Run the server to completion (blocks). Logs a single listening line.
495///
496/// # Errors
497/// Returns an error if binding or serving fails.
498pub async fn run(config: ServerConfig) -> Result<()> {
499 let has_llm = config.has_llm();
500 let model = config.model.clone();
501 let gateway = config.gateway_url.clone();
502 let (listener, app, shutdown) = bind(config).await?;
503 let local = listener.local_addr().context("local addr")?;
504
505 tracing::info!(
506 %local,
507 endpoint = "/ws",
508 %model,
509 %gateway,
510 llm_enabled = has_llm,
511 "smooth-operator-server listening"
512 );
513 // Also print to stdout so the run-confirmation check is unambiguous without
514 // a tracing subscriber filter.
515 println!(
516 "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
517 );
518
519 // Graceful drain: stop accepting new connections AND cancel the shared
520 // shutdown token on SIGTERM (k8s pod termination) / ctrl_c. Cancelling fans
521 // out to every per-connection reader loop so each finishes its in-flight turn
522 // and detaches from the backplane before the process exits — within the
523 // chart's `terminationGracePeriodSeconds` window.
524 axum::serve(listener, app)
525 .with_graceful_shutdown(async move {
526 wait_for_shutdown_signal().await;
527 tracing::info!("shutdown signal received; draining in-flight WebSocket turns");
528 shutdown.cancel();
529 })
530 .await
531 .context("serving WebSocket connections")?;
532 Ok(())
533}
534
535/// Resolve when the process receives a termination request: SIGTERM (how
536/// Kubernetes asks a pod to stop on scale-down / rollout) **or** ctrl_c
537/// (SIGINT — interactive `cargo run`), whichever comes first.
538///
539/// Unix-only signal handling (the server targets Linux/k8s); on a non-unix host
540/// it falls back to ctrl_c alone so the binary still stops cleanly.
541async fn wait_for_shutdown_signal() {
542 #[cfg(unix)]
543 {
544 use tokio::signal::unix::{signal, SignalKind};
545 // If installing the SIGTERM handler somehow fails, fall back to ctrl_c
546 // only rather than panicking the serve task.
547 let mut sigterm = match signal(SignalKind::terminate()) {
548 Ok(s) => s,
549 Err(e) => {
550 tracing::warn!(error = %e, "failed to install SIGTERM handler; ctrl_c only");
551 let _ = tokio::signal::ctrl_c().await;
552 return;
553 }
554 };
555 tokio::select! {
556 _ = sigterm.recv() => {}
557 _ = tokio::signal::ctrl_c() => {}
558 }
559 }
560 #[cfg(not(unix))]
561 {
562 let _ = tokio::signal::ctrl_c().await;
563 }
564}
565
566/// Query parameters accepted on the `/ws` upgrade. `token` carries the bearer
567/// JWT used to authenticate the connection (browsers can't set custom headers on
568/// a WebSocket handshake, so the token rides on the query string — the standard
569/// pattern for WS auth).
570#[derive(Debug, serde::Deserialize, Default)]
571struct WsQuery {
572 /// The bearer token (raw JWT, no `Bearer ` prefix), if provided.
573 #[serde(default)]
574 token: Option<String>,
575}
576
577/// Resolve the connection's [`AccessContext`] from the `?token=` query param.
578///
579/// **Fail closed for ACL'd content**: when no token is presented, or the auth
580/// verifier is the no-key [`AdminDisabledVerifier`] (admin/auth unconfigured —
581/// dev/no-auth), or the token fails to verify, the connection runs as
582/// [`AccessContext::anonymous`] — which sees **only org-public** knowledge, not
583/// every document. A valid token yields the principal's full
584/// [`AccessContext`](smooth_operator::auth::Principal::access_context) (user id +
585/// groups), so it can read documents scoped to it. Verification failures are
586/// logged (never the token) and degrade to anonymous rather than dropping the
587/// connection, so the dev/no-auth case still serves org-public knowledge.
588/// The connection's resolved auth identity: its document-level
589/// [`AccessContext`] (used to filter retrieval) plus, when the token verified to
590/// a JWT principal, that principal's `org_id` (used to create sessions under the
591/// authenticated org). `org_id` is `None` for the anonymous/dev/no-auth case, so
592/// create-session falls through to the agent's widget-policy org, then the seed
593/// org — leaving the no-auth flavor unchanged.
594struct ConnectionAuth {
595 access: AccessContext,
596 org_id: Option<String>,
597}
598
599fn resolve_ws_access(state: &AppState, query: &WsQuery) -> ConnectionAuth {
600 let Some(token) = query
601 .token
602 .as_deref()
603 .map(str::trim)
604 .filter(|t| !t.is_empty())
605 else {
606 // No token → anonymous (org-public only). Keeps the dev/no-auth `/ws`
607 // path working while failing closed for ACL'd content.
608 return ConnectionAuth {
609 access: AccessContext::anonymous(),
610 org_id: None,
611 };
612 };
613 match state.auth.verify(token) {
614 Ok(principal) => ConnectionAuth {
615 access: principal.access_context(),
616 org_id: Some(principal.org_id),
617 },
618 Err(e) => {
619 // Don't leak the token; log only the mode + a generic reason.
620 tracing::warn!(
621 auth_mode = state.auth.mode(),
622 error = %e,
623 "ws token failed verification; serving org-public knowledge only (anonymous)"
624 );
625 ConnectionAuth {
626 access: AccessContext::anonymous(),
627 org_id: None,
628 }
629 }
630 }
631}
632
633/// Axum handler: upgrade an HTTP request on `/ws` to a WebSocket. The bearer
634/// token (if any) is taken from the `?token=` query param, resolved to an
635/// [`AccessContext`] at connect time, and threaded into every turn so retrieval
636/// is access-controlled per connection.
637async fn ws_upgrade(
638 ws: WebSocketUpgrade,
639 State(state): State<AppState>,
640 Query(query): Query<WsQuery>,
641 headers: axum::http::HeaderMap,
642) -> Response {
643 let ConnectionAuth { access, org_id } = resolve_ws_access(&state, &query);
644 // Capture the browser's `Origin` at the handshake (browsers always send it,
645 // and can't be made to forge another site's). It's enforced per-agent at
646 // session creation against the agent's embed allowlist (widget_auth).
647 let origin = headers
648 .get(axum::http::header::ORIGIN)
649 .and_then(|v| v.to_str().ok())
650 .map(str::to_string);
651 ws.on_upgrade(move |socket| connection_loop(socket, state, access, org_id, origin))
652}
653
/// Drive one WebSocket connection by splitting it into a reader and a writer,
/// joined by an outbound event sink.
///
/// - `access` is the connection's resolved document-level entitlement,
///   threaded into every `send_message` turn.
/// - `auth_org` is the authenticated JWT principal's org (when the token
///   verified), used to create sessions under the authenticated org.
/// - `origin` is the handshake `Origin` header, enforced against an agent's
///   embed allowlist.
///
/// Lifecycle:
/// 1. Register the sink with the backplane under a fresh `conn_id`.
/// 2. Spawn the writer.
/// 3. Run the reader until the peer closes, a read error occurs, or the
///    shared shutdown token is cancelled.
/// 4. Detach, drop this function's sink sender, and await the writer.
async fn connection_loop(
    socket: WebSocket,
    state: AppState,
    access: AccessContext,
    auth_org: Option<String>,
    origin: Option<String>,
) {
    let (mut ws_tx, mut ws_rx) = socket.split();
    // Unbounded: `send` never waits, so handlers and backplane callbacks never
    // block on a slow client (events buffer without limit instead).
    let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<serde_json::Value>();

    // Register this connection's outbound sink with the backplane so events
    // published from anywhere (this pod or, with a Redis/NATS impl, another) can
    // reach it. `conn_id` is associated with its session at create-session time.
    let conn_id = uuid::Uuid::new_v4().to_string();
    let sink_for_backplane = sink_tx.clone();
    state
        .backplane
        .attach(
            &conn_id,
            std::sync::Arc::new(move |event| {
                // Send errors (writer already gone) are deliberately ignored.
                let _ = sink_for_backplane.send(event);
            }),
        )
        .await;

    // Writer: drain the sink and write each event as a JSON text frame.
    // Events that fail to serialize are skipped; a failed socket write ends
    // the writer. Otherwise it runs until `recv` returns `None`, i.e. until
    // EVERY sender clone has been dropped.
    let writer = tokio::spawn(async move {
        while let Some(event) = sink_rx.recv().await {
            let text = match serde_json::to_string(&event) {
                Ok(t) => t,
                Err(_) => continue,
            };
            if ws_tx.send(Message::Text(text.into())).await.is_err() {
                break;
            }
        }
    });

    // Reader: dispatch inbound frames. Handlers emit events via `sink_tx`.
    //
    // The `select!` lets a graceful shutdown (SIGTERM/ctrl_c → `state.shutdown`
    // cancelled by the serve loop) break this loop so the connection drains: it
    // stops reading new frames, falls out, and detaches below. `biased` so the
    // shutdown branch wins a tie. Crucially, `handle_frame(...).await` stays
    // INSIDE the frame arm (not a `select!` condition), so a turn already in
    // flight when the cancel fires runs to completion before the next loop
    // iteration observes the cancellation — that is the in-flight drain.
    loop {
        tokio::select! {
            biased;

            () = state.shutdown.cancelled() => {
                // Pod is terminating: stop accepting frames on this connection.
                // The socket closes once the writer task ends (it owns `ws_tx`),
                // which requires every sink sender to be dropped; see below.
                // Any turn that was mid-flight already finished in the frame
                // arm before we got here.
                break;
            }

            frame = ws_rx.next() => {
                match frame {
                    Some(Ok(Message::Text(text))) => {
                        handler::handle_frame(
                            &state,
                            &access,
                            &conn_id,
                            origin.as_deref(),
                            auth_org.as_deref(),
                            text.as_str(),
                            &sink_tx,
                        )
                        .await;
                    }
                    // Binary frames are rejected with a protocol error event
                    // rather than closing the connection.
                    Some(Ok(Message::Binary(_))) => {
                        let _ = sink_tx.send(crate::protocol::error(
                            None,
                            "VALIDATION_ERROR",
                            "binary frames are not supported; send JSON text frames",
                        ));
                    }
                    Some(Ok(Message::Close(_))) => break,
                    // Ping/Pong control frames are handled by axum automatically.
                    Some(Ok(_)) => {}
                    Some(Err(_)) => break,
                    // Stream ended (peer hung up).
                    None => break,
                }
            }
        }
    }

    // Reader finished (peer closed, error, or graceful shutdown) → detach from
    // the backplane so no stale registry entry is left behind, then drop the
    // sink so the writer task exits.
    //
    // NOTE(review): the writer only exits once ALL sender clones are gone:
    // this `sink_tx`, the backplane closure's `sink_for_backplane` (presumably
    // released by `detach`), and any clone `handle_frame` may have handed to a
    // background task. If any of those outlives the connection, `writer.await`
    // does not return. TODO confirm no clone is retained.
    state.backplane.detach(&conn_id).await;
    drop(sink_tx);
    let _ = writer.await;
}
758
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator::adapter::StorageAdapter;

    /// Seeding must make the deliberately unusual "17 days" return-window fact
    /// retrievable by a plain policy query.
    #[test]
    fn seeded_kb_returns_17_day_fact() {
        let storage = InMemoryStorageAdapter::new();
        seed_knowledge(&storage);

        let results = storage
            .knowledge()
            .query("return window policy", 3)
            .expect("query");
        let found = results.iter().any(|r| r.chunk.contains("17"));
        assert!(found, "expected seeded 17-day fact, got: {results:?}");
    }

    /// With no gateway key configured, the built state must report the LLM as
    /// disabled, even with the knowledge base seeded.
    #[tokio::test]
    async fn build_state_without_key_has_no_llm() {
        let state = build_state(ServerConfig {
            bind: "127.0.0.1".into(),
            port: 0,
            gateway_url: "https://example.test/v1".into(),
            gateway_key: None,
            model: "m".into(),
            seed_kb: true,
            max_iterations: 4,
            max_tokens: 128,
            storage: crate::config::StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: Vec::new(),
        });
        assert!(!state.config.has_llm());
    }
}