Skip to main content

smooth_operator_server/
state.rs

1//! Server + per-connection state.
2//!
3//! [`AppState`] is shared across every connection + every admin HTTP request
4//! (cloneable `Arc` handles): the storage adapter, the resolved
5//! [`ServerConfig`], the session registry, and — for the admin API (Phase 12) —
6//! the [`AuthVerifier`], an [`IndexingStore`], and the document-set registry.
7//!
8//! Sessions live in an in-memory map keyed by `sessionId` so `get_session` and
9//! reconnects work across connections (mirrors the protocol's "connection →
10//! session" / "session → connections" state model, simplified for the reference
11//! single-process server). On AWS this map would be DynamoDB; on k8s, Redis or
12//! Postgres.
13
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17use smooth_operator_core::HumanResponse;
18use tokio::sync::mpsc::UnboundedSender;
19
20use smooth_operator::adapter::StorageAdapter;
21use smooth_operator::auth::{AuthVerifier, NoAuthVerifier};
22use smooth_operator::backplane::{Backplane, InMemoryBackplane};
23use smooth_operator::connector_config::{ConnectorConfigStore, InMemoryConnectorConfigStore};
24use smooth_operator::domain::Session;
25use smooth_operator::gateway_key::{EnvGatewayKeyResolver, GatewayKeyResolver};
26use smooth_operator::settings::{InMemorySettingsStore, SettingsStore};
27use smooth_operator::tool_provider::ToolProvider;
28use smooth_operator::widget_auth::{PermissiveWidgetAuth, WidgetAuthProvider};
29use tokio_util::sync::CancellationToken;
30
31use smooth_operator_core::llm_provider::LlmProvider;
32use smooth_operator_ingestion::indexing::{InMemoryIndexingStore, IndexingStore};
33
34use crate::config::ServerConfig;
35
/// Shared, cloneable application state handed to every WebSocket connection +
/// every admin HTTP request.
///
/// The stores and registries are held behind `Arc` handles, so cloning the
/// state yields another handle onto the same underlying stores rather than a
/// copy of their contents. The four private registries (`sessions`,
/// `doc_sets`, `connectors`, `pending_confirmations`) are only reachable
/// through the accessor methods on this type.
#[derive(Clone)]
pub struct AppState {
    /// The single storage seam (conversations / participants / messages /
    /// sessions / checkpoints / knowledge).
    pub storage: Arc<dyn StorageAdapter>,
    /// Resolved server configuration (gateway, model, limits).
    pub config: Arc<ServerConfig>,
    /// The configured auth verifier (jwt / smoo / none). Used by the admin API's
    /// `require_role` extractor to turn a bearer token into a `Principal`.
    /// Defaults to [`NoAuthVerifier`] in [`new`](Self::new); replaced via
    /// [`with_auth`](Self::with_auth).
    pub auth: Arc<dyn AuthVerifier>,
    /// Indexing-run status store, surfaced by `GET /admin/indexing/runs`.
    /// Defaults to an empty [`InMemoryIndexingStore`]; replaced via
    /// [`with_indexing`](Self::with_indexing).
    pub indexing: Arc<dyn IndexingStore>,
    /// Connector-configuration store, CRUD'd by the admin write API
    /// (`/admin/connectors`). Org-scoped; holds an `auth_ref` (secret name), not
    /// the secret itself. Defaults to [`InMemoryConnectorConfigStore`]; replaced
    /// via [`with_connector_configs`](Self::with_connector_configs).
    pub connector_configs: Arc<dyn ConnectorConfigStore>,
    /// Per-org agent settings store, read/written by `/admin/settings`.
    /// Defaults to [`InMemorySettingsStore`]; replaced via
    /// [`with_settings`](Self::with_settings).
    pub settings: Arc<dyn SettingsStore>,
    /// **Host tool-injection seam.** When `Some`, the runner asks this provider
    /// for EXTRA tools and merges them into every turn's `ToolRegistry`
    /// alongside the built-ins. Defaults to `None` (built-ins only); a host
    /// installs one via [`with_tools`](Self::with_tools) to contribute its own
    /// per-org tool catalog without forking the runner.
    pub tool_provider: Option<Arc<dyn ToolProvider>>,
    /// Embeddable-widget auth hook: resolves an agent's origin-allowlist +
    /// public-key policy for `<smooth-agent-chat>` connections. Defaults to
    /// [`PermissiveWidgetAuth`] (no enforcement) until a host installs a real
    /// provider via [`with_widget_auth`](Self::with_widget_auth).
    pub widget_auth: Arc<dyn WidgetAuthProvider>,
    /// Connection backplane: per-pod sink registry + cross-pod event delivery.
    /// Defaults to [`InMemoryBackplane`] (single-process); a host installs a
    /// Redis/NATS impl via [`with_backplane`](Self::with_backplane) to scale out
    /// and to let non-AI publishers push realtime events to connected clients.
    pub backplane: Arc<dyn Backplane>,
    /// Test-only injected LLM surface. When `Some`, every `send_message` turn
    /// runs the engine against this provider (a
    /// [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient))
    /// instead of building a live gateway client from `config` — exactly the
    /// `ServerState(chat_client=mock)` seam the Python reference uses to drive the
    /// scenario-parity corpus deterministically offline. **`None` in production**
    /// (a live client is built from the gateway config), so the `/ws` path is
    /// byte-for-byte unchanged for real deployments. Installed via
    /// [`with_chat_provider`](Self::with_chat_provider).
    pub chat_provider: Option<Arc<dyn LlmProvider>>,
    /// Per-org LLM gateway-key resolver: maps a turn's `org_id` to the gateway
    /// key it should bill/scope to. Defaults to [`EnvGatewayKeyResolver`] (the
    /// single `SMOOAI_GATEWAY_KEY` for every org — unchanged local behavior); a
    /// multi-tenant host installs a per-org resolver via
    /// [`with_gateway_key_resolver`](Self::with_gateway_key_resolver) so each
    /// tenant's usage is attributed to its own key. The per-turn LLM-config build
    /// falls back to the env key whenever the resolver returns `None`.
    pub gateway_key_resolver: Arc<dyn GatewayKeyResolver>,
    /// Graceful-shutdown signal, shared across every per-connection clone of this
    /// state. On SIGTERM/ctrl_c the serve loop cancels this token; each
    /// connection's reader loop selects on [`CancellationToken::cancelled`] so it
    /// finishes its in-flight turn, exits, and detaches from the [`Backplane`] —
    /// no in-flight turn dropped, no stale registry entry left behind. A fresh
    /// token from [`new`](Self::new) is never cancelled, so the `/ws` path and
    /// tests are unaffected until a `run`/serve path wires the signal.
    pub shutdown: CancellationToken,
    /// Session registry: `sessionId` → session blob. Shared across connections.
    /// Written by [`insert_session`](Self::insert_session) and read (cloned out)
    /// by [`get_session`](Self::get_session).
    sessions: Arc<RwLock<HashMap<String, Session>>>,
    /// Document-set registry, **org-scoped**: `org_id` → (set name → document
    /// count). The in-memory knowledge backend drops document metadata on
    /// ingest, so the admin API reads document-set membership from this side
    /// registry. Keyed by org so org A's document sets are never reported to an
    /// org-B caller (cross-org leak fix — SMOODEV access-control hardening).
    /// Written by [`record_document_set`](Self::record_document_set) and
    /// snapshotted by [`document_sets`](Self::document_sets).
    doc_sets: Arc<RwLock<HashMap<String, HashMap<String, usize>>>>,
    /// Connector registry, **org-scoped**: `org_id` → list of connector names
    /// whose indexing runs should be listed. Stored as a `Vec` that
    /// [`record_connector`](Self::record_connector) keeps free of duplicates
    /// (insertion order; [`connectors`](Self::connectors) returns a sorted
    /// copy). Keyed by org so a same-named connector in two orgs does not
    /// collide, and `GET /admin/indexing/runs` only ever lists the caller's
    /// org's connectors.
    connectors: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// **Human-in-the-loop pending confirmations**: `sessionId` →
    /// [`HumanResponse`] sender for a turn currently parked on a write-tool
    /// confirmation. When an agent turn calls a tool that requires human
    /// approval, the runner installs a `ConfirmationHook` (smooth-operator-core)
    /// that parks the loop and registers its response sender here. A subsequent
    /// `confirm_tool_action` frame looks the session up, takes the sender, and
    /// feeds it [`HumanResponse::Approved`] / [`HumanResponse::Denied`] to resume
    /// the parked turn (execute or reject the tool). Keyed by session so each
    /// session has at most one outstanding confirmation; an empty map means no
    /// turn is parked (the default, byte-for-byte unchanged from before HITL).
    /// Managed through [`register_confirmation`](Self::register_confirmation),
    /// [`take_confirmation`](Self::take_confirmation) and
    /// [`clear_confirmation`](Self::clear_confirmation).
    pending_confirmations: Arc<RwLock<HashMap<String, UnboundedSender<HumanResponse>>>>,
    /// When `true`, the router mounts the embedded widget host page at `/` and
    /// the widget bundle at `/chat-widget.iife.js`. Off by default (the
    /// K8s/Lambda flavors never serve the widget); the local flavor opts in via
    /// [`with_widget`](Self::with_widget).
    pub serve_widget: bool,
    /// The auth token injected into the served widget host page (same-origin), so
    /// the embedded widget connects to this server's `/ws?token=…`. `None` ⇒ no
    /// token injected (a no-auth local server). Set together with
    /// `serve_widget` by [`with_widget`](Self::with_widget).
    pub widget_token: Option<String>,
}
132
/// Build the [`IndexingStore`] key for a connector, namespaced by its org.
///
/// The key has the shape `IXCONN#<org_id>\u{1}<connector_name>`, so two orgs
/// that each own a connector called `"docs"` record + list **separate** runs.
/// The scheme relies on the `\u{1}` separator never appearing in a
/// user-supplied connector name; this function does not validate that itself.
#[must_use]
pub fn scoped_connector_key(org_id: &str, connector_name: &str) -> String {
    // Joined in one pass: fixed prefix, org, control-character separator, name.
    ["IXCONN#", org_id, "\u{1}", connector_name].concat()
}
141
142impl AppState {
143    /// Construct shared state over a storage adapter and config.
144    ///
145    /// Defaults the admin-API collaborators: a [`NoAuthVerifier`] (overridden via
146    /// [`with_auth`](Self::with_auth)) and an empty [`InMemoryIndexingStore`]
147    /// (overridden via [`with_indexing`](Self::with_indexing)). The `/ws` path
148    /// uses none of these, so existing callers are unaffected.
149    #[must_use]
150    pub fn new(storage: Arc<dyn StorageAdapter>, config: ServerConfig) -> Self {
151        // Default resolver returns the single env gateway key for every org, so
152        // the local/default flavor is unchanged until a host installs a per-org
153        // resolver via `with_gateway_key_resolver`.
154        let gateway_key_resolver: Arc<dyn GatewayKeyResolver> =
155            Arc::new(EnvGatewayKeyResolver::new(config.gateway_key.clone()));
156        Self {
157            storage,
158            config: Arc::new(config),
159            auth: Arc::new(NoAuthVerifier::default()),
160            indexing: Arc::new(InMemoryIndexingStore::new()),
161            connector_configs: Arc::new(InMemoryConnectorConfigStore::new()),
162            settings: Arc::new(InMemorySettingsStore::new()),
163            tool_provider: None,
164            widget_auth: Arc::new(PermissiveWidgetAuth),
165            backplane: Arc::new(InMemoryBackplane::new()),
166            chat_provider: None,
167            gateway_key_resolver,
168            // A fresh, never-cancelled token: every clone of this state shares
169            // its cancellation state, so the serve loop cancelling once fans out
170            // to every connection. Defaulting here (rather than at each call
171            // site) keeps construction ripple-free.
172            shutdown: CancellationToken::new(),
173            sessions: Arc::new(RwLock::new(HashMap::new())),
174            doc_sets: Arc::new(RwLock::new(HashMap::new())),
175            connectors: Arc::new(RwLock::new(HashMap::new())),
176            pending_confirmations: Arc::new(RwLock::new(HashMap::new())),
177            serve_widget: false,
178            widget_token: None,
179        }
180    }
181
182    /// Install the configured auth verifier (builder).
183    #[must_use]
184    pub fn with_auth(mut self, auth: Arc<dyn AuthVerifier>) -> Self {
185        self.auth = auth;
186        self
187    }
188
189    /// Install the indexing store (builder).
190    #[must_use]
191    pub fn with_indexing(mut self, indexing: Arc<dyn IndexingStore>) -> Self {
192        self.indexing = indexing;
193        self
194    }
195
196    /// Install the connector-configuration store (builder).
197    #[must_use]
198    pub fn with_connector_configs(mut self, store: Arc<dyn ConnectorConfigStore>) -> Self {
199        self.connector_configs = store;
200        self
201    }
202
203    /// Install the agent-settings store (builder).
204    #[must_use]
205    pub fn with_settings(mut self, store: Arc<dyn SettingsStore>) -> Self {
206        self.settings = store;
207        self
208    }
209
210    /// Install a host [`ToolProvider`] (builder). The runner merges the
211    /// provider's per-turn tools into every turn's registry alongside the
212    /// built-ins. Without this, the registry is exactly the built-ins, so the
213    /// default/local flavor is unaffected.
214    #[must_use]
215    pub fn with_tools(mut self, provider: Arc<dyn ToolProvider>) -> Self {
216        self.tool_provider = Some(provider);
217        self
218    }
219
220    /// Serve the embedded official widget (host page at `/`, bundle at
221    /// `/chat-widget.iife.js`), injecting `token` into the page so the widget
222    /// connects to this server's `/ws?token=…` (builder). The local deployment
223    /// flavor opts in; other flavors never mount the widget routes.
224    #[must_use]
225    pub fn with_widget(mut self, token: Option<String>) -> Self {
226        self.serve_widget = true;
227        self.widget_token = token;
228        self
229    }
230
231    /// Install the embeddable-widget auth provider (builder). A host backs this
232    /// with its agent store so embed origins + public keys are enforced.
233    #[must_use]
234    pub fn with_widget_auth(mut self, provider: Arc<dyn WidgetAuthProvider>) -> Self {
235        self.widget_auth = provider;
236        self
237    }
238
239    /// Install the connection backplane (builder). A host installs a Redis/NATS
240    /// impl to scale the WS service horizontally and to let other services push
241    /// realtime events to connected clients via [`Backplane::publish`].
242    #[must_use]
243    pub fn with_backplane(mut self, backplane: Arc<dyn Backplane>) -> Self {
244        self.backplane = backplane;
245        self
246    }
247
248    /// Install a test-injected LLM provider (builder). Every `send_message` turn
249    /// then runs the engine against this provider instead of a live gateway
250    /// client — the [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient)
251    /// seam the scenario-parity corpus drives. Production never calls this, so the
252    /// live path is unchanged. See [`chat_provider`](Self::chat_provider).
253    #[must_use]
254    pub fn with_chat_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
255        self.chat_provider = Some(provider);
256        self
257    }
258
259    /// Install a per-org gateway-key resolver (builder). A multi-tenant host
260    /// installs a resolver backed by its per-org key store (e.g. one LiteLLM
261    /// virtual key per tenant) so each org's turns are billed/scoped to its own
262    /// key. The per-turn LLM-config build falls back to the env key whenever the
263    /// resolver returns `None`, so a resolver covering only some orgs is safe.
264    /// Leaving this unset keeps the default [`EnvGatewayKeyResolver`] (single env
265    /// key for every org — unchanged local behavior).
266    #[must_use]
267    pub fn with_gateway_key_resolver(mut self, resolver: Arc<dyn GatewayKeyResolver>) -> Self {
268        self.gateway_key_resolver = resolver;
269        self
270    }
271
272    /// Install the graceful-shutdown signal (builder). The serve loop owns a
273    /// clone of this token and cancels it on SIGTERM/ctrl_c; every per-connection
274    /// clone observes the cancellation and drains. Defaulted to a fresh token in
275    /// [`new`](Self::new), so this is only needed when a caller wants to drive
276    /// shutdown from its own token.
277    #[must_use]
278    pub fn with_shutdown(mut self, shutdown: CancellationToken) -> Self {
279        self.shutdown = shutdown;
280        self
281    }
282
283    /// Register a freshly created session.
284    pub fn insert_session(&self, session: Session) {
285        if let Ok(mut map) = self.sessions.write() {
286            map.insert(session.session_id.clone(), session);
287        }
288    }
289
290    /// Look up a session by id.
291    #[must_use]
292    pub fn get_session(&self, session_id: &str) -> Option<Session> {
293        self.sessions.read().ok()?.get(session_id).cloned()
294    }
295
296    /// Record that a document was added to a named document set **within an org**
297    /// (increments its count). Used by seeding + the ingest path so
298    /// `GET /admin/document-sets` can report set names + counts despite the
299    /// in-memory backend dropping document metadata. Org-scoped so org A's sets
300    /// are never reported to an org-B caller.
301    pub fn record_document_set(&self, org_id: impl Into<String>, set: impl Into<String>) {
302        if let Ok(mut map) = self.doc_sets.write() {
303            *map.entry(org_id.into())
304                .or_default()
305                .entry(set.into())
306                .or_insert(0) += 1;
307        }
308    }
309
310    /// Snapshot **one org's** document-set registry as `(name, count)` pairs,
311    /// sorted by name for a stable response. Never returns another org's sets.
312    #[must_use]
313    pub fn document_sets(&self, org_id: &str) -> Vec<(String, usize)> {
314        let Ok(map) = self.doc_sets.read() else {
315            return Vec::new();
316        };
317        let Some(org_sets) = map.get(org_id) else {
318            return Vec::new();
319        };
320        let mut out: Vec<(String, usize)> = org_sets.iter().map(|(k, v)| (k.clone(), *v)).collect();
321        out.sort_by(|a, b| a.0.cmp(&b.0));
322        out
323    }
324
325    /// Record a connector (within an org) whose indexing runs should be listed
326    /// (idempotent). Org-scoped so a same-named connector in two orgs records
327    /// separately and `GET /admin/indexing/runs` only lists the caller's org's.
328    pub fn record_connector(&self, org_id: impl Into<String>, name: impl Into<String>) {
329        let name = name.into();
330        if let Ok(mut map) = self.connectors.write() {
331            let v = map.entry(org_id.into()).or_default();
332            if !v.iter().any(|c| c == &name) {
333                v.push(name);
334            }
335        }
336    }
337
338    /// Snapshot **one org's** recorded connector names (sorted, stable). Never
339    /// returns another org's connectors.
340    #[must_use]
341    pub fn connectors(&self, org_id: &str) -> Vec<String> {
342        let Ok(map) = self.connectors.read() else {
343            return Vec::new();
344        };
345        let mut out = map.get(org_id).cloned().unwrap_or_default();
346        out.sort();
347        out
348    }
349
350    /// Register a parked turn's [`HumanResponse`] sender for `session_id`, so a
351    /// later `confirm_tool_action` can resume it. Any prior pending sender for
352    /// the same session is replaced (one outstanding confirmation per session).
353    /// Called by the runner's confirmation bridge when a write tool emits a
354    /// `HumanRequest::Confirm`.
355    pub fn register_confirmation(
356        &self,
357        session_id: impl Into<String>,
358        responder: UnboundedSender<HumanResponse>,
359    ) {
360        if let Ok(mut map) = self.pending_confirmations.write() {
361            map.insert(session_id.into(), responder);
362        }
363    }
364
365    /// Take (remove + return) the pending [`HumanResponse`] sender for
366    /// `session_id`, if a turn is parked on a confirmation. Returns `None` when
367    /// no turn awaits confirmation for that session (the common case). Taking it
368    /// out — rather than cloning — guarantees a single confirmation resolves a
369    /// single parked tool call, and a duplicate `confirm_tool_action` is a no-op.
370    #[must_use]
371    pub fn take_confirmation(&self, session_id: &str) -> Option<UnboundedSender<HumanResponse>> {
372        self.pending_confirmations.write().ok()?.remove(session_id)
373    }
374
375    /// Drop any pending confirmation registered for `session_id` without
376    /// resolving it. Called when a parked turn ends (the bridge task finishes)
377    /// so a stale sender can't linger and mis-route a later confirmation.
378    pub fn clear_confirmation(&self, session_id: &str) {
379        if let Ok(mut map) = self.pending_confirmations.write() {
380            map.remove(session_id);
381        }
382    }
383}
384
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use smooth_operator::gateway_key::resolve_gateway_key;
    use smooth_operator_adapter_memory::InMemoryStorageAdapter;

    use crate::config::{ServerConfig, StorageBackend, DEFAULT_GATEWAY_URL, DEFAULT_MODEL};

    /// Minimal in-memory server config whose only varying input is the env
    /// gateway key the resolver tests care about.
    fn config_with_env_key(env_key: Option<&str>) -> ServerConfig {
        ServerConfig {
            bind: String::from("127.0.0.1"),
            port: 0,
            gateway_url: DEFAULT_GATEWAY_URL.to_string(),
            gateway_key: env_key.map(str::to_owned),
            model: DEFAULT_MODEL.to_string(),
            seed_kb: false,
            max_iterations: 6,
            max_tokens: 512,
            storage: StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: Vec::new(),
        }
    }

    /// Fresh state over an in-memory storage adapter.
    fn state_with(config: ServerConfig) -> AppState {
        let storage = Arc::new(InMemoryStorageAdapter::new());
        AppState::new(storage, config)
    }

    /// Resolver that knows a key for exactly one org and answers `None`
    /// (→ env fallback) for every other org, like a multi-tenant host's.
    struct OneOrgResolver {
        org: String,
        key: String,
    }

    #[async_trait]
    impl GatewayKeyResolver for OneOrgResolver {
        async fn resolve(&self, org_id: &str) -> Option<String> {
            if org_id == self.org {
                Some(self.key.clone())
            } else {
                None
            }
        }
    }

    #[tokio::test]
    async fn default_state_resolves_env_key_for_every_org() {
        // Nothing injected: the default `EnvGatewayKeyResolver` must hand the
        // single env key to any org — unchanged local behavior.
        let state = state_with(config_with_env_key(Some("env-key")));
        let env = state.config.gateway_key.as_deref();
        for org in ["org-a", "org-z"] {
            assert_eq!(
                resolve_gateway_key(&state.gateway_key_resolver, org, env).await,
                Some("env-key".to_string())
            );
        }
    }

    #[tokio::test]
    async fn injected_resolver_overrides_per_org_and_falls_back_to_env() {
        let per_org = OneOrgResolver {
            org: "org-a".to_string(),
            key: "org-a-key".to_string(),
        };
        let state = state_with(config_with_env_key(Some("env-key")))
            .with_gateway_key_resolver(Arc::new(per_org));
        let env = state.config.gateway_key.as_deref();

        // The org the resolver covers gets its own key.
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            Some("org-a-key".to_string())
        );
        // Any other org falls back to the env key.
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-b", env).await,
            Some("env-key".to_string())
        );
    }

    #[tokio::test]
    async fn no_env_key_and_no_resolver_match_resolves_to_none() {
        // No env key and only the default resolver → no key at all (the turn
        // is unavailable), matching today's `llm_config()` returning `None`.
        let state = state_with(config_with_env_key(None));
        let env = state.config.gateway_key.as_deref();
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            None
        );
    }
}