Skip to main content

smooth_operator_server/
server.rs

1//! The axum WebSocket server: one `/ws` endpoint, one task per connection.
2//!
3//! Per connection we split the socket and run two tasks joined by an
4//! `UnboundedSender<serde_json::Value>` outbound sink:
5//!
6//! - a **writer** that drains the sink and writes each event as a JSON text
7//!   frame, and
8//! - a **reader** that reads inbound frames and dispatches them via
9//!   [`crate::handler::handle_frame`], passing the sink so handlers (including
10//!   the streaming `send_message`) can emit events as they happen.
11//!
12//! Using a sink channel (instead of writing directly from the handler) is what
13//! lets a streaming turn fire many `stream_token` events from inside the agent
14//! loop while the connection is still reading.
15
16use std::net::SocketAddr;
17use std::sync::Arc;
18
19use anyhow::{Context, Result};
20use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
21use axum::extract::{Query, State};
22use axum::response::{Html, IntoResponse, Response};
23use axum::routing::get;
24use axum::Router;
25
26use futures_util::{SinkExt, StreamExt};
27use smooth_operator::access_control::AccessContext;
28use tokio::net::TcpListener;
29use tokio_util::sync::CancellationToken;
30
31use smooth_operator_adapter_memory::InMemoryStorageAdapter;
32use smooth_operator_core::{Document, DocumentType};
33
34use crate::config::ServerConfig;
35use crate::handler;
36use crate::state::AppState;
37
38/// Build the axum [`Router`] for the given application state. Exposed so tests
39/// can boot the server in-process. Serves the WebSocket `/ws` endpoint plus the
40/// auth-gated admin HTTP API under `/admin` (see [`crate::admin`]).
41pub fn router(state: AppState) -> Router {
42    let mut router = Router::new()
43        .route("/ws", get(ws_upgrade))
44        // Unauthenticated liveness/readiness probe. A WebSocket `/ws` upgrade is
45        // not a plain GET, so HTTP load balancers (AWS ALB / nginx ingress) need a
46        // real HTTP route that answers 200 to confirm the listener is up. Cheap
47        // and dependency-free — it does not touch storage/LLM, so it stays Ready
48        // even when an optional backend (gateway key, DB) is degraded.
49        .route("/health", get(health))
50        .merge(crate::admin::router());
51    // The local deployment flavor opts into serving the official widget; other
52    // flavors never mount these routes.
53    if state.serve_widget {
54        router = router
55            .route("/", get(widget_index))
56            .route("/chat-widget.iife.js", get(widget_bundle));
57    }
58    router.with_state(state)
59}
60
61/// Serve the local-flavor widget host page at `/`, injecting the auth token
62/// (same-origin, loopback) so the embedded `<smooth-agent-chat>` connects to
63/// this server's own `/ws?token=…`. The token is JSON-encoded into the page so
64/// quoting is always safe.
65async fn widget_index(State(state): State<AppState>) -> Html<String> {
66    let token_json = serde_json::to_string(state.widget_token.as_deref().unwrap_or(""))
67        .unwrap_or_else(|_| "\"\"".to_string());
68    let html =
69        include_str!("../assets/widget-index.html").replace("__SMOOTH_LOCAL_TOKEN__", &token_json);
70    Html(html)
71}
72
73/// Serve the vendored official widget IIFE bundle.
74async fn widget_bundle() -> impl IntoResponse {
75    (
76        [(
77            axum::http::header::CONTENT_TYPE,
78            "application/javascript; charset=utf-8",
79        )],
80        include_str!("../assets/chat-widget.iife.js"),
81    )
82}
83
/// Liveness/readiness probe: `GET /health` answers `200 OK` with the body `ok`.
///
/// Exists for container orchestrators and HTTP load balancers, which cannot
/// run a plain-GET healthcheck against the WebSocket `/ws` route.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
90
/// Name of the document set the seeded demo docs are tagged into, so that
/// `GET /admin/document-sets` has something to report on a seeded server.
const SEED_DOCUMENT_SET: &str = "policies";

/// Org that owns the seeded demo docs and their document-set registry entries.
///
/// Mirrors the org `handler::handle_create_session` stamps onto reference
/// conversations. The seeded sets are therefore visible to the reference org's
/// admin caller, and — because document sets are scoped per org — ONLY to that org.
pub const SEED_ORG_ID: &str = "reference-org";
100
101/// Build an [`AppState`] over a fresh in-memory adapter, seeding the knowledge
102/// base when `config.seed_kb` is set. Exposed for tests + the binary.
103///
104/// The auth verifier defaults to [`NoAuthVerifier`](smooth_operator::auth::NoAuthVerifier)
105/// here (the protocol-only path needs no auth); the **binary** path
106/// ([`build_state_from_env`]) installs the env-configured, secure-by-default
107/// verifier instead.
108#[must_use]
109pub fn build_state(config: ServerConfig) -> AppState {
110    let seed = config.seed_kb;
111    let storage = Arc::new(InMemoryStorageAdapter::new());
112    let state = AppState::new(storage.clone(), config);
113    if seed {
114        seed_knowledge(storage.as_ref());
115        // Record the seeded docs' document-set membership for the admin API
116        // (the in-memory backend drops document metadata, so the registry is the
117        // source of truth for set names + counts).
118        state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
119        state.record_document_set(SEED_ORG_ID, SEED_DOCUMENT_SET);
120    }
121    state
122}
123
124/// Build an [`AppState`] with the **env-configured** auth verifier (secure by
125/// default — see [`smooth_operator::auth::AuthConfig`]). Used by the binary.
126///
127/// # Errors
128/// Returns an error if the auth configuration is invalid (e.g. `AUTH_MODE=jwt`
129/// with no key) — the server refuses to start rather than fall back to no-auth.
130pub fn build_state_from_env(config: ServerConfig) -> Result<AppState> {
131    let verifier = smooth_operator::auth::AuthConfig::from_env()
132        .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
133    let state = install_widget_auth_from_env(build_state(config));
134    Ok(state.with_auth(Arc::from(verifier)))
135}
136
137/// Install an [`HttpWidgetAuth`](smooth_operator::widget_auth::HttpWidgetAuth)
138/// provider from `WIDGET_AUTH_URL` (optionally `WIDGET_AUTH_BEARER` +
139/// `WIDGET_AUTH_TTL_SECS`); otherwise leave the permissive default. This lets a
140/// host enforce embeddable-widget auth against its own policy service by setting
141/// env vars — no custom binary needed. (A host wanting bespoke logic still
142/// installs its own provider via [`AppState::with_widget_auth`].)
143fn install_widget_auth_from_env(state: AppState) -> AppState {
144    let Ok(url) = std::env::var("WIDGET_AUTH_URL") else {
145        return state;
146    };
147    let url = url.trim();
148    if url.is_empty() {
149        return state;
150    }
151    let mut provider = smooth_operator::widget_auth::HttpWidgetAuth::new(url);
152    if let Ok(bearer) = std::env::var("WIDGET_AUTH_BEARER") {
153        let bearer = bearer.trim();
154        if !bearer.is_empty() {
155            provider = provider.with_bearer(bearer);
156        }
157    }
158    if let Some(secs) = std::env::var("WIDGET_AUTH_TTL_SECS")
159        .ok()
160        .and_then(|s| s.trim().parse::<u64>().ok())
161    {
162        provider = provider.with_ttl(std::time::Duration::from_secs(secs));
163    }
164    state.with_widget_auth(Arc::new(provider))
165}
166
167/// Build an [`AppState`] selecting the **storage backend** (and the matching
168/// durable **admin stores**) from `config.storage`, then installing the
169/// env-configured auth verifier.
170///
171/// - [`StorageBackend::Memory`](crate::config::StorageBackend::Memory) — the
172///   in-memory adapter + in-memory admin stores (the [`build_state`] path; lost
173///   on restart). The default.
174/// - [`StorageBackend::Postgres`](crate::config::StorageBackend::Postgres) —
175///   the Postgres + pgvector adapter; the admin stores persist to the **same
176///   database** (`connector_configs` / `agent_settings` / `indexing_runs`).
177///   Connection string from `SMOOTH_AGENT_DATABASE_URL` / `DATABASE_URL`.
178/// - [`StorageBackend::Dynamodb`](crate::config::StorageBackend::Dynamodb) — the
179///   DynamoDB single-table adapter; the admin stores persist to the **same
180///   table**. Table from `SMOOTH_AGENT_DDB_TABLE`; the table is created if
181///   absent.
182///
183/// The admin store backend always matches the storage backend so a connector
184/// config / settings / indexing run survives a restart wherever the
185/// conversations and knowledge live.
186///
187/// # Errors
188/// Returns an error if the auth configuration is invalid, or if the selected
189/// persistent backend fails to connect / migrate.
190pub async fn build_state_from_env_async(config: ServerConfig) -> Result<AppState> {
191    use crate::config::StorageBackend;
192    // Only the Postgres / DynamoDB arms name `StorageAdapter` (for the
193    // `Arc<dyn StorageAdapter>` annotation); on a lean build with neither feature
194    // those arms are compiled out, so the import would be unused.
195    #[cfg(any(feature = "postgres", feature = "dynamodb"))]
196    use smooth_operator::adapter::StorageAdapter;
197
198    let verifier = smooth_operator::auth::AuthConfig::from_env()
199        .map_err(|e| anyhow::anyhow!("auth configuration error: {e}"))?;
200
201    let state = match config.storage {
202        // The in-memory path is unchanged (synchronous, no external services).
203        StorageBackend::Memory => build_state(config),
204
205        // The Postgres storage backend (and its matching durable admin stores)
206        // is only compiled in on a build with the `postgres` feature (the default
207        // / cloud build). A lean `--no-default-features` build returns a clear
208        // error if `SMOOTH_AGENT_STORAGE=postgres` is requested at runtime.
209        #[cfg(feature = "postgres")]
210        StorageBackend::Postgres => {
211            use smooth_operator_adapter_postgres::PostgresAdapter;
212            // The pgvector column width MUST match the embedder the `/index`
213            // path uses (1536 keyed / 1024 offline). Build the embedder from
214            // config and create the adapter with it so document vectors (at
215            // ingest) and query vectors agree — no silent 1024/1536 mismatch.
216            let embedder = crate::embedder::build_embedder(
217                &crate::embedder::EmbedderConfig::from_server_config(&config),
218            );
219            let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
220                .or_else(|_| std::env::var("DATABASE_URL"))
221                .map_err(|_| {
222                    anyhow::anyhow!(
223                        "Postgres backend selected but neither SMOOTH_AGENT_DATABASE_URL \
224                             nor DATABASE_URL is set"
225                    )
226                })?;
227            let adapter = Arc::new(
228                PostgresAdapter::connect_with_embedder(&conn_str, embedder)
229                    .await
230                    .map_err(|e| anyhow::anyhow!("connecting Postgres storage backend: {e}"))?,
231            );
232            // Admin stores against the SAME database — durable.
233            let connectors = Arc::new(adapter.connector_config_store());
234            let settings = Arc::new(adapter.settings_store());
235            let indexing = Arc::new(adapter.indexing_store());
236            // Per-agent behavior config from the monorepo `agents` table on the
237            // same pool — so an agent's `instructions` / `conversation_workflow`
238            // drive its conversations. Degrades to the org default when the table
239            // is absent (a standalone deploy) or a row is malformed.
240            let agent_config = Arc::new(adapter.agent_config_resolver());
241            let storage: Arc<dyn StorageAdapter> = adapter;
242            AppState::new(storage, config)
243                .with_connector_configs(connectors)
244                .with_settings(settings)
245                .with_indexing(indexing)
246                .with_agent_config(agent_config)
247        }
248
249        // The DynamoDB storage backend is only compiled in on a build with the
250        // `dynamodb` feature (the default / cloud build). A lean build returns a
251        // clear error if `SMOOTH_AGENT_STORAGE=dynamodb` is requested at runtime.
252        #[cfg(feature = "dynamodb")]
253        StorageBackend::Dynamodb => {
254            use smooth_operator_adapter_dynamodb::DynamoDbAdapter;
255            let adapter = Arc::new(
256                DynamoDbAdapter::from_env(None)
257                    .await
258                    .map_err(|e| anyhow::anyhow!("connecting DynamoDB storage backend: {e}"))?,
259            );
260            adapter
261                .create_table()
262                .await
263                .map_err(|e| anyhow::anyhow!("creating DynamoDB table: {e}"))?;
264            // Admin stores against the SAME table — durable.
265            let connectors = Arc::new(adapter.connector_config_store());
266            let settings = Arc::new(adapter.settings_store());
267            let indexing = Arc::new(adapter.indexing_store());
268            let storage: Arc<dyn StorageAdapter> = adapter;
269            AppState::new(storage, config)
270                .with_connector_configs(connectors)
271                .with_settings(settings)
272                .with_indexing(indexing)
273        }
274
275        // Lean build: a persistent backend was requested but its feature wasn't
276        // compiled in. Fail loud with an actionable message rather than silently
277        // running in-memory (which would lose data on restart).
278        #[cfg(not(feature = "postgres"))]
279        StorageBackend::Postgres => {
280            anyhow::bail!(
281                "SMOOTH_AGENT_STORAGE=postgres requires building with --features postgres \
282                 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
283                 with the 'cloud'/'postgres' feature"
284            );
285        }
286        #[cfg(not(feature = "dynamodb"))]
287        StorageBackend::Dynamodb => {
288            anyhow::bail!(
289                "SMOOTH_AGENT_STORAGE=dynamodb requires building with --features dynamodb \
290                 (this is a lean/local build); use SMOOTH_AGENT_STORAGE=memory or rebuild \
291                 with the 'cloud'/'dynamodb' feature"
292            );
293        }
294    };
295
296    let state = install_backplane_from_env(state).await?;
297    let state = install_widget_auth_from_env(state);
298
299    Ok(state.with_auth(Arc::from(verifier)))
300}
301
302/// Select the connection [`Backplane`](smooth_operator::backplane::Backplane)
303/// from `SMOOTH_AGENT_BACKPLANE`, installing it via
304/// [`AppState::with_backplane`](crate::state::AppState::with_backplane).
305///
306/// | value | backend | url env |
307/// |---|---|---|
308/// | unset / `memory` / `inmemory` | single-process (default) | — |
309/// | `redis` / `valkey` | [`RedisBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_REDIS_URL` |
310/// | `nats` | [`NatsBackplane`] cross-pod fan-out | `SMOOTH_AGENT_BACKPLANE_URL` \| `SMOOTH_AGENT_NATS_URL` |
311///
312/// A distributed backend is required for >1 replica (otherwise an event produced
313/// on one pod can't reach a socket on another) and to let non-AI publishers push
314/// realtime events via `Backplane::publish`.
315///
316/// # Errors
317/// Returns an error for an unknown backend value, a missing url, or a failed
318/// connection — fail loud at boot rather than silently run single-process.
319async fn install_backplane_from_env(state: AppState) -> Result<AppState> {
320    let kind = std::env::var("SMOOTH_AGENT_BACKPLANE")
321        .unwrap_or_default()
322        .trim()
323        .to_lowercase();
324
325    let url = |specific: &str| -> Result<String> {
326        std::env::var("SMOOTH_AGENT_BACKPLANE_URL")
327            .or_else(|_| std::env::var(specific))
328            .map_err(|_| {
329                anyhow::anyhow!(
330                    "{kind} backplane selected but neither SMOOTH_AGENT_BACKPLANE_URL nor {specific} is set"
331                )
332            })
333    };
334
335    match kind.as_str() {
336        "" | "memory" | "inmemory" => Ok(state), // default InMemoryBackplane already installed
337        // The Redis backplane is only compiled in on a build with the `redis`
338        // feature (the default / cloud build). A lean `--no-default-features`
339        // build returns a clear error rather than silently running single-process.
340        "redis" | "valkey" => {
341            #[cfg(feature = "redis")]
342            {
343                use smooth_operator_adapter_backplane_redis::RedisBackplane;
344                let backplane = RedisBackplane::connect(&url("SMOOTH_AGENT_REDIS_URL")?)
345                    .await
346                    .map_err(|e| anyhow::anyhow!("connecting Redis backplane: {e}"))?;
347                Ok(state.with_backplane(Arc::new(backplane)))
348            }
349            #[cfg(not(feature = "redis"))]
350            {
351                let _ = url; // silence unused-closure warning on the lean build
352                anyhow::bail!(
353                    "SMOOTH_AGENT_BACKPLANE={kind} requires building with --features redis \
354                     (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
355                     with the 'cloud'/'redis' feature"
356                )
357            }
358        }
359        // The NATS backplane is only compiled in on a build with the `nats`
360        // feature (the default / cloud build). A lean build returns a clear error.
361        "nats" => {
362            #[cfg(feature = "nats")]
363            {
364                use smooth_operator_adapter_backplane_nats::NatsBackplane;
365                let backplane = NatsBackplane::connect(&url("SMOOTH_AGENT_NATS_URL")?)
366                    .await
367                    .map_err(|e| anyhow::anyhow!("connecting NATS backplane: {e}"))?;
368                Ok(state.with_backplane(Arc::new(backplane)))
369            }
370            #[cfg(not(feature = "nats"))]
371            {
372                let _ = url; // silence unused-closure warning on the lean build
373                anyhow::bail!(
374                    "SMOOTH_AGENT_BACKPLANE=nats requires building with --features nats \
375                     (this is a lean/local build); use SMOOTH_AGENT_BACKPLANE=memory or rebuild \
376                     with the 'cloud'/'nats' feature"
377                )
378            }
379        }
380        other => Err(anyhow::anyhow!(
381            "unknown SMOOTH_AGENT_BACKPLANE '{other}' (expected: memory | redis | valkey | nats)"
382        )),
383    }
384}
385
386/// Seed a couple of distinctive demo docs so knowledge-grounded E2E is
387/// deterministic. The 17-day return window is deliberately unusual so an
388/// ungrounded answer can't accidentally match it. Both docs are tagged into the
389/// `policies` document set so the admin API can report it.
390pub fn seed_knowledge(storage: &InMemoryStorageAdapter) {
391    let kb = smooth_operator::adapter::StorageAdapter::knowledge(storage);
392    let _ = kb.ingest(smooth_operator::with_document_set(
393        Document::new(
394            "SmooAI's return window is exactly 17 days from delivery. Returns after 17 days are not accepted.",
395            "policies/returns.md",
396            DocumentType::Documentation,
397        ),
398        [SEED_DOCUMENT_SET],
399    ));
400    let _ = kb.ingest(smooth_operator::with_document_set(
401        Document::new(
402            "SmooAI standard shipping takes 5 to 7 business days. Expedited shipping takes 2 business days.",
403            "policies/shipping.md",
404            DocumentType::Documentation,
405        ),
406        [SEED_DOCUMENT_SET],
407    ));
408}
409
410/// Bind on `<SMOOTH_AGENT_BIND>:<port>` (default loopback) and serve until the
411/// process is killed. Returns the bound [`TcpListener`] + the router, used by
412/// both the binary and tests (tests bind port 0 for an ephemeral port).
413///
414/// Uses the **env-configured, secure-by-default** auth verifier
415/// ([`build_state_from_env`]) — the binary refuses to start if auth is
416/// misconfigured rather than silently serving the admin API unauthenticated.
417///
418/// # Errors
419/// Returns an error if the auth configuration is invalid or the TCP bind fails.
420pub async fn bind(config: ServerConfig) -> Result<(TcpListener, Router, CancellationToken)> {
421    let ip: std::net::IpAddr = config
422        .bind
423        .parse()
424        .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
425    let addr = SocketAddr::new(ip, config.port);
426    // Async so a Postgres / DynamoDB storage backend (and its matching durable
427    // admin stores) can be wired; in-memory stays synchronous inside.
428    let state = build_state_from_env_async(config).await?;
429    // Clone the shutdown token BEFORE the state is consumed into the router, so
430    // `run` can cancel it (which fans out to every per-connection clone) when a
431    // SIGTERM/ctrl_c arrives.
432    let shutdown = state.shutdown.clone();
433    let app = router(state);
434    let listener = TcpListener::bind(addr)
435        .await
436        .with_context(|| format!("binding WebSocket server on {addr}"))?;
437    Ok((listener, app, shutdown))
438}
439
440/// Serve a **pre-built** [`AppState`] to completion (blocks), binding on
441/// `state.config.bind:state.config.port`.
442///
443/// This is the library entry point for callers that assemble their own
444/// `AppState` — e.g. the `dev-support` example, which ingests a GitHub repo into
445/// a storage adapter, wires the env-configured [`AuthVerifier`], and then serves
446/// that exact state so the chat-widget queries the ingested knowledge. It does
447/// **not** rebuild the state or touch the ACL/auth/embedder/reranker selection —
448/// those are baked into the `state` the caller passes in. The WS loop, router,
449/// and listening log are identical to [`run`] (which builds its state from env);
450/// `run` is unchanged.
451///
452/// # Errors
453/// Returns an error if the TCP bind fails or serving fails.
454pub async fn serve_state(state: AppState) -> Result<()> {
455    let ip: std::net::IpAddr = state
456        .config
457        .bind
458        .parse()
459        .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
460    let addr = SocketAddr::new(ip, state.config.port);
461    let listener = TcpListener::bind(addr)
462        .await
463        .with_context(|| format!("binding WebSocket server on {addr}"))?;
464    serve_state_on(state, listener).await
465}
466
467/// Serve a pre-built [`AppState`] on an already-bound [`TcpListener`] (blocks).
468///
469/// Splitting the bind from the serve lets a caller (or a test) bind an ephemeral
470/// port, read [`TcpListener::local_addr`] for the real port, then hand the
471/// listener here. Logs the same "listening" line [`run`] does.
472///
473/// # Errors
474/// Returns an error if serving fails.
475pub async fn serve_state_on(state: AppState, listener: TcpListener) -> Result<()> {
476    let has_llm = state.config.has_llm();
477    let model = state.config.model.clone();
478    let gateway = state.config.gateway_url.clone();
479    let local = listener.local_addr().context("local addr")?;
480    let app = router(state);
481
482    tracing::info!(
483        %local,
484        endpoint = "/ws",
485        %model,
486        %gateway,
487        llm_enabled = has_llm,
488        "smooth-operator-server listening"
489    );
490    println!(
491        "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
492    );
493
494    axum::serve(listener, app)
495        .await
496        .context("serving WebSocket connections")?;
497    Ok(())
498}
499
500/// Run the server to completion (blocks). Logs a single listening line.
501///
502/// # Errors
503/// Returns an error if binding or serving fails.
504pub async fn run(config: ServerConfig) -> Result<()> {
505    let has_llm = config.has_llm();
506    let model = config.model.clone();
507    let gateway = config.gateway_url.clone();
508    let (listener, app, shutdown) = bind(config).await?;
509    let local = listener.local_addr().context("local addr")?;
510
511    tracing::info!(
512        %local,
513        endpoint = "/ws",
514        %model,
515        %gateway,
516        llm_enabled = has_llm,
517        "smooth-operator-server listening"
518    );
519    // Also print to stdout so the run-confirmation check is unambiguous without
520    // a tracing subscriber filter.
521    println!(
522        "smooth-operator-server listening on ws://{local}/ws (model={model}, llm_enabled={has_llm})"
523    );
524
525    // Graceful drain: stop accepting new connections AND cancel the shared
526    // shutdown token on SIGTERM (k8s pod termination) / ctrl_c. Cancelling fans
527    // out to every per-connection reader loop so each finishes its in-flight turn
528    // and detaches from the backplane before the process exits — within the
529    // chart's `terminationGracePeriodSeconds` window.
530    axum::serve(listener, app)
531        .with_graceful_shutdown(async move {
532            wait_for_shutdown_signal().await;
533            tracing::info!("shutdown signal received; draining in-flight WebSocket turns");
534            shutdown.cancel();
535        })
536        .await
537        .context("serving WebSocket connections")?;
538    Ok(())
539}
540
541/// Resolve when the process receives a termination request: SIGTERM (how
542/// Kubernetes asks a pod to stop on scale-down / rollout) **or** ctrl_c
543/// (SIGINT — interactive `cargo run`), whichever comes first.
544///
545/// Unix-only signal handling (the server targets Linux/k8s); on a non-unix host
546/// it falls back to ctrl_c alone so the binary still stops cleanly.
547async fn wait_for_shutdown_signal() {
548    #[cfg(unix)]
549    {
550        use tokio::signal::unix::{signal, SignalKind};
551        // If installing the SIGTERM handler somehow fails, fall back to ctrl_c
552        // only rather than panicking the serve task.
553        let mut sigterm = match signal(SignalKind::terminate()) {
554            Ok(s) => s,
555            Err(e) => {
556                tracing::warn!(error = %e, "failed to install SIGTERM handler; ctrl_c only");
557                let _ = tokio::signal::ctrl_c().await;
558                return;
559            }
560        };
561        tokio::select! {
562            _ = sigterm.recv() => {}
563            _ = tokio::signal::ctrl_c() => {}
564        }
565    }
566    #[cfg(not(unix))]
567    {
568        let _ = tokio::signal::ctrl_c().await;
569    }
570}
571
572/// Query parameters accepted on the `/ws` upgrade. `token` carries the bearer
573/// JWT used to authenticate the connection (browsers can't set custom headers on
574/// a WebSocket handshake, so the token rides on the query string — the standard
575/// pattern for WS auth).
576#[derive(Debug, serde::Deserialize, Default)]
577struct WsQuery {
578    /// The bearer token (raw JWT, no `Bearer ` prefix), if provided.
579    #[serde(default)]
580    token: Option<String>,
581}
582
583/// Resolve the connection's [`AccessContext`] from the `?token=` query param.
584///
585/// **Fail closed for ACL'd content**: when no token is presented, or the auth
586/// verifier is the no-key [`AdminDisabledVerifier`] (admin/auth unconfigured —
587/// dev/no-auth), or the token fails to verify, the connection runs as
588/// [`AccessContext::anonymous`] — which sees **only org-public** knowledge, not
589/// every document. A valid token yields the principal's full
590/// [`AccessContext`](smooth_operator::auth::Principal::access_context) (user id +
591/// groups), so it can read documents scoped to it. Verification failures are
592/// logged (never the token) and degrade to anonymous rather than dropping the
593/// connection, so the dev/no-auth case still serves org-public knowledge.
594/// The connection's resolved auth identity: its document-level
595/// [`AccessContext`] (used to filter retrieval) plus, when the token verified to
596/// a JWT principal, that principal's `org_id` (used to create sessions under the
597/// authenticated org). `org_id` is `None` for the anonymous/dev/no-auth case, so
598/// create-session falls through to the agent's widget-policy org, then the seed
599/// org — leaving the no-auth flavor unchanged.
600struct ConnectionAuth {
601    access: AccessContext,
602    org_id: Option<String>,
603}
604
605/// Resolve the connection's access from `?token=`.
606///
607/// **Lenient (default):** a missing/invalid token degrades to
608/// [`AccessContext::anonymous`] (org-public only) — keeps the dev/no-auth `/ws`
609/// path and the embeddable widget's anonymous flow working while failing closed
610/// for ACL'd content. **Strict ([`AppState::strict_auth`]):** a missing/invalid
611/// token is **rejected** (the connection is refused, not degraded) — what a
612/// single-tenant local/tailnet deployment wants so a tokenless peer can't drive
613/// the agent. Returns `Err(())` to signal "reject the upgrade".
614fn resolve_ws_access(state: &AppState, query: &WsQuery) -> Result<ConnectionAuth, ()> {
615    let Some(token) = query
616        .token
617        .as_deref()
618        .map(str::trim)
619        .filter(|t| !t.is_empty())
620    else {
621        if state.strict_auth {
622            tracing::warn!("strict auth: rejecting tokenless /ws connection");
623            return Err(());
624        }
625        // No token → anonymous (org-public only).
626        return Ok(ConnectionAuth {
627            access: AccessContext::anonymous(),
628            org_id: None,
629        });
630    };
631    match state.auth.verify(token) {
632        Ok(principal) => Ok(ConnectionAuth {
633            access: principal.access_context(),
634            org_id: Some(principal.org_id),
635        }),
636        Err(e) => {
637            // Don't leak the token; log only the mode + a generic reason.
638            tracing::warn!(
639                auth_mode = state.auth.mode(),
640                error = %e,
641                strict = state.strict_auth,
642                "ws token failed verification"
643            );
644            if state.strict_auth {
645                return Err(());
646            }
647            Ok(ConnectionAuth {
648                access: AccessContext::anonymous(),
649                org_id: None,
650            })
651        }
652    }
653}
654
655/// Axum handler: upgrade an HTTP request on `/ws` to a WebSocket. The bearer
656/// token (if any) is taken from the `?token=` query param, resolved to an
657/// [`AccessContext`] at connect time, and threaded into every turn so retrieval
658/// is access-controlled per connection.
659async fn ws_upgrade(
660    ws: WebSocketUpgrade,
661    State(state): State<AppState>,
662    Query(query): Query<WsQuery>,
663    headers: axum::http::HeaderMap,
664) -> Response {
665    let ConnectionAuth { access, org_id } = match resolve_ws_access(&state, &query) {
666        Ok(auth) => auth,
667        // Strict auth refused the connection (missing/invalid token).
668        Err(()) => return (axum::http::StatusCode::UNAUTHORIZED, "unauthorized").into_response(),
669    };
670    // Capture the browser's `Origin` at the handshake (browsers always send it,
671    // and can't be made to forge another site's). It's enforced per-agent at
672    // session creation against the agent's embed allowlist (widget_auth).
673    let origin = headers
674        .get(axum::http::header::ORIGIN)
675        .and_then(|v| v.to_str().ok())
676        .map(str::to_string);
677    ws.on_upgrade(move |socket| connection_loop(socket, state, access, org_id, origin))
678}
679
680/// Drive one WebSocket connection: split into reader + writer, joined by an
681/// outbound event sink. `access` is the connection's resolved document-level
682/// entitlement, threaded into every `send_message` turn. `auth_org` is the
683/// authenticated JWT principal's org (when the token verified), used to create
684/// sessions under the authenticated org. `origin` is the handshake `Origin`
685/// header, enforced against an agent's embed allowlist.
686async fn connection_loop(
687    socket: WebSocket,
688    state: AppState,
689    access: AccessContext,
690    auth_org: Option<String>,
691    origin: Option<String>,
692) {
693    let (mut ws_tx, mut ws_rx) = socket.split();
694    let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<serde_json::Value>();
695
696    // Register this connection's outbound sink with the backplane so events
697    // published from anywhere (this pod or, with a Redis/NATS impl, another) can
698    // reach it. `conn_id` is associated with its session at create-session time.
699    let conn_id = uuid::Uuid::new_v4().to_string();
700    let sink_for_backplane = sink_tx.clone();
701    state
702        .backplane
703        .attach(
704            &conn_id,
705            std::sync::Arc::new(move |event| {
706                let _ = sink_for_backplane.send(event);
707            }),
708        )
709        .await;
710
711    // Writer: drain the sink and write each event as a JSON text frame.
712    let writer = tokio::spawn(async move {
713        while let Some(event) = sink_rx.recv().await {
714            let text = match serde_json::to_string(&event) {
715                Ok(t) => t,
716                Err(_) => continue,
717            };
718            if ws_tx.send(Message::Text(text.into())).await.is_err() {
719                break;
720            }
721        }
722    });
723
724    // Reader: dispatch inbound frames. Handlers emit events via `sink_tx`.
725    //
726    // The `select!` lets a graceful shutdown (SIGTERM/ctrl_c → `state.shutdown`
727    // cancelled by the serve loop) break this loop so the connection drains: it
728    // stops reading new frames, falls out, and detaches below. `biased` so the
729    // shutdown branch wins a tie. Crucially, `handle_frame(...).await` stays
730    // INSIDE the frame arm (not a `select!` condition), so a turn already in
731    // flight when the cancel fires runs to completion before the next loop
732    // iteration observes the cancellation — that is the in-flight drain.
733    loop {
734        tokio::select! {
735            biased;
736
737            () = state.shutdown.cancelled() => {
738                // Pod is terminating: stop accepting frames on this connection.
739                // Returning closes the socket (the writer task ends when
740                // `sink_tx` drops below); any turn that was mid-flight already
741                // finished in the frame arm before we got here.
742                break;
743            }
744
745            frame = ws_rx.next() => {
746                match frame {
747                    Some(Ok(Message::Text(text))) => {
748                        handler::handle_frame(
749                            &state,
750                            &access,
751                            &conn_id,
752                            origin.as_deref(),
753                            auth_org.as_deref(),
754                            text.as_str(),
755                            &sink_tx,
756                        )
757                        .await;
758                    }
759                    Some(Ok(Message::Binary(_))) => {
760                        let _ = sink_tx.send(crate::protocol::error(
761                            None,
762                            "VALIDATION_ERROR",
763                            "binary frames are not supported; send JSON text frames",
764                        ));
765                    }
766                    Some(Ok(Message::Close(_))) => break,
767                    // Ping/Pong control frames are handled by axum automatically.
768                    Some(Ok(_)) => {}
769                    Some(Err(_)) => break,
770                    // Stream ended (peer hung up).
771                    None => break,
772                }
773            }
774        }
775    }
776
777    // Reader finished (peer closed, error, or graceful shutdown) → detach from
778    // the backplane so no stale registry entry is left behind, then drop the
779    // sink so the writer task exits.
780    state.backplane.detach(&conn_id).await;
781    drop(sink_tx);
782    let _ = writer.await;
783}
784
#[cfg(test)]
mod tests {
    use super::*;
    use smooth_operator::adapter::StorageAdapter;

    /// After seeding, a "return window policy" query against the in-memory
    /// store must surface a chunk mentioning the 17-day fact.
    #[test]
    fn seeded_kb_returns_17_day_fact() {
        let storage = InMemoryStorageAdapter::new();
        seed_knowledge(&storage);

        let knowledge = storage.knowledge();
        let results = knowledge.query("return window policy", 3).expect("query");
        let mentions_17 = results.iter().any(|hit| hit.chunk.contains("17"));
        assert!(
            mentions_17,
            "expected seeded 17-day fact, got: {results:?}"
        );
    }

    /// With `gateway_key: None`, the built state must report that no LLM is
    /// configured.
    #[tokio::test]
    async fn build_state_without_key_has_no_llm() {
        let config = ServerConfig {
            bind: "127.0.0.1".into(),
            port: 0,
            gateway_url: "https://example.test/v1".into(),
            gateway_key: None,
            model: "m".into(),
            judge_model: "claude-haiku-4-5".to_string(),
            seed_kb: true,
            max_iterations: 4,
            max_tokens: 128,
            storage: crate::config::StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: vec![],
        };

        let state = build_state(config);
        assert!(!state.config.has_llm());
    }
}