// smooth_operator_server/server.rs

1//! The axum WebSocket server: one `/ws` endpoint, one task per connection.
2//!
3//! Per connection we split the socket and run two tasks joined by an
4//! `UnboundedSender<serde_json::Value>` outbound sink:
5//!
6//! - a **writer** that drains the sink and writes each event as a JSON text
7//!   frame, and
8//! - a **reader** that reads inbound frames and dispatches them via
9//!   [`crate::handler::handle_frame`], passing the sink so handlers (including
10//!   the streaming `send_message`) can emit events as they happen.
11//!
12//! Using a sink channel (instead of writing directly from the handler) is what
13//! lets a streaming turn fire many `stream_token` events from inside the agent
14//! loop while the connection is still reading.
15
16use std::net::SocketAddr;
17use std::sync::Arc;
18
19use anyhow::{Context, Result};
20use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
21use axum::extract::{Query, State};
22use axum::response::{Html, IntoResponse, Response};
23use axum::routing::get;
24use axum::Router;
25
26use futures_util::{SinkExt, StreamExt};
27use smooth_operator::access_control::AccessContext;
28use tokio::net::TcpListener;
29use tokio_util::sync::CancellationToken;
30
31use smooth_operator_adapter_memory::InMemoryStorageAdapter;
32use smooth_operator_core::{Document, DocumentType};
33
34use crate::config::ServerConfig;
35use crate::handler;
36use crate::state::AppState;
37
38/// Build the axum [`Router`] for the given application state. Exposed so tests
39/// can boot the server in-process. Serves the WebSocket `/ws` endpoint plus the
40/// auth-gated admin HTTP API under `/admin` (see [`crate::admin`]).
41pub fn router(state: AppState) -> Router {
42    let mut router = Router::new()
43        .route("/ws", get(ws_upgrade))
44        // Unauthenticated liveness/readiness probe. A WebSocket `/ws` upgrade is
45        // not a plain GET, so HTTP load balancers (AWS ALB / nginx ingress) need a
46        // real HTTP route that answers 200 to confirm the listener is up. Cheap
47        // and dependency-free — it does not touch storage/LLM, so it stays Ready
48        // even when an optional backend (gateway key, DB) is degraded.
49        .route("/health", get(health))
50        .merge(crate::admin::router());
51    // The local deployment flavor opts into serving the official widget; other
52    // flavors never mount these routes.
53    if state.serve_widget {
54        router = router
55            .route("/", get(widget_index))
56            .route("/chat-widget.iife.js", get(widget_bundle));
57    }
58    router.with_state(state)
59}
60
61/// Serve the local-flavor widget host page at `/`, injecting the auth token
62/// (same-origin, loopback) so the embedded `<smooth-agent-chat>` connects to
63/// this server's own `/ws?token=…`. The token is JSON-encoded into the page so
64/// quoting is always safe.
65async fn widget_index(State(state): State<AppState>) -> Html<String> {
66    let token_json = serde_json::to_string(state.widget_token.as_deref().unwrap_or(""))
67        .unwrap_or_else(|_| "\"\"".to_string());
68    let html =
69        include_str!("../assets/widget-index.html").replace("__SMOOTH_LOCAL_TOKEN__", &token_json);
70    Html(html)
71}
72
73/// Serve the vendored official widget IIFE bundle.
74async fn widget_bundle() -> impl IntoResponse {
75    (
76        [(
77            axum::http::header::CONTENT_TYPE,
78            "application/javascript; charset=utf-8",
79        )],
80        include_str!("../assets/chat-widget.iife.js"),
81    )
82}
83
/// Handler for `GET /health`: always answers with the static body `"ok"`
/// (axum turns a `&'static str` into a `200 OK` text response). Intended for
/// container orchestrators and HTTP load balancers, which cannot health-check
/// the WebSocket-only `/ws` route with a plain GET.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
90
/// Name of the document set that the seeded demo documents are tagged into,
/// so a seeded server has at least one set to report from
/// `GET /admin/document-sets`.
const SEED_DOCUMENT_SET: &str = "policies";

/// Org that owns the seeded demo documents and their document-set registry
/// entries. Per the original note this mirrors the org that
/// `handler::handle_create_session` stamps onto reference conversations, so the
/// seeded sets are visible to that org's admin caller and to no other org.
pub const SEED_ORG_ID: &str = "reference-org";
100
101/// Build an [`AppState`] over a fresh in-memory adapter, seeding the knowledge
102/// base when `config.seed_kb` is set. Exposed for tests + the binary.
103///
104/// The auth verifier defaults to [`NoAuthVerifier`](smooth_operator::auth::NoAuthVerifier)
105/// here (the protocol-only path needs no auth); the **binary** path
106/// ([`build_state_from_env`]) installs the env-configured, secure-by-default
107/// verifier instead.
108#[must_use]
109pub fn build_state(config: ServerConfig) -> AppState {
110    let seed = config.seed_kb;
111    let storage = Arc::new(InMemoryStorageAdapter::new());
112    let state = AppState::new(storage.clone(), config);
113    if seed {
114        seed_knowledge(storage.as_ref());
115        // Record the seeded docs' document-set membership for the admin API
116        // (the in-memory backend drops document metadata, so the registry is the
117        // source of truth for set names + counts).
118        state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
119        state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
120    }
121    state
122}
123
124/// Build an [`AppState`] with the **env-configured** auth verifier (secure by
125/// default — see [`smooth_operator::auth::AuthConfig`]). Used by the binary.
126///
127/// # Errors
128/// Returns an error if the auth configuration is invalid (e.g. `AUTH_MODE=jwt`
129/// with no key) — the server refuses to start rather than fall back to no-auth.
130pub fn build_state_from_env(config: ServerConfig) -> Result<AppState> {
131    let verifier = smooth_operator::auth::AuthConfig::from_env()
132        .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
133    let state = install_widget_auth_from_env(build_state(config));
134    Ok(state.with_auth(Arc::from(verifier)))
135}
136
137/// Install an [`HttpWidgetAuth`](smooth_operator::widget_auth::HttpWidgetAuth)
138/// provider from `WIDGET_AUTH_URL` (optionally `WIDGET_AUTH_BEARER` +
139/// `WIDGET_AUTH_TTL_SECS`); otherwise leave the permissive default. This lets a
140/// host enforce embeddable-widget auth against its own policy service by setting
141/// env vars — no custom binary needed. (A host wanting bespoke logic still
142/// installs its own provider via [`AppState::with_widget_auth`].)
143fn install_widget_auth_from_env(state: AppState) -> AppState {
144    let Ok(url) = std::env::var("WIDGET_AUTH_URL") else {
145        return state;
146    };
147    let url = url.trim();
148    if url.is_empty() {
149        return state;
150    }
151    let mut provider = smooth_operator::widget_auth::HttpWidgetAuth::new(url);
152    if let Ok(bearer) = std::env::var("WIDGET_AUTH_BEARER") {
153        let bearer = bearer.trim();
154        if !bearer.is_empty() {
155            provider = provider.with_bearer(bearer);
156        }
157    }
158    if let Some(secs) = std::env::var("WIDGET_AUTH_TTL_SECS")
159        .ok()
160        .and_then(|s| s.trim().parse::<u64>().ok())
161    {
162        provider = provider.with_ttl(std::time::Duration::from_secs(secs));
163    }
164    state.with_widget_auth(Arc::new(provider))
165}
166
167/// Build an [`AppState`] selecting the **storage backend** (and the matching
168/// durable **admin stores**) from `config.storage`, then installing the
169/// env-configured auth verifier.
170///
171/// - [`StorageBackend::Memory`](crate::config::StorageBackend::Memory) — the
172///   in-memory adapter + in-memory admin stores (the [`build_state`] path; lost
173///   on restart). The default.
174/// - [`StorageBackend::Postgres`](crate::config::StorageBackend::Postgres) —
175///   the Postgres + pgvector adapter; the admin stores persist to the **same
176///   database** (`connector_configs` / `agent_settings` / `indexing_runs`).
177///   Connection string from `SMOOTH_AGENT_DATABASE_URL` / `DATABASE_URL`.
178/// - [`StorageBackend::Dynamodb`](crate::config::StorageBackend::Dynamodb) — the
179///   DynamoDB single-table adapter; the admin stores persist to the **same
180///   table**. Table from `SMOOTH_AGENT_DDB_TABLE`; the table is created if
181///   absent.
182///
183/// The admin store backend always matches the storage backend so a connector
184/// config / settings / indexing run survives a restart wherever the
185/// conversations and knowledge live.
186///
187/// # Errors
188/// Returns an error if the auth configuration is invalid, or if the selected
189/// persistent backend fails to connect / migrate.
190pub async fn build_state_from_env_async(config: ServerConfig) -> Result<AppState> {
191    use crate::config::StorageBackend;
192    // Only the Postgres / DynamoDB arms name `StorageAdapter` (for the
193    // `Arc<dyn StorageAdapter>` annotation); on a lean build with neither feature
194    // those arms are compiled out, so the import would be unused.
195    #[cfg(any(feature = "postgres", feature = "dynamodb"))]
196    use smooth_operator::adapter::StorageAdapter;
197
198    let verifier = smooth_operator::auth::AuthConfig::from_env()
199        .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
200
201    let state = match config.storage {
202        // The in-memory path is unchanged (synchronous, no external services).
203        StorageBackend::Memory => build_state(config),
204
205        // The Postgres storage backend (and its matching durable admin stores)
206        // is only compiled in on a build with the `postgres` feature (the default
207        // / cloud build). A lean `--no-default-features` build returns a clear
208        // error if `SMOOTH_AGENT_STORAGE=postgres` is requested at runtime.
209        #[cfg(feature = "postgres")]
210        StorageBackend::Postgres => {
211            use smooth_operator_adapter_postgres::PostgresAdapter;
212            // The pgvector column width MUST match the embedder the `/index`
213            // path uses (1536 keyed / 1024 offline). Build the embedder from
214            // config and create the adapter with it so document vectors (at
215            // ingest) and query vectors agree — no silent 1024/1536 mismatch.
216            let embedder = crate::embedder::build_embedder(
217                &crate::embedder::EmbedderConfig::from_server_config(&config),
218            );
219            let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
220                .or_else(|_| std::env::var("DATABASE_URL"))
221                .map_err(|_| {
222                    anyhow::anyhow!(
223                        "Postgres backend selected but neither SMOOTH_AGENT_DATABASE_URL \
224                             nor DATABASE_URL is set"
225                    )
226                })?;
227            let adapter = Arc::new(
228                PostgresAdapter::connect_with_embedder(&conn_str, embedder)
229                    .await
230                    .map_err(|e| anyhow::anyhow!("connecting Postgres storage backend: {e}"))?,
231            );
232            // Admin stores against the SAME database — durable.
233            let connectors = Arc::new(adapter.connector_config_store());
234            let settings = Arc::new(adapter.settings_store());
235            let indexing = Arc::new(adapter.indexing_store());
236            let storage: Arc<dyn StorageAdapter> = adapter;
237            AppState::new(storage, config)
238                .with_connector_configs(connectors)
239                .with_settings(settings)
240                .with_indexing(indexing)
241        }
242
243        // The DynamoDB storage backend is only compiled in on a build with the
244        // `dynamodb` feature (the default / cloud build). A lean build returns a
245        // clear error if `SMOOTH_AGENT_STORAGE=dynamodb` is requested at runtime.
246        #[cfg(feature = "dynamodb")]
247        StorageBackend::Dynamodb => {
248            use smooth_operator_adapter_dynamodb::DynamoDbAdapter;
249            let adapter = Arc::new(
250                DynamoDbAdapter::from_env(None)
251                    .await
252                    .map_err(|e| anyhow::anyhow!("connecting DynamoDB storage backend: {e}"))?,
253            );
254            adapter
255                .create_table()
256                .await
257                .map_err(|e| anyhow::anyhow!("creating DynamoDB table: {e}"))?;
258            // Admin stores against the SAME table — durable.
259            let connectors = Arc::new(adapter.connector_config_store());
260            let settings = Arc::new(adapter.settings_store());
261            let indexing = Arc::new(adapter.indexing_store());
262            let storage: Arc<dyn StorageAdapter> = adapter;
263            AppState::new(storage, config)
264                .with_connector_configs(connectors)
265                .with_settings(settings)
266                .with_indexing(indexing)
267        }
268
269        // Lean build: a persistent backend was requested but its feature wasn't
270        // compiled in. Fail loud with an actionable message rather than silently
271        // running in-memory (which would lose data on restart).
272        #[cfg(not(feature = "postgres"))]
273        StorageBackend::Postgres => {
274            anyhow::bail!(
275                "SMOOTH_AGENT_STORAGE=postgres requires building with --features postgres \
276                 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
277                 with the 'cloud'/'postgres' feature"
278            );
279        }
280        #[cfg(not(feature = "dynamodb"))]
281        StorageBackend::Dynamodb => {
282            anyhow::bail!(
283                "SMOOTH_AGENT_STORAGE=dynamodb requires building with --features dynamodb \
284                 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
285                 with the 'cloud'/'dynamodb' feature"
286            );
287        }
288    };
289
290    let state = install_backplane_from_env(state).await?;
291    let state = install_widget_auth_from_env(state);
292
293    Ok(state.with_auth(Arc::from(verifier)))
294}
295
296/// Select the connection [`Backplane`](smooth_operator::backplane::Backplane)
297/// from `SMOOTH_AGENT_BACKPLANE`, installing it via
298/// [`AppState::with_backplane`](crate::state::AppState::with_backplane).
299///
300/// | value | backend | url env |
301/// |---|---|---|
302/// | unset / `memory` / `inmemory` | single-process (default) | — |
303/// | `redis` / `valkey` | [`RedisBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_REDIS_URL` |
304/// | `nats` | [`NatsBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_NATS_URL` |
305///
306/// A distributed backend is required for >1 replica (otherwise an event produced
307/// on one pod can't reach a socket on another) and to let non-AI publishers push
308/// realtime events via `Backplane::publish`.
309///
310/// # Errors
311/// Returns an error for an unknown backend value, a missing url, or a failed
312/// connection — fail loud at boot rather than silently run single-process.
313async fn install_backplane_from_env(state: AppState) -> Result<AppState> {
314    let kind = std::env::var("SMOOTH_AGENT_BACKPLANE")
315        .unwrap_or_default()
316        .trim()
317        .to_lowercase();
318
319    let url = |specific: &str| -> Result<String> {
320        std::env::var("SMOOTH_AGENT_BACKPLANE_URL")
321            .or_else(|_| std::env::var(specific))
322            .map_err(|_| {
323                anyhow::anyhow!(
324                    "{kind} backplane selected but neither SMOOTH_AGENT_BACKPLANE_URL nor {specific} is set"
325                )
326            })
327    };
328
329    match kind.as_str() {
330        "" | "memory" | "inmemory" => Ok(state), // default InMemoryBackplane already installed
331        // The Redis backplane is only compiled in on a build with the `redis`
332        // feature (the default / cloud build). A lean `--no-default-features`
333        // build returns a clear error rather than silently running single-process.
334        "redis" | "valkey" => {
335            #[cfg(feature = "redis")]
336            {
337                use smooth_operator_adapter_backplane_redis::RedisBackplane;
338                let backplane = RedisBackplane::connect(&url("SMOOTH_AGENT_REDIS_URL")?)
339                    .await
340                    .map_err(|e| anyhow::anyhow!("connecting Redis backplane: {e}"))?;
341                Ok(state.with_backplane(Arc::new(backplane)))
342            }
343            #[cfg(not(feature = "redis"))]
344            {
345                let _ = url; // silence unused-closure warning on the lean build
346                anyhow::bail!(
347                    "SMOOTH_AGENT_BACKPLANE={kind} requires building with --features redis \
348                     (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
349                     with the 'cloud'/'redis' feature"
350                )
351            }
352        }
353        // The NATS backplane is only compiled in on a build with the `nats`
354        // feature (the default / cloud build). A lean build returns a clear error.
355        "nats" => {
356            #[cfg(feature = "nats")]
357            {
358                use smooth_operator_adapter_backplane_nats::NatsBackplane;
359                let backplane = NatsBackplane::connect(&url("SMOOTH_AGENT_NATS_URL")?)
360                    .await
361                    .map_err(|e| anyhow::anyhow!("connecting NATS backplane: {e}"))?;
362                Ok(state.with_backplane(Arc::new(backplane)))
363            }
364            #[cfg(not(feature = "nats"))]
365            {
366                let _ = url; // silence unused-closure warning on the lean build
367                anyhow::bail!(
368                    "SMOOTH_AGENT_BACKPLANE=nats requires building with --features nats \
369                     (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
370                     with the 'cloud'/'nats' feature"
371                )
372            }
373        }
374        other => Err(anyhow::anyhow!(
375            "unknown SMOOTH_AGENT_BACKPLANE '{other}' (expected: memory | redis | valkey | nats)"
376        )),
377    }
378}
379
380/// Seed a couple of distinctive demo docs so knowledge-grounded E2E is
381/// deterministic. The 17-day return window is deliberately unusual so an
382/// ungrounded answer can't accidentally match it. Both docs are tagged into the
383/// `policies` document set so the admin API can report it.
384pub fn seed_knowledge(storage: &InMemoryStorageAdapter) {
385    let kb = smooth_operator::adapter::StorageAdapter::knowledge(storage);
386    let _ = kb.ingest(smooth_operator::with_document_set(
387        Document::new(
388            "SmooAI's return window is exactly 17 days from delivery. Returns after 17 days are not accepted.",
389            "policies/returns.md",
390            DocumentType::Documentation,
391        ),
392        [SEED_DOCUMENT_SET],
393    ));
394    let _ = kb.ingest(smooth_operator::with_document_set(
395        Document::new(
396            "SmooAI standard shipping takes 5 to 7 business days. Expedited shipping takes 2 business days.",
397            "policies/shipping.md",
398            DocumentType::Documentation,
399        ),
400        [SEED_DOCUMENT_SET],
401    ));
402}
403
404/// Bind on `<SMOOTH_AGENT_BIND>:<port>` (default loopback) and serve until the
405/// process is killed. Returns the bound [`TcpListener`] + the router, used by
406/// both the binary and tests (tests bind port 0 for an ephemeral port).
407///
408/// Uses the **env-configured, secure-by-default** auth verifier
409/// ([`build_state_from_env`]) — the binary refuses to start if auth is
410/// misconfigured rather than silently serving the admin API unauthenticated.
411///
412/// # Errors
413/// Returns an error if the auth configuration is invalid or the TCP bind fails.
414pub async fn bind(config: ServerConfig) -> Result<(TcpListener, Router, CancellationToken)> {
415    let ip: std::net::IpAddr = config
416        .bind
417        .parse()
418        .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
419    let addr = SocketAddr::new(ip, config.port);
420    // Async so a Postgres / DynamoDB storage backend (and its matching durable
421    // admin stores) can be wired; in-memory stays synchronous inside.
422    let state = build_state_from_env_async(config).await?;
423    // Clone the shutdown token BEFORE the state is consumed into the router, so
424    // `run` can cancel it (which fans out to every per-connection clone) when a
425    // SIGTERM/ctrl_c arrives.
426    let shutdown = state.shutdown.clone();
427    let app = router(state);
428    let listener = TcpListener::bind(addr)
429        .await
430        .with_context(|| format!("binding WebSocket server on {addr}"))?;
431    Ok((listener, app, shutdown))
432}
433
434/// Serve a **pre-built** [`AppState`] to completion (blocks), binding on
435/// `state.config.bind:state.config.port`.
436///
437/// This is the library entry point for callers that assemble their own
438/// `AppState` — e.g. the `dev-support` example, which ingests a GitHub repo into
439/// a storage adapter, wires the env-configured [`AuthVerifier`], and then serves
440/// that exact state so the chat-widget queries the ingested knowledge. It does
441/// **not** rebuild the state or touch the ACL/auth/embedder/reranker selection —
442/// those are baked into the `state` the caller passes in. The WS loop, router,
443/// and listening log are identical to [`run`] (which builds its state from env);
444/// `run` is unchanged.
445///
446/// # Errors
447/// Returns an error if the TCP bind fails or serving fails.
448pub async fn serve_state(state: AppState) -> Result<()> {
449    let ip: std::net::IpAddr = state
450        .config
451        .bind
452        .parse()
453        .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
454    let addr = SocketAddr::new(ip, state.config.port);
455    let listener = TcpListener::bind(addr)
456        .await
457        .with_context(|| format!("binding WebSocket server on {addr}"))?;
458    serve_state_on(state, listener).await
459}
460
461/// Serve a pre-built [`AppState`] on an already-bound [`TcpListener`] (blocks).
462///
463/// Splitting the bind from the serve lets a caller (or a test) bind an ephemeral
464/// port, read [`TcpListener::local_addr`] for the real port, then hand the
465/// listener here. Logs the same "listening" line [`run`] does.
466///
467/// # Errors
468/// Returns an error if serving fails.
469pub async fn serve_state_on(state: AppState, listener: TcpListener) -> Result<()> {
470    let has_llm = state.config.has_llm();
471    let model = state.config.model.clone();
472    let gateway = state.config.gateway_url.clone();
473    let local = listener.local_addr().context("local addr")?;
474    let app = router(state);
475
476    tracing::info!(
477        %local,
478        endpoint = "/ws",
479        %model,
480        %gateway,
481        llm_enabled = has_llm,
482        "smooth-operator-server listening"
483    );
484    println!(
485        "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
486    );
487
488    axum::serve(listener, app)
489        .await
490        .context("serving WebSocket connections")?;
491    Ok(())
492}
493
494/// Run the server to completion (blocks). Logs a single listening line.
495///
496/// # Errors
497/// Returns an error if binding or serving fails.
498pub async fn run(config: ServerConfig) -> Result<()> {
499    let has_llm = config.has_llm();
500    let model = config.model.clone();
501    let gateway = config.gateway_url.clone();
502    let (listener, app, shutdown) = bind(config).await?;
503    let local = listener.local_addr().context("local addr")?;
504
505    tracing::info!(
506        %local,
507        endpoint = "/ws",
508        %model,
509        %gateway,
510        llm_enabled = has_llm,
511        "smooth-operator-server listening"
512    );
513    // Also print to stdout so the run-confirmation check is unambiguous without
514    // a tracing subscriber filter.
515    println!(
516        "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
517    );
518
519    // Graceful drain: stop accepting new connections AND cancel the shared
520    // shutdown token on SIGTERM (k8s pod termination) / ctrl_c. Cancelling fans
521    // out to every per-connection reader loop so each finishes its in-flight turn
522    // and detaches from the backplane before the process exits — within the
523    // chart's `terminationGracePeriodSeconds` window.
524    axum::serve(listener, app)
525        .with_graceful_shutdown(async move {
526            wait_for_shutdown_signal().await;
527            tracing::info!("shutdown signal received; draining in-flight WebSocket turns");
528            shutdown.cancel();
529        })
530        .await
531        .context("serving WebSocket connections")?;
532    Ok(())
533}
534
535/// Resolve when the process receives a termination request: SIGTERM (how
536/// Kubernetes asks a pod to stop on scale-down / rollout) **or** ctrl_c
537/// (SIGINT — interactive `cargo run`), whichever comes first.
538///
539/// Unix-only signal handling (the server targets Linux/k8s); on a non-unix host
540/// it falls back to ctrl_c alone so the binary still stops cleanly.
541async fn wait_for_shutdown_signal() {
542    #[cfg(unix)]
543    {
544        use tokio::signal::unix::{signal, SignalKind};
545        // If installing the SIGTERM handler somehow fails, fall back to ctrl_c
546        // only rather than panicking the serve task.
547        let mut sigterm = match signal(SignalKind::terminate()) {
548            Ok(s) => s,
549            Err(e) => {
550                tracing::warn!(error = %e, "failed to install SIGTERM handler; ctrl_c only");
551                let _ = tokio::signal::ctrl_c().await;
552                return;
553            }
554        };
555        tokio::select! {
556            _ = sigterm.recv() => {}
557            _ = tokio::signal::ctrl_c() => {}
558        }
559    }
560    #[cfg(not(unix))]
561    {
562        let _ = tokio::signal::ctrl_c().await;
563    }
564}
565
566/// Query parameters accepted on the `/ws` upgrade. `token` carries the bearer
567/// JWT used to authenticate the connection (browsers can't set custom headers on
568/// a WebSocket handshake, so the token rides on the query string — the standard
569/// pattern for WS auth).
570#[derive(Debug, serde::Deserialize, Default)]
571struct WsQuery {
572    /// The bearer token (raw JWT, no `Bearer ` prefix), if provided.
573    #[serde(default)]
574    token: Option<String>,
575}
576
577/// Resolve the connection's [`AccessContext`] from the `?token=` query param.
578///
579/// **Fail closed for ACL'd content**: when no token is presented, or the auth
580/// verifier is the no-key [`AdminDisabledVerifier`] (admin/auth unconfigured —
581/// dev/no-auth), or the token fails to verify, the connection runs as
582/// [`AccessContext::anonymous`] — which sees **only org-public** knowledge, not
583/// every document. A valid token yields the principal's full
584/// [`AccessContext`](smooth_operator::auth::Principal::access_context) (user id +
585/// groups), so it can read documents scoped to it. Verification failures are
586/// logged (never the token) and degrade to anonymous rather than dropping the
587/// connection, so the dev/no-auth case still serves org-public knowledge.
588/// The connection's resolved auth identity: its document-level
589/// [`AccessContext`] (used to filter retrieval) plus, when the token verified to
590/// a JWT principal, that principal's `org_id` (used to create sessions under the
591/// authenticated org). `org_id` is `None` for the anonymous/dev/no-auth case, so
592/// create-session falls through to the agent's widget-policy org, then the seed
593/// org — leaving the no-auth flavor unchanged.
594struct ConnectionAuth {
595    access: AccessContext,
596    org_id: Option<String>,
597}
598
599fn resolve_ws_access(state: &AppState, query: &WsQuery) -> ConnectionAuth {
600    let Some(token) = query
601        .token
602        .as_deref()
603        .map(str::trim)
604        .filter(|t| !t.is_empty())
605    else {
606        // No token → anonymous (org-public only). Keeps the dev/no-auth `/ws`
607        // path working while failing closed for ACL'd content.
608        return ConnectionAuth {
609            access: AccessContext::anonymous(),
610            org_id: None,
611        };
612    };
613    match state.auth.verify(token) {
614        Ok(principal) => ConnectionAuth {
615            access: principal.access_context(),
616            org_id: Some(principal.org_id),
617        },
618        Err(e) => {
619            // Don't leak the token; log only the mode + a generic reason.
620            tracing::warn!(
621                auth_mode = state.auth.mode(),
622                error = %e,
623                "ws token failed verification; serving org-public knowledge only (anonymous)"
624            );
625            ConnectionAuth {
626                access: AccessContext::anonymous(),
627                org_id: None,
628            }
629        }
630    }
631}
632
633/// Axum handler: upgrade an HTTP request on `/ws` to a WebSocket. The bearer
634/// token (if any) is taken from the `?token=` query param, resolved to an
635/// [`AccessContext`] at connect time, and threaded into every turn so retrieval
636/// is access-controlled per connection.
637async fn ws_upgrade(
638    ws: WebSocketUpgrade,
639    State(state): State<AppState>,
640    Query(query): Query<WsQuery>,
641    headers: axum::http::HeaderMap,
642) -> Response {
643    let ConnectionAuth { access, org_id } = resolve_ws_access(&state, &query);
644    // Capture the browser's `Origin` at the handshake (browsers always send it,
645    // and can't be made to forge another site's). It's enforced per-agent at
646    // session creation against the agent's embed allowlist (widget_auth).
647    let origin = headers
648        .get(axum::http::header::ORIGIN)
649        .and_then(|v| v.to_str().ok())
650        .map(str::to_string);
651    ws.on_upgrade(move |socket| connection_loop(socket, state, access, org_id, origin))
652}
653
654/// Drive one WebSocket connection: split into reader + writer, joined by an
655/// outbound event sink. `access` is the connection's resolved document-level
656/// entitlement, threaded into every `send_message` turn. `auth_org` is the
657/// authenticated JWT principal's org (when the token verified), used to create
658/// sessions under the authenticated org. `origin` is the handshake `Origin`
659/// header, enforced against an agent's embed allowlist.
660async fn connection_loop(
661    socket: WebSocket,
662    state: AppState,
663    access: AccessContext,
664    auth_org: Option<String>,
665    origin: Option<String>,
666) {
667    let (mut ws_tx, mut ws_rx) = socket.split();
668    let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<serde_json::Value>();
669
670    // Register this connection's outbound sink with the backplane so events
671    // published from anywhere (this pod or, with a Redis/NATS impl, another) can
672    // reach it. `conn_id` is associated with its session at create-session time.
673    let conn_id = uuid::Uuid::new_v4().to_string();
674    let sink_for_backplane = sink_tx.clone();
675    state
676        .backplane
677        .attach(
678            &conn_id,
679            std::sync::Arc::new(move |event| {
680                let _ = sink_for_backplane.send(event);
681            }),
682        )
683        .await;
684
685    // Writer: drain the sink and write each event as a JSON text frame.
686    let writer = tokio::spawn(async move {
687        while let Some(event) = sink_rx.recv().await {
688            let text = match serde_json::to_string(&event) {
689                Ok(t) => t,
690                Err(_) => continue,
691            };
692            if ws_tx.send(Message::Text(text.into())).await.is_err() {
693                break;
694            }
695        }
696    });
697
698    // Reader: dispatch inbound frames. Handlers emit events via `sink_tx`.
699    //
700    // The `select!` lets a graceful shutdown (SIGTERM/ctrl_c → `state.shutdown`
701    // cancelled by the serve loop) break this loop so the connection drains: it
702    // stops reading new frames, falls out, and detaches below. `biased` so the
703    // shutdown branch wins a tie. Crucially, `handle_frame(...).await` stays
704    // INSIDE the frame arm (not a `select!` condition), so a turn already in
705    // flight when the cancel fires runs to completion before the next loop
706    // iteration observes the cancellation — that is the in-flight drain.
707    loop {
708        tokio::select! {
709            biased;
710
711            () = state.shutdown.cancelled() => {
712                // Pod is terminating: stop accepting frames on this connection.
713                // Returning closes the socket (the writer task ends when
714                // `sink_tx` drops below); any turn that was mid-flight already
715                // finished in the frame arm before we got here.
716                break;
717            }
718
719            frame = ws_rx.next() => {
720                match frame {
721                    Some(Ok(Message::Text(text))) => {
722                        handler::handle_frame(
723                            &state,
724                            &access,
725                            &conn_id,
726                            origin.as_deref(),
727                            auth_org.as_deref(),
728                            text.as_str(),
729                            &sink_tx,
730                        )
731                        .await;
732                    }
733                    Some(Ok(Message::Binary(_))) => {
734                        let _ = sink_tx.send(crate::protocol::error(
735                            None,
736                            "VALIDATION_ERROR",
737                            "binary frames are not supported; send JSON text frames",
738                        ));
739                    }
740                    Some(Ok(Message::Close(_))) => break,
741                    // Ping/Pong control frames are handled by axum automatically.
742                    Some(Ok(_)) => {}
743                    Some(Err(_)) => break,
744                    // Stream ended (peer hung up).
745                    None => break,
746                }
747            }
748        }
749    }
750
751    // Reader finished (peer closed, error, or graceful shutdown) → detach from
752    // the backplane so no stale registry entry is left behind, then drop the
753    // sink so the writer task exits.
754    state.backplane.detach(&conn_id).await;
755    drop(sink_tx);
756    let _ = writer.await;
757}
758
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator::adapter::StorageAdapter;

    /// After seeding, a return-window query against the in-memory knowledge
    /// store must yield at least one chunk containing "17".
    #[test]
    fn seeded_kb_returns_17_day_fact() {
        let adapter = InMemoryStorageAdapter::new();
        seed_knowledge(&adapter);

        let results = adapter
            .knowledge()
            .query("return window policy", 3)
            .expect("query");
        let mentions_seventeen = results.iter().any(|hit| hit.chunk.contains("17"));

        assert!(
            mentions_seventeen,
            "expected seeded 17-day fact, got: {results:?}"
        );
    }

    /// A config with no gateway key must produce state that reports no LLM.
    #[tokio::test]
    async fn build_state_without_key_has_no_llm() {
        let config = ServerConfig {
            bind: "127.0.0.1".into(),
            port: 0,
            gateway_url: "https://example.test/v1".into(),
            // The condition under test: no gateway key is configured.
            gateway_key: None,
            model: "m".into(),
            seed_kb: true,
            max_iterations: 4,
            max_tokens: 128,
            storage: crate::config::StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: vec![],
        };

        let state = build_state(config);
        let has_llm = state.config.has_llm();

        assert!(!has_llm);
    }
}