Skip to main content

smooth_operator_server/
server.rs

1//! The axum WebSocket server: one `/ws` endpoint, one task per connection.
2//!
3//! Per connection we split the socket and run two tasks joined by an
4//! `UnboundedSender<serde_json::Value>` outbound sink:
5//!
6//! - a **writer** that drains the sink and writes each event as a JSON text
7//!   frame, and
8//! - a **reader** that reads inbound frames and dispatches them via
9//!   [`crate::handler::handle_frame`], passing the sink so handlers (including
10//!   the streaming `send_message`) can emit events as they happen.
11//!
12//! Using a sink channel (instead of writing directly from the handler) is what
13//! lets a streaming turn fire many `stream_token` events from inside the agent
14//! loop while the connection is still reading.
15
16use std::net::SocketAddr;
17use std::sync::Arc;
18
19use anyhow::{Context, Result};
20use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
21use axum::extract::{Query, State};
22use axum::response::Response;
23use axum::routing::get;
24use axum::Router;
25
26use futures_util::{SinkExt, StreamExt};
27use smooth_operator::access_control::AccessContext;
28use tokio::net::TcpListener;
29use tokio_util::sync::CancellationToken;
30
31use smooth_operator_adapter_memory::InMemoryStorageAdapter;
32use smooth_operator_core::{Document, DocumentType};
33
34use crate::config::ServerConfig;
35use crate::handler;
36use crate::state::AppState;
37
38/// Build the axum [`Router`] for the given application state. Exposed so tests
39/// can boot the server in-process. Serves the WebSocket `/ws` endpoint plus the
40/// auth-gated admin HTTP API under `/admin` (see [`crate::admin`]).
41pub fn router(state: AppState) -> Router {
42    Router::new()
43        .route("/ws", get(ws_upgrade))
44        // Unauthenticated liveness/readiness probe. A WebSocket `/ws` upgrade is
45        // not a plain GET, so HTTP load balancers (AWS ALB / nginx ingress) need a
46        // real HTTP route that answers 200 to confirm the listener is up. Cheap
47        // and dependency-free — it does not touch storage/LLM, so it stays Ready
48        // even when an optional backend (gateway key, DB) is degraded.
49        .route("/health", get(health))
50        .merge(crate::admin::router())
51        .with_state(state)
52}
53
/// `GET /health` handler: always answers with the static body `"ok"` (axum
/// turns a `&'static str` into a `200 OK` text response).
///
/// This is the minimal HTTP health endpoint for container orchestrators and
/// HTTP load balancers, which cannot use the WebSocket `/ws` route as a plain
/// GET healthcheck.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
60
/// Name of the document set the seeded demo docs are tagged into, so
/// `GET /admin/document-sets` has something to report in a seeded server.
/// Used both when ingesting the demo docs and when recording their
/// document-set membership on the [`AppState`].
const SEED_DOCUMENT_SET: &str = "policies";

/// The org the seeded demo docs + their document-set registry entries belong to.
/// Mirrors the org `handler::handle_create_session` stamps onto reference
/// conversations, so the seeded sets show up for the reference org's admin
/// caller (and ONLY that org — cross-org scoping).
pub const SEED_ORG_ID: &str = "reference-org";
70
71/// Build an [`AppState`] over a fresh in-memory adapter, seeding the knowledge
72/// base when `config.seed_kb` is set. Exposed for tests + the binary.
73///
74/// The auth verifier defaults to [`NoAuthVerifier`](smooth_operator::auth::NoAuthVerifier)
75/// here (the protocol-only path needs no auth); the **binary** path
76/// ([`build_state_from_env`]) installs the env-configured, secure-by-default
77/// verifier instead.
78#[must_use]
79pub fn build_state(config: ServerConfig) -> AppState {
80    let seed = config.seed_kb;
81    let storage = Arc::new(InMemoryStorageAdapter::new());
82    let state = AppState::new(storage.clone(), config);
83    if seed {
84        seed_knowledge(storage.as_ref());
85        // Record the seeded docs' document-set membership for the admin API
86        // (the in-memory backend drops document metadata, so the registry is the
87        // source of truth for set names + counts).
88        state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
89        state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
90    }
91    state
92}
93
94/// Build an [`AppState`] with the **env-configured** auth verifier (secure by
95/// default — see [`smooth_operator::auth::AuthConfig`]). Used by the binary.
96///
97/// # Errors
98/// Returns an error if the auth configuration is invalid (e.g. `AUTH_MODE=jwt`
99/// with no key) — the server refuses to start rather than fall back to no-auth.
100pub fn build_state_from_env(config: ServerConfig) -> Result<AppState> {
101    let verifier = smooth_operator::auth::AuthConfig::from_env()
102        .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
103    let state = install_widget_auth_from_env(build_state(config));
104    Ok(state.with_auth(Arc::from(verifier)))
105}
106
107/// Install an [`HttpWidgetAuth`](smooth_operator::widget_auth::HttpWidgetAuth)
108/// provider from `WIDGET_AUTH_URL` (optionally `WIDGET_AUTH_BEARER` +
109/// `WIDGET_AUTH_TTL_SECS`); otherwise leave the permissive default. This lets a
110/// host enforce embeddable-widget auth against its own policy service by setting
111/// env vars — no custom binary needed. (A host wanting bespoke logic still
112/// installs its own provider via [`AppState::with_widget_auth`].)
113fn install_widget_auth_from_env(state: AppState) -> AppState {
114    let Ok(url) = std::env::var("WIDGET_AUTH_URL") else {
115        return state;
116    };
117    let url = url.trim();
118    if url.is_empty() {
119        return state;
120    }
121    let mut provider = smooth_operator::widget_auth::HttpWidgetAuth::new(url);
122    if let Ok(bearer) = std::env::var("WIDGET_AUTH_BEARER") {
123        let bearer = bearer.trim();
124        if !bearer.is_empty() {
125            provider = provider.with_bearer(bearer);
126        }
127    }
128    if let Some(secs) = std::env::var("WIDGET_AUTH_TTL_SECS")
129        .ok()
130        .and_then(|s| s.trim().parse::<u64>().ok())
131    {
132        provider = provider.with_ttl(std::time::Duration::from_secs(secs));
133    }
134    state.with_widget_auth(Arc::new(provider))
135}
136
137/// Build an [`AppState`] selecting the **storage backend** (and the matching
138/// durable **admin stores**) from `config.storage`, then installing the
139/// env-configured auth verifier.
140///
141/// - [`StorageBackend::Memory`](crate::config::StorageBackend::Memory) — the
142///   in-memory adapter + in-memory admin stores (the [`build_state`] path; lost
143///   on restart). The default.
144/// - [`StorageBackend::Postgres`](crate::config::StorageBackend::Postgres) —
145///   the Postgres + pgvector adapter; the admin stores persist to the **same
146///   database** (`connector_configs` / `agent_settings` / `indexing_runs`).
147///   Connection string from `SMOOTH_AGENT_DATABASE_URL` / `DATABASE_URL`.
148/// - [`StorageBackend::Dynamodb`](crate::config::StorageBackend::Dynamodb) — the
149///   DynamoDB single-table adapter; the admin stores persist to the **same
150///   table**. Table from `SMOOTH_AGENT_DDB_TABLE`; the table is created if
151///   absent.
152///
153/// The admin store backend always matches the storage backend so a connector
154/// config / settings / indexing run survives a restart wherever the
155/// conversations and knowledge live.
156///
157/// # Errors
158/// Returns an error if the auth configuration is invalid, or if the selected
159/// persistent backend fails to connect / migrate.
160pub async fn build_state_from_env_async(config: ServerConfig) -> Result<AppState> {
161    use crate::config::StorageBackend;
162    // Only the Postgres / DynamoDB arms name `StorageAdapter` (for the
163    // `Arc<dyn StorageAdapter>` annotation); on a lean build with neither feature
164    // those arms are compiled out, so the import would be unused.
165    #[cfg(any(feature = "postgres", feature = "dynamodb"))]
166    use smooth_operator::adapter::StorageAdapter;
167
168    let verifier = smooth_operator::auth::AuthConfig::from_env()
169        .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
170
171    let state = match config.storage {
172        // The in-memory path is unchanged (synchronous, no external services).
173        StorageBackend::Memory => build_state(config),
174
175        // The Postgres storage backend (and its matching durable admin stores)
176        // is only compiled in on a build with the `postgres` feature (the default
177        // / cloud build). A lean `--no-default-features` build returns a clear
178        // error if `SMOOTH_AGENT_STORAGE=postgres` is requested at runtime.
179        #[cfg(feature = "postgres")]
180        StorageBackend::Postgres => {
181            use smooth_operator_adapter_postgres::PostgresAdapter;
182            // The pgvector column width MUST match the embedder the `/index`
183            // path uses (1536 keyed / 1024 offline). Build the embedder from
184            // config and create the adapter with it so document vectors (at
185            // ingest) and query vectors agree — no silent 1024/1536 mismatch.
186            let embedder = crate::embedder::build_embedder(
187                &crate::embedder::EmbedderConfig::from_server_config(&config),
188            );
189            let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
190                .or_else(|_| std::env::var("DATABASE_URL"))
191                .map_err(|_| {
192                    anyhow::anyhow!(
193                        "Postgres backend selected but neither SMOOTH_AGENT_DATABASE_URL \
194                             nor DATABASE_URL is set"
195                    )
196                })?;
197            let adapter = Arc::new(
198                PostgresAdapter::connect_with_embedder(&conn_str, embedder)
199                    .await
200                    .map_err(|e| anyhow::anyhow!("connecting Postgres storage backend: {e}"))?,
201            );
202            // Admin stores against the SAME database — durable.
203            let connectors = Arc::new(adapter.connector_config_store());
204            let settings = Arc::new(adapter.settings_store());
205            let indexing = Arc::new(adapter.indexing_store());
206            let storage: Arc<dyn StorageAdapter> = adapter;
207            AppState::new(storage, config)
208                .with_connector_configs(connectors)
209                .with_settings(settings)
210                .with_indexing(indexing)
211        }
212
213        // The DynamoDB storage backend is only compiled in on a build with the
214        // `dynamodb` feature (the default / cloud build). A lean build returns a
215        // clear error if `SMOOTH_AGENT_STORAGE=dynamodb` is requested at runtime.
216        #[cfg(feature = "dynamodb")]
217        StorageBackend::Dynamodb => {
218            use smooth_operator_adapter_dynamodb::DynamoDbAdapter;
219            let adapter = Arc::new(
220                DynamoDbAdapter::from_env(None)
221                    .await
222                    .map_err(|e| anyhow::anyhow!("connecting DynamoDB storage backend: {e}"))?,
223            );
224            adapter
225                .create_table()
226                .await
227                .map_err(|e| anyhow::anyhow!("creating DynamoDB table: {e}"))?;
228            // Admin stores against the SAME table — durable.
229            let connectors = Arc::new(adapter.connector_config_store());
230            let settings = Arc::new(adapter.settings_store());
231            let indexing = Arc::new(adapter.indexing_store());
232            let storage: Arc<dyn StorageAdapter> = adapter;
233            AppState::new(storage, config)
234                .with_connector_configs(connectors)
235                .with_settings(settings)
236                .with_indexing(indexing)
237        }
238
239        // Lean build: a persistent backend was requested but its feature wasn't
240        // compiled in. Fail loud with an actionable message rather than silently
241        // running in-memory (which would lose data on restart).
242        #[cfg(not(feature = "postgres"))]
243        StorageBackend::Postgres => {
244            anyhow::bail!(
245                "SMOOTH_AGENT_STORAGE=postgres requires building with --features postgres \
246                 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
247                 with the 'cloud'/'postgres' feature"
248            );
249        }
250        #[cfg(not(feature = "dynamodb"))]
251        StorageBackend::Dynamodb => {
252            anyhow::bail!(
253                "SMOOTH_AGENT_STORAGE=dynamodb requires building with --features dynamodb \
254                 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
255                 with the 'cloud'/'dynamodb' feature"
256            );
257        }
258    };
259
260    let state = install_backplane_from_env(state).await?;
261    let state = install_widget_auth_from_env(state);
262
263    Ok(state.with_auth(Arc::from(verifier)))
264}
265
266/// Select the connection [`Backplane`](smooth_operator::backplane::Backplane)
267/// from `SMOOTH_AGENT_BACKPLANE`, installing it via
268/// [`AppState::with_backplane`](crate::state::AppState::with_backplane).
269///
270/// | value | backend | url env |
271/// |---|---|---|
272/// | unset / `memory` / `inmemory` | single-process (default) | — |
273/// | `redis` / `valkey` | [`RedisBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_REDIS_URL` |
274/// | `nats` | [`NatsBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_NATS_URL` |
275///
276/// A distributed backend is required for >1 replica (otherwise an event produced
277/// on one pod can't reach a socket on another) and to let non-AI publishers push
278/// realtime events via `Backplane::publish`.
279///
280/// # Errors
281/// Returns an error for an unknown backend value, a missing url, or a failed
282/// connection — fail loud at boot rather than silently run single-process.
283async fn install_backplane_from_env(state: AppState) -> Result<AppState> {
284    let kind = std::env::var("SMOOTH_AGENT_BACKPLANE")
285        .unwrap_or_default()
286        .trim()
287        .to_lowercase();
288
289    let url = |specific: &str| -> Result<String> {
290        std::env::var("SMOOTH_AGENT_BACKPLANE_URL")
291            .or_else(|_| std::env::var(specific))
292            .map_err(|_| {
293                anyhow::anyhow!(
294                    "{kind} backplane selected but neither SMOOTH_AGENT_BACKPLANE_URL nor {specific} is set"
295                )
296            })
297    };
298
299    match kind.as_str() {
300        "" | "memory" | "inmemory" => Ok(state), // default InMemoryBackplane already installed
301        // The Redis backplane is only compiled in on a build with the `redis`
302        // feature (the default / cloud build). A lean `--no-default-features`
303        // build returns a clear error rather than silently running single-process.
304        "redis" | "valkey" => {
305            #[cfg(feature = "redis")]
306            {
307                use smooth_operator_adapter_backplane_redis::RedisBackplane;
308                let backplane = RedisBackplane::connect(&url("SMOOTH_AGENT_REDIS_URL")?)
309                    .await
310                    .map_err(|e| anyhow::anyhow!("connecting Redis backplane: {e}"))?;
311                Ok(state.with_backplane(Arc::new(backplane)))
312            }
313            #[cfg(not(feature = "redis"))]
314            {
315                let _ = url; // silence unused-closure warning on the lean build
316                anyhow::bail!(
317                    "SMOOTH_AGENT_BACKPLANE={kind} requires building with --features redis \
318                     (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
319                     with the 'cloud'/'redis' feature"
320                )
321            }
322        }
323        // The NATS backplane is only compiled in on a build with the `nats`
324        // feature (the default / cloud build). A lean build returns a clear error.
325        "nats" => {
326            #[cfg(feature = "nats")]
327            {
328                use smooth_operator_adapter_backplane_nats::NatsBackplane;
329                let backplane = NatsBackplane::connect(&url("SMOOTH_AGENT_NATS_URL")?)
330                    .await
331                    .map_err(|e| anyhow::anyhow!("connecting NATS backplane: {e}"))?;
332                Ok(state.with_backplane(Arc::new(backplane)))
333            }
334            #[cfg(not(feature = "nats"))]
335            {
336                let _ = url; // silence unused-closure warning on the lean build
337                anyhow::bail!(
338                    "SMOOTH_AGENT_BACKPLANE=nats requires building with --features nats \
339                     (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
340                     with the 'cloud'/'nats' feature"
341                )
342            }
343        }
344        other => Err(anyhow::anyhow!(
345            "unknown SMOOTH_AGENT_BACKPLANE '{other}' (expected: memory | redis | valkey | nats)"
346        )),
347    }
348}
349
350/// Seed a couple of distinctive demo docs so knowledge-grounded E2E is
351/// deterministic. The 17-day return window is deliberately unusual so an
352/// ungrounded answer can't accidentally match it. Both docs are tagged into the
353/// `policies` document set so the admin API can report it.
354pub fn seed_knowledge(storage: &InMemoryStorageAdapter) {
355    let kb = smooth_operator::adapter::StorageAdapter::knowledge(storage);
356    let _ = kb.ingest(smooth_operator::with_document_set(
357        Document::new(
358            "SmooAI's return window is exactly 17 days from delivery. Returns after 17 days are not accepted.",
359            "policies/returns.md",
360            DocumentType::Documentation,
361        ),
362        [SEED_DOCUMENT_SET],
363    ));
364    let _ = kb.ingest(smooth_operator::with_document_set(
365        Document::new(
366            "SmooAI standard shipping takes 5 to 7 business days. Expedited shipping takes 2 business days.",
367            "policies/shipping.md",
368            DocumentType::Documentation,
369        ),
370        [SEED_DOCUMENT_SET],
371    ));
372}
373
374/// Bind on `<SMOOTH_AGENT_BIND>:<port>` (default loopback) and serve until the
375/// process is killed. Returns the bound [`TcpListener`] + the router, used by
376/// both the binary and tests (tests bind port 0 for an ephemeral port).
377///
378/// Uses the **env-configured, secure-by-default** auth verifier
379/// ([`build_state_from_env`]) — the binary refuses to start if auth is
380/// misconfigured rather than silently serving the admin API unauthenticated.
381///
382/// # Errors
383/// Returns an error if the auth configuration is invalid or the TCP bind fails.
384pub async fn bind(config: ServerConfig) -> Result<(TcpListener, Router, CancellationToken)> {
385    let ip: std::net::IpAddr = config
386        .bind
387        .parse()
388        .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
389    let addr = SocketAddr::new(ip, config.port);
390    // Async so a Postgres / DynamoDB storage backend (and its matching durable
391    // admin stores) can be wired; in-memory stays synchronous inside.
392    let state = build_state_from_env_async(config).await?;
393    // Clone the shutdown token BEFORE the state is consumed into the router, so
394    // `run` can cancel it (which fans out to every per-connection clone) when a
395    // SIGTERM/ctrl_c arrives.
396    let shutdown = state.shutdown.clone();
397    let app = router(state);
398    let listener = TcpListener::bind(addr)
399        .await
400        .with_context(|| format!("binding WebSocket server on {addr}"))?;
401    Ok((listener, app, shutdown))
402}
403
404/// Serve a **pre-built** [`AppState`] to completion (blocks), binding on
405/// `state.config.bind:state.config.port`.
406///
407/// This is the library entry point for callers that assemble their own
408/// `AppState` — e.g. the `dev-support` example, which ingests a GitHub repo into
409/// a storage adapter, wires the env-configured [`AuthVerifier`], and then serves
410/// that exact state so the chat-widget queries the ingested knowledge. It does
411/// **not** rebuild the state or touch the ACL/auth/embedder/reranker selection —
412/// those are baked into the `state` the caller passes in. The WS loop, router,
413/// and listening log are identical to [`run`] (which builds its state from env);
414/// `run` is unchanged.
415///
416/// # Errors
417/// Returns an error if the TCP bind fails or serving fails.
418pub async fn serve_state(state: AppState) -> Result<()> {
419    let ip: std::net::IpAddr = state
420        .config
421        .bind
422        .parse()
423        .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
424    let addr = SocketAddr::new(ip, state.config.port);
425    let listener = TcpListener::bind(addr)
426        .await
427        .with_context(|| format!("binding WebSocket server on {addr}"))?;
428    serve_state_on(state, listener).await
429}
430
431/// Serve a pre-built [`AppState`] on an already-bound [`TcpListener`] (blocks).
432///
433/// Splitting the bind from the serve lets a caller (or a test) bind an ephemeral
434/// port, read [`TcpListener::local_addr`] for the real port, then hand the
435/// listener here. Logs the same "listening" line [`run`] does.
436///
437/// # Errors
438/// Returns an error if serving fails.
439pub async fn serve_state_on(state: AppState, listener: TcpListener) -> Result<()> {
440    let has_llm = state.config.has_llm();
441    let model = state.config.model.clone();
442    let gateway = state.config.gateway_url.clone();
443    let local = listener.local_addr().context("local addr")?;
444    let app = router(state);
445
446    tracing::info!(
447        %local,
448        endpoint = "/ws",
449        %model,
450        %gateway,
451        llm_enabled = has_llm,
452        "smooth-operator-server listening"
453    );
454    println!(
455        "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
456    );
457
458    axum::serve(listener, app)
459        .await
460        .context("serving WebSocket connections")?;
461    Ok(())
462}
463
464/// Run the server to completion (blocks). Logs a single listening line.
465///
466/// # Errors
467/// Returns an error if binding or serving fails.
468pub async fn run(config: ServerConfig) -> Result<()> {
469    let has_llm = config.has_llm();
470    let model = config.model.clone();
471    let gateway = config.gateway_url.clone();
472    let (listener, app, shutdown) = bind(config).await?;
473    let local = listener.local_addr().context("local addr")?;
474
475    tracing::info!(
476        %local,
477        endpoint = "/ws",
478        %model,
479        %gateway,
480        llm_enabled = has_llm,
481        "smooth-operator-server listening"
482    );
483    // Also print to stdout so the run-confirmation check is unambiguous without
484    // a tracing subscriber filter.
485    println!(
486        "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
487    );
488
489    // Graceful drain: stop accepting new connections AND cancel the shared
490    // shutdown token on SIGTERM (k8s pod termination) / ctrl_c. Cancelling fans
491    // out to every per-connection reader loop so each finishes its in-flight turn
492    // and detaches from the backplane before the process exits — within the
493    // chart's `terminationGracePeriodSeconds` window.
494    axum::serve(listener, app)
495        .with_graceful_shutdown(async move {
496            wait_for_shutdown_signal().await;
497            tracing::info!("shutdown signal received; draining in-flight WebSocket turns");
498            shutdown.cancel();
499        })
500        .await
501        .context("serving WebSocket connections")?;
502    Ok(())
503}
504
505/// Resolve when the process receives a termination request: SIGTERM (how
506/// Kubernetes asks a pod to stop on scale-down / rollout) **or** ctrl_c
507/// (SIGINT — interactive `cargo run`), whichever comes first.
508///
509/// Unix-only signal handling (the server targets Linux/k8s); on a non-unix host
510/// it falls back to ctrl_c alone so the binary still stops cleanly.
511async fn wait_for_shutdown_signal() {
512    #[cfg(unix)]
513    {
514        use tokio::signal::unix::{signal, SignalKind};
515        // If installing the SIGTERM handler somehow fails, fall back to ctrl_c
516        // only rather than panicking the serve task.
517        let mut sigterm = match signal(SignalKind::terminate()) {
518            Ok(s) => s,
519            Err(e) => {
520                tracing::warn!(error = %e, "failed to install SIGTERM handler; ctrl_c only");
521                let _ = tokio::signal::ctrl_c().await;
522                return;
523            }
524        };
525        tokio::select! {
526            _ = sigterm.recv() => {}
527            _ = tokio::signal::ctrl_c() => {}
528        }
529    }
530    #[cfg(not(unix))]
531    {
532        let _ = tokio::signal::ctrl_c().await;
533    }
534}
535
536/// Query parameters accepted on the `/ws` upgrade. `token` carries the bearer
537/// JWT used to authenticate the connection (browsers can't set custom headers on
538/// a WebSocket handshake, so the token rides on the query string — the standard
539/// pattern for WS auth).
540#[derive(Debug, serde::Deserialize, Default)]
541struct WsQuery {
542    /// The bearer token (raw JWT, no `Bearer ` prefix), if provided.
543    #[serde(default)]
544    token: Option<String>,
545}
546
547/// Resolve the connection's [`AccessContext`] from the `?token=` query param.
548///
549/// **Fail closed for ACL'd content**: when no token is presented, or the auth
550/// verifier is the no-key [`AdminDisabledVerifier`] (admin/auth unconfigured —
551/// dev/no-auth), or the token fails to verify, the connection runs as
552/// [`AccessContext::anonymous`] — which sees **only org-public** knowledge, not
553/// every document. A valid token yields the principal's full
554/// [`AccessContext`](smooth_operator::auth::Principal::access_context) (user id +
555/// groups), so it can read documents scoped to it. Verification failures are
556/// logged (never the token) and degrade to anonymous rather than dropping the
557/// connection, so the dev/no-auth case still serves org-public knowledge.
558/// The connection's resolved auth identity: its document-level
559/// [`AccessContext`] (used to filter retrieval) plus, when the token verified to
560/// a JWT principal, that principal's `org_id` (used to create sessions under the
561/// authenticated org). `org_id` is `None` for the anonymous/dev/no-auth case, so
562/// create-session falls through to the agent's widget-policy org, then the seed
563/// org — leaving the no-auth flavor unchanged.
564struct ConnectionAuth {
565    access: AccessContext,
566    org_id: Option<String>,
567}
568
569fn resolve_ws_access(state: &AppState, query: &WsQuery) -> ConnectionAuth {
570    let Some(token) = query
571        .token
572        .as_deref()
573        .map(str::trim)
574        .filter(|t| !t.is_empty())
575    else {
576        // No token → anonymous (org-public only). Keeps the dev/no-auth `/ws`
577        // path working while failing closed for ACL'd content.
578        return ConnectionAuth {
579            access: AccessContext::anonymous(),
580            org_id: None,
581        };
582    };
583    match state.auth.verify(token) {
584        Ok(principal) => ConnectionAuth {
585            access: principal.access_context(),
586            org_id: Some(principal.org_id),
587        },
588        Err(e) => {
589            // Don't leak the token; log only the mode + a generic reason.
590            tracing::warn!(
591                auth_mode = state.auth.mode(),
592                error = %e,
593                "ws token failed verification; serving org-public knowledge only (anonymous)"
594            );
595            ConnectionAuth {
596                access: AccessContext::anonymous(),
597                org_id: None,
598            }
599        }
600    }
601}
602
603/// Axum handler: upgrade an HTTP request on `/ws` to a WebSocket. The bearer
604/// token (if any) is taken from the `?token=` query param, resolved to an
605/// [`AccessContext`] at connect time, and threaded into every turn so retrieval
606/// is access-controlled per connection.
607async fn ws_upgrade(
608    ws: WebSocketUpgrade,
609    State(state): State<AppState>,
610    Query(query): Query<WsQuery>,
611    headers: axum::http::HeaderMap,
612) -> Response {
613    let ConnectionAuth { access, org_id } = resolve_ws_access(&state, &query);
614    // Capture the browser's `Origin` at the handshake (browsers always send it,
615    // and can't be made to forge another site's). It's enforced per-agent at
616    // session creation against the agent's embed allowlist (widget_auth).
617    let origin = headers
618        .get(axum::http::header::ORIGIN)
619        .and_then(|v| v.to_str().ok())
620        .map(str::to_string);
621    ws.on_upgrade(move |socket| connection_loop(socket, state, access, org_id, origin))
622}
623
624/// Drive one WebSocket connection: split into reader + writer, joined by an
625/// outbound event sink. `access` is the connection's resolved document-level
626/// entitlement, threaded into every `send_message` turn. `auth_org` is the
627/// authenticated JWT principal's org (when the token verified), used to create
628/// sessions under the authenticated org. `origin` is the handshake `Origin`
629/// header, enforced against an agent's embed allowlist.
630async fn connection_loop(
631    socket: WebSocket,
632    state: AppState,
633    access: AccessContext,
634    auth_org: Option<String>,
635    origin: Option<String>,
636) {
637    let (mut ws_tx, mut ws_rx) = socket.split();
638    let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<serde_json::Value>();
639
640    // Register this connection's outbound sink with the backplane so events
641    // published from anywhere (this pod or, with a Redis/NATS impl, another) can
642    // reach it. `conn_id` is associated with its session at create-session time.
643    let conn_id = uuid::Uuid::new_v4().to_string();
644    let sink_for_backplane = sink_tx.clone();
645    state
646        .backplane
647        .attach(
648            &conn_id,
649            std::sync::Arc::new(move |event| {
650                let _ = sink_for_backplane.send(event);
651            }),
652        )
653        .await;
654
655    // Writer: drain the sink and write each event as a JSON text frame.
656    let writer = tokio::spawn(async move {
657        while let Some(event) = sink_rx.recv().await {
658            let text = match serde_json::to_string(&event) {
659                Ok(t) => t,
660                Err(_) => continue,
661            };
662            if ws_tx.send(Message::Text(text.into())).await.is_err() {
663                break;
664            }
665        }
666    });
667
668    // Reader: dispatch inbound frames. Handlers emit events via `sink_tx`.
669    //
670    // The `select!` lets a graceful shutdown (SIGTERM/ctrl_c → `state.shutdown`
671    // cancelled by the serve loop) break this loop so the connection drains: it
672    // stops reading new frames, falls out, and detaches below. `biased` so the
673    // shutdown branch wins a tie. Crucially, `handle_frame(...).await` stays
674    // INSIDE the frame arm (not a `select!` condition), so a turn already in
675    // flight when the cancel fires runs to completion before the next loop
676    // iteration observes the cancellation — that is the in-flight drain.
677    loop {
678        tokio::select! {
679            biased;
680
681            () = state.shutdown.cancelled() => {
682                // Pod is terminating: stop accepting frames on this connection.
683                // Returning closes the socket (the writer task ends when
684                // `sink_tx` drops below); any turn that was mid-flight already
685                // finished in the frame arm before we got here.
686                break;
687            }
688
689            frame = ws_rx.next() => {
690                match frame {
691                    Some(Ok(Message::Text(text))) => {
692                        handler::handle_frame(
693                            &state,
694                            &access,
695                            &conn_id,
696                            origin.as_deref(),
697                            auth_org.as_deref(),
698                            text.as_str(),
699                            &sink_tx,
700                        )
701                        .await;
702                    }
703                    Some(Ok(Message::Binary(_))) => {
704                        let _ = sink_tx.send(crate::protocol::error(
705                            None,
706                            "VALIDATION_ERROR",
707                            "binary frames are not supported; send JSON text frames",
708                        ));
709                    }
710                    Some(Ok(Message::Close(_))) => break,
711                    // Ping/Pong control frames are handled by axum automatically.
712                    Some(Ok(_)) => {}
713                    Some(Err(_)) => break,
714                    // Stream ended (peer hung up).
715                    None => break,
716                }
717            }
718        }
719    }
720
721    // Reader finished (peer closed, error, or graceful shutdown) → detach from
722    // the backplane so no stale registry entry is left behind, then drop the
723    // sink so the writer task exits.
724    state.backplane.detach(&conn_id).await;
725    drop(sink_tx);
726    let _ = writer.await;
727}
728
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator::adapter::StorageAdapter;

    /// After seeding, a query about the return policy must surface a chunk
    /// containing the distinctive "17" from the seeded returns document.
    #[test]
    fn seeded_kb_returns_17_day_fact() {
        let adapter = InMemoryStorageAdapter::new();
        seed_knowledge(&adapter);

        let results = adapter
            .knowledge()
            .query("return window policy", 3)
            .expect("query");

        let mentions_window = results.iter().any(|hit| hit.chunk.contains("17"));
        assert!(
            mentions_window,
            "expected seeded 17-day fact, got: {results:?}"
        );
    }

    /// A config without a gateway key must produce a state that reports no LLM.
    // NOTE(review): nothing is awaited here; the tokio runtime is kept in case
    // state construction expects one — confirm before making this a plain test.
    #[tokio::test]
    async fn build_state_without_key_has_no_llm() {
        let config = ServerConfig {
            bind: "127.0.0.1".into(),
            port: 0,
            gateway_url: "https://example.test/v1".into(),
            gateway_key: None,
            model: "m".into(),
            seed_kb: true,
            max_iterations: 4,
            max_tokens: 128,
            storage: crate::config::StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: Vec::new(),
        };

        let llm_enabled = build_state(config).config.has_llm();
        assert!(!llm_enabled);
    }
}