smooth_operator_server/state.rs
1//! Server + per-connection state.
2//!
3//! [`AppState`] is shared across every connection + every admin HTTP request
4//! (cloneable `Arc` handles): the storage adapter, the resolved
5//! [`ServerConfig`], the session registry, and — for the admin API (Phase 12) —
6//! the [`AuthVerifier`], an [`IndexingStore`], and the document-set registry.
7//!
8//! Sessions live in an in-memory map keyed by `sessionId` so `get_session` and
9//! reconnects work across connections (mirrors the protocol's "connection →
10//! session" / "session → connections" state model, simplified for the reference
11//! single-process server). On AWS this map would be DynamoDB; on k8s, Redis or
12//! Postgres.
13
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17use smooth_operator_core::HumanResponse;
18use tokio::sync::mpsc::UnboundedSender;
19
20use smooth_operator::adapter::StorageAdapter;
21use smooth_operator::auth::{AuthVerifier, NoAuthVerifier};
22use smooth_operator::backplane::{Backplane, InMemoryBackplane};
23use smooth_operator::connector_config::{ConnectorConfigStore, InMemoryConnectorConfigStore};
24use smooth_operator::domain::Session;
25use smooth_operator::gateway_key::{EnvGatewayKeyResolver, GatewayKeyResolver};
26use smooth_operator::settings::{InMemorySettingsStore, SettingsStore};
27use smooth_operator::tool_provider::ToolProvider;
28use smooth_operator::widget_auth::{PermissiveWidgetAuth, WidgetAuthProvider};
29use tokio_util::sync::CancellationToken;
30
31use smooth_operator_core::llm_provider::LlmProvider;
32use smooth_operator_ingestion::indexing::{InMemoryIndexingStore, IndexingStore};
33
34use crate::config::ServerConfig;
35
36/// Shared, cloneable application state handed to every WebSocket connection +
37/// every admin HTTP request.
38#[derive(Clone)]
39pub struct AppState {
40 /// The single storage seam (conversations / participants / messages /
41 /// sessions / checkpoints / knowledge).
42 pub storage: Arc<dyn StorageAdapter>,
43 /// Resolved server configuration (gateway, model, limits).
44 pub config: Arc<ServerConfig>,
45 /// The configured auth verifier (jwt / smoo / none). Used by the admin API's
46 /// `require_role` extractor to turn a bearer token into a `Principal`.
47 pub auth: Arc<dyn AuthVerifier>,
48 /// Indexing-run status store, surfaced by `GET /admin/indexing/runs`.
49 pub indexing: Arc<dyn IndexingStore>,
50 /// Connector-configuration store, CRUD'd by the admin write API
51 /// (`/admin/connectors`). Org-scoped; holds an `auth_ref` (secret name), not
52 /// the secret itself.
53 pub connector_configs: Arc<dyn ConnectorConfigStore>,
54 /// Per-org agent settings store, read/written by `/admin/settings`.
55 pub settings: Arc<dyn SettingsStore>,
56 /// **Host tool-injection seam.** When `Some`, the runner asks this provider
57 /// for EXTRA tools and merges them into every turn's `ToolRegistry`
58 /// alongside the built-ins. Defaults to `None` (built-ins only); a host
59 /// installs one via [`with_tools`](Self::with_tools) to contribute its own
60 /// per-org tool catalog without forking the runner.
61 pub tool_provider: Option<Arc<dyn ToolProvider>>,
62 /// Embeddable-widget auth hook: resolves an agent's origin-allowlist +
63 /// public-key policy for `<smooth-agent-chat>` connections. Defaults to
64 /// [`PermissiveWidgetAuth`] (no enforcement) until a host installs a real
65 /// provider via [`with_widget_auth`](Self::with_widget_auth).
66 pub widget_auth: Arc<dyn WidgetAuthProvider>,
67 /// Connection backplane: per-pod sink registry + cross-pod event delivery.
68 /// Defaults to [`InMemoryBackplane`] (single-process); a host installs a
69 /// Redis/NATS impl via [`with_backplane`](Self::with_backplane) to scale out
70 /// and to let non-AI publishers push realtime events to connected clients.
71 pub backplane: Arc<dyn Backplane>,
72 /// Test-only injected LLM surface. When `Some`, every `send_message` turn
73 /// runs the engine against this provider (a
74 /// [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient))
75 /// instead of building a live gateway client from `config` — exactly the
76 /// `ServerState(chat_client=mock)` seam the Python reference uses to drive the
77 /// scenario-parity corpus deterministically offline. **`None` in production**
78 /// (a live client is built from the gateway config), so the `/ws` path is
79 /// byte-for-byte unchanged for real deployments. Installed via
80 /// [`with_chat_provider`](Self::with_chat_provider).
81 pub chat_provider: Option<Arc<dyn LlmProvider>>,
82 /// Per-org LLM gateway-key resolver: maps a turn's `org_id` to the gateway
83 /// key it should bill/scope to. Defaults to [`EnvGatewayKeyResolver`] (the
84 /// single `SMOOAI_GATEWAY_KEY` for every org — unchanged local behavior); a
85 /// multi-tenant host installs a per-org resolver via
86 /// [`with_gateway_key_resolver`](Self::with_gateway_key_resolver) so each
87 /// tenant's usage is attributed to its own key. The per-turn LLM-config build
88 /// falls back to the env key whenever the resolver returns `None`.
89 pub gateway_key_resolver: Arc<dyn GatewayKeyResolver>,
90 /// Graceful-shutdown signal, shared across every per-connection clone of this
91 /// state. On SIGTERM/ctrl_c the serve loop cancels this token; each
92 /// connection's reader loop selects on [`CancellationToken::cancelled`] so it
93 /// finishes its in-flight turn, exits, and detaches from the [`Backplane`] —
94 /// no in-flight turn dropped, no stale registry entry left behind. A fresh
95 /// token from [`new`](Self::new) is never cancelled, so the `/ws` path and
96 /// tests are unaffected until a `run`/serve path wires the signal.
97 pub shutdown: CancellationToken,
98 /// Session registry: `sessionId` → session blob. Shared across connections.
99 sessions: Arc<RwLock<HashMap<String, Session>>>,
100 /// Document-set registry, **org-scoped**: `org_id` → (set name → document
101 /// count). The in-memory knowledge backend drops document metadata on
102 /// ingest, so the admin API reads document-set membership from this side
103 /// registry. Keyed by org so org A's document sets are never reported to an
104 /// org-B caller (cross-org leak fix — SMOODEV access-control hardening).
105 doc_sets: Arc<RwLock<HashMap<String, HashMap<String, usize>>>>,
106 /// Connector registry, **org-scoped**: `org_id` → set of connector names
107 /// whose indexing runs should be listed. Keyed by org so a same-named
108 /// connector in two orgs does not collide, and `GET /admin/indexing/runs`
109 /// only ever lists the caller's org's connectors.
110 connectors: Arc<RwLock<HashMap<String, Vec<String>>>>,
111 /// **Human-in-the-loop pending confirmations**: `sessionId` →
112 /// [`HumanResponse`] sender for a turn currently parked on a write-tool
113 /// confirmation. When an agent turn calls a tool that requires human
114 /// approval, the runner installs a `ConfirmationHook` (smooth-operator-core)
115 /// that parks the loop and registers its response sender here. A subsequent
116 /// `confirm_tool_action` frame looks the session up, takes the sender, and
117 /// feeds it [`HumanResponse::Approved`] / [`HumanResponse::Denied`] to resume
118 /// the parked turn (execute or reject the tool). Keyed by session so each
119 /// session has at most one outstanding confirmation; an empty map means no
120 /// turn is parked (the default, byte-for-byte unchanged from before HITL).
121 pending_confirmations: Arc<RwLock<HashMap<String, UnboundedSender<HumanResponse>>>>,
122 /// When `true`, the router mounts the embedded widget host page at `/` and
123 /// the widget bundle at `/chat-widget.iife.js`. Off by default (the
124 /// K8s/Lambda flavors never serve the widget); the local flavor opts in via
125 /// [`with_widget`](Self::with_widget).
126 pub serve_widget: bool,
127 /// The auth token injected into the served widget host page (same-origin), so
128 /// the embedded widget connects to this server's `/ws?token=…`. `None` ⇒ no
129 /// token injected (a no-auth local server).
130 pub widget_token: Option<String>,
131}
132
/// Namespace a connector name by org to form its [`IndexingStore`] key.
///
/// This lets two orgs with a same-named connector (e.g. `"docs"`) record and
/// list **separate** runs. The key is `IXCONN#{org_id}\u{1}{connector_name}`.
///
/// # Ambiguity
///
/// Two different `(org_id, connector_name)` pairs can only produce the same
/// key if some org id contains U+0001 **and** some connector name contains
/// U+0001:
/// - if org ids never contain it, the *first* U+0001 ends the org id;
/// - if connector names never contain it, the *last* U+0001 starts the
///   connector name.
///
/// This function does not check either input. Cross-org isolation therefore
/// relies on callers guaranteeing one of those two conditions, e.g. by
/// validating connector names before they reach here.
///
/// NOTE(review): the store may persist these keys. Changing the format would
/// orphan existing runs unless they are migrated.
#[must_use]
pub fn scoped_connector_key(org_id: &str, connector_name: &str) -> String {
    format!("IXCONN#{org_id}\u{1}{connector_name}")
}
141
142impl AppState {
143 /// Construct shared state over a storage adapter and config.
144 ///
145 /// Defaults the admin-API collaborators: a [`NoAuthVerifier`] (overridden via
146 /// [`with_auth`](Self::with_auth)) and an empty [`InMemoryIndexingStore`]
147 /// (overridden via [`with_indexing`](Self::with_indexing)). The `/ws` path
148 /// uses none of these, so existing callers are unaffected.
149 #[must_use]
150 pub fn new(storage: Arc<dyn StorageAdapter>, config: ServerConfig) -> Self {
151 // Default resolver returns the single env gateway key for every org, so
152 // the local/default flavor is unchanged until a host installs a per-org
153 // resolver via `with_gateway_key_resolver`.
154 let gateway_key_resolver: Arc<dyn GatewayKeyResolver> =
155 Arc::new(EnvGatewayKeyResolver::new(config.gateway_key.clone()));
156 Self {
157 storage,
158 config: Arc::new(config),
159 auth: Arc::new(NoAuthVerifier::default()),
160 indexing: Arc::new(InMemoryIndexingStore::new()),
161 connector_configs: Arc::new(InMemoryConnectorConfigStore::new()),
162 settings: Arc::new(InMemorySettingsStore::new()),
163 tool_provider: None,
164 widget_auth: Arc::new(PermissiveWidgetAuth),
165 backplane: Arc::new(InMemoryBackplane::new()),
166 chat_provider: None,
167 gateway_key_resolver,
168 // A fresh, never-cancelled token: every clone of this state shares
169 // its cancellation state, so the serve loop cancelling once fans out
170 // to every connection. Defaulting here (rather than at each call
171 // site) keeps construction ripple-free.
172 shutdown: CancellationToken::new(),
173 sessions: Arc::new(RwLock::new(HashMap::new())),
174 doc_sets: Arc::new(RwLock::new(HashMap::new())),
175 connectors: Arc::new(RwLock::new(HashMap::new())),
176 pending_confirmations: Arc::new(RwLock::new(HashMap::new())),
177 serve_widget: false,
178 widget_token: None,
179 }
180 }
181
182 /// Install the configured auth verifier (builder).
183 #[must_use]
184 pub fn with_auth(mut self, auth: Arc<dyn AuthVerifier>) -> Self {
185 self.auth = auth;
186 self
187 }
188
189 /// Install the indexing store (builder).
190 #[must_use]
191 pub fn with_indexing(mut self, indexing: Arc<dyn IndexingStore>) -> Self {
192 self.indexing = indexing;
193 self
194 }
195
196 /// Install the connector-configuration store (builder).
197 #[must_use]
198 pub fn with_connector_configs(mut self, store: Arc<dyn ConnectorConfigStore>) -> Self {
199 self.connector_configs = store;
200 self
201 }
202
203 /// Install the agent-settings store (builder).
204 #[must_use]
205 pub fn with_settings(mut self, store: Arc<dyn SettingsStore>) -> Self {
206 self.settings = store;
207 self
208 }
209
210 /// Install a host [`ToolProvider`] (builder). The runner merges the
211 /// provider's per-turn tools into every turn's registry alongside the
212 /// built-ins. Without this, the registry is exactly the built-ins, so the
213 /// default/local flavor is unaffected.
214 #[must_use]
215 pub fn with_tools(mut self, provider: Arc<dyn ToolProvider>) -> Self {
216 self.tool_provider = Some(provider);
217 self
218 }
219
220 /// Serve the embedded official widget (host page at `/`, bundle at
221 /// `/chat-widget.iife.js`), injecting `token` into the page so the widget
222 /// connects to this server's `/ws?token=…` (builder). The local deployment
223 /// flavor opts in; other flavors never mount the widget routes.
224 #[must_use]
225 pub fn with_widget(mut self, token: Option<String>) -> Self {
226 self.serve_widget = true;
227 self.widget_token = token;
228 self
229 }
230
231 /// Install the embeddable-widget auth provider (builder). A host backs this
232 /// with its agent store so embed origins + public keys are enforced.
233 #[must_use]
234 pub fn with_widget_auth(mut self, provider: Arc<dyn WidgetAuthProvider>) -> Self {
235 self.widget_auth = provider;
236 self
237 }
238
239 /// Install the connection backplane (builder). A host installs a Redis/NATS
240 /// impl to scale the WS service horizontally and to let other services push
241 /// realtime events to connected clients via [`Backplane::publish`].
242 #[must_use]
243 pub fn with_backplane(mut self, backplane: Arc<dyn Backplane>) -> Self {
244 self.backplane = backplane;
245 self
246 }
247
248 /// Install a test-injected LLM provider (builder). Every `send_message` turn
249 /// then runs the engine against this provider instead of a live gateway
250 /// client — the [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient)
251 /// seam the scenario-parity corpus drives. Production never calls this, so the
252 /// live path is unchanged. See [`chat_provider`](Self::chat_provider).
253 #[must_use]
254 pub fn with_chat_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
255 self.chat_provider = Some(provider);
256 self
257 }
258
259 /// Install a per-org gateway-key resolver (builder). A multi-tenant host
260 /// installs a resolver backed by its per-org key store (e.g. one LiteLLM
261 /// virtual key per tenant) so each org's turns are billed/scoped to its own
262 /// key. The per-turn LLM-config build falls back to the env key whenever the
263 /// resolver returns `None`, so a resolver covering only some orgs is safe.
264 /// Leaving this unset keeps the default [`EnvGatewayKeyResolver`] (single env
265 /// key for every org — unchanged local behavior).
266 #[must_use]
267 pub fn with_gateway_key_resolver(mut self, resolver: Arc<dyn GatewayKeyResolver>) -> Self {
268 self.gateway_key_resolver = resolver;
269 self
270 }
271
272 /// Install the graceful-shutdown signal (builder). The serve loop owns a
273 /// clone of this token and cancels it on SIGTERM/ctrl_c; every per-connection
274 /// clone observes the cancellation and drains. Defaulted to a fresh token in
275 /// [`new`](Self::new), so this is only needed when a caller wants to drive
276 /// shutdown from its own token.
277 #[must_use]
278 pub fn with_shutdown(mut self, shutdown: CancellationToken) -> Self {
279 self.shutdown = shutdown;
280 self
281 }
282
283 /// Register a freshly created session.
284 pub fn insert_session(&self, session: Session) {
285 if let Ok(mut map) = self.sessions.write() {
286 map.insert(session.session_id.clone(), session);
287 }
288 }
289
290 /// Look up a session by id.
291 #[must_use]
292 pub fn get_session(&self, session_id: &str) -> Option<Session> {
293 self.sessions.read().ok()?.get(session_id).cloned()
294 }
295
296 /// Record that a document was added to a named document set **within an org**
297 /// (increments its count). Used by seeding + the ingest path so
298 /// `GET /admin/document-sets` can report set names + counts despite the
299 /// in-memory backend dropping document metadata. Org-scoped so org A's sets
300 /// are never reported to an org-B caller.
301 pub fn record_document_set(&self, org_id: impl Into<String>, set: impl Into<String>) {
302 if let Ok(mut map) = self.doc_sets.write() {
303 *map.entry(org_id.into())
304 .or_default()
305 .entry(set.into())
306 .or_insert(0) += 1;
307 }
308 }
309
310 /// Snapshot **one org's** document-set registry as `(name, count)` pairs,
311 /// sorted by name for a stable response. Never returns another org's sets.
312 #[must_use]
313 pub fn document_sets(&self, org_id: &str) -> Vec<(String, usize)> {
314 let Ok(map) = self.doc_sets.read() else {
315 return Vec::new();
316 };
317 let Some(org_sets) = map.get(org_id) else {
318 return Vec::new();
319 };
320 let mut out: Vec<(String, usize)> = org_sets.iter().map(|(k, v)| (k.clone(), *v)).collect();
321 out.sort_by(|a, b| a.0.cmp(&b.0));
322 out
323 }
324
325 /// Record a connector (within an org) whose indexing runs should be listed
326 /// (idempotent). Org-scoped so a same-named connector in two orgs records
327 /// separately and `GET /admin/indexing/runs` only lists the caller's org's.
328 pub fn record_connector(&self, org_id: impl Into<String>, name: impl Into<String>) {
329 let name = name.into();
330 if let Ok(mut map) = self.connectors.write() {
331 let v = map.entry(org_id.into()).or_default();
332 if !v.iter().any(|c| c == &name) {
333 v.push(name);
334 }
335 }
336 }
337
338 /// Snapshot **one org's** recorded connector names (sorted, stable). Never
339 /// returns another org's connectors.
340 #[must_use]
341 pub fn connectors(&self, org_id: &str) -> Vec<String> {
342 let Ok(map) = self.connectors.read() else {
343 return Vec::new();
344 };
345 let mut out = map.get(org_id).cloned().unwrap_or_default();
346 out.sort();
347 out
348 }
349
350 /// Register a parked turn's [`HumanResponse`] sender for `session_id`, so a
351 /// later `confirm_tool_action` can resume it. Any prior pending sender for
352 /// the same session is replaced (one outstanding confirmation per session).
353 /// Called by the runner's confirmation bridge when a write tool emits a
354 /// `HumanRequest::Confirm`.
355 pub fn register_confirmation(
356 &self,
357 session_id: impl Into<String>,
358 responder: UnboundedSender<HumanResponse>,
359 ) {
360 if let Ok(mut map) = self.pending_confirmations.write() {
361 map.insert(session_id.into(), responder);
362 }
363 }
364
365 /// Take (remove + return) the pending [`HumanResponse`] sender for
366 /// `session_id`, if a turn is parked on a confirmation. Returns `None` when
367 /// no turn awaits confirmation for that session (the common case). Taking it
368 /// out — rather than cloning — guarantees a single confirmation resolves a
369 /// single parked tool call, and a duplicate `confirm_tool_action` is a no-op.
370 #[must_use]
371 pub fn take_confirmation(&self, session_id: &str) -> Option<UnboundedSender<HumanResponse>> {
372 self.pending_confirmations.write().ok()?.remove(session_id)
373 }
374
375 /// Drop any pending confirmation registered for `session_id` without
376 /// resolving it. Called when a parked turn ends (the bridge task finishes)
377 /// so a stale sender can't linger and mis-route a later confirmation.
378 pub fn clear_confirmation(&self, session_id: &str) {
379 if let Ok(mut map) = self.pending_confirmations.write() {
380 map.remove(session_id);
381 }
382 }
383}
384
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use smooth_operator::gateway_key::resolve_gateway_key;
    use smooth_operator_adapter_memory::InMemoryStorageAdapter;

    use crate::config::{ServerConfig, StorageBackend, DEFAULT_GATEWAY_URL, DEFAULT_MODEL};

    /// Build a config with an explicit env gateway key for the resolver tests.
    fn config_with_env_key(env_key: Option<&str>) -> ServerConfig {
        ServerConfig {
            bind: "127.0.0.1".to_string(),
            port: 0,
            gateway_url: DEFAULT_GATEWAY_URL.to_string(),
            gateway_key: env_key.map(str::to_string),
            model: DEFAULT_MODEL.to_string(),
            seed_kb: false,
            max_iterations: 6,
            max_tokens: 512,
            storage: StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: Vec::new(),
        }
    }

    fn state_with(config: ServerConfig) -> AppState {
        AppState::new(Arc::new(InMemoryStorageAdapter::new()), config)
    }

    /// Per-org resolver covering exactly one org; `None` (→ env fallback) for any
    /// other org. Mirrors what a multi-tenant host installs.
    struct OneOrgResolver {
        org: String,
        key: String,
    }

    #[async_trait]
    impl GatewayKeyResolver for OneOrgResolver {
        async fn resolve(&self, org_id: &str) -> Option<String> {
            (org_id == self.org).then(|| self.key.clone())
        }
    }

    #[tokio::test]
    async fn default_state_resolves_env_key_for_every_org() {
        // No resolver injected: the default `EnvGatewayKeyResolver` returns the
        // single env key for every org — unchanged local behavior.
        let state = state_with(config_with_env_key(Some("env-key")));
        let env = state.config.gateway_key.as_deref();
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            Some("env-key".to_string())
        );
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-z", env).await,
            Some("env-key".to_string())
        );
    }

    #[tokio::test]
    async fn injected_resolver_overrides_per_org_and_falls_back_to_env() {
        let config = config_with_env_key(Some("env-key"));
        let state = state_with(config).with_gateway_key_resolver(Arc::new(OneOrgResolver {
            org: "org-a".to_string(),
            key: "org-a-key".to_string(),
        }));
        let env = state.config.gateway_key.as_deref();

        // Covered org → its own key.
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            Some("org-a-key".to_string())
        );
        // Uncovered org → env fallback.
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-b", env).await,
            Some("env-key".to_string())
        );
    }

    #[tokio::test]
    async fn no_env_key_and_no_resolver_match_resolves_to_none() {
        // Env key absent + default resolver → no key (turn is unavailable). Same
        // behavior as today's `llm_config()` returning `None`.
        let state = state_with(config_with_env_key(None));
        let env = state.config.gateway_key.as_deref();
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            None
        );
    }

    #[test]
    fn scoped_connector_key_separates_orgs() {
        assert_eq!(scoped_connector_key("org-a", "docs"), "IXCONN#org-a\u{1}docs");
        assert_ne!(
            scoped_connector_key("org-a", "docs"),
            scoped_connector_key("org-b", "docs")
        );
    }

    #[test]
    fn document_sets_are_org_scoped_counted_and_sorted() {
        // Cross-org leak hardening: one org's sets must never surface for another.
        let state = state_with(config_with_env_key(None));
        state.record_document_set("org-a", "zeta");
        state.record_document_set("org-a", "alpha");
        state.record_document_set("org-a", "alpha");
        state.record_document_set("org-b", "beta");

        assert_eq!(
            state.document_sets("org-a"),
            vec![("alpha".to_string(), 2), ("zeta".to_string(), 1)]
        );
        assert_eq!(state.document_sets("org-b"), vec![("beta".to_string(), 1)]);
        assert!(state.document_sets("org-unknown").is_empty());
    }

    #[test]
    fn connectors_are_org_scoped_deduped_and_sorted() {
        let state = state_with(config_with_env_key(None));
        state.record_connector("org-a", "web");
        state.record_connector("org-a", "docs");
        state.record_connector("org-a", "web");
        state.record_connector("org-b", "docs");

        assert_eq!(
            state.connectors("org-a"),
            vec!["docs".to_string(), "web".to_string()]
        );
        assert_eq!(state.connectors("org-b"), vec!["docs".to_string()]);
        assert!(state.connectors("org-unknown").is_empty());
    }

    #[test]
    fn confirmation_is_taken_once() {
        let state = state_with(config_with_env_key(None));
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<HumanResponse>();

        assert!(state.take_confirmation("sess-1").is_none());
        state.register_confirmation("sess-1", tx.clone());
        let taken = state.take_confirmation("sess-1").expect("sender was registered");
        assert!(taken.same_channel(&tx));
        // A duplicate `confirm_tool_action` finds nothing.
        assert!(state.take_confirmation("sess-1").is_none());
    }

    #[test]
    fn register_replaces_prior_pending_confirmation() {
        let state = state_with(config_with_env_key(None));
        let (first, _rx1) = tokio::sync::mpsc::unbounded_channel::<HumanResponse>();
        let (second, _rx2) = tokio::sync::mpsc::unbounded_channel::<HumanResponse>();

        state.register_confirmation("sess-1", first.clone());
        state.register_confirmation("sess-1", second.clone());
        let taken = state.take_confirmation("sess-1").expect("sender was registered");
        assert!(taken.same_channel(&second));
        assert!(!taken.same_channel(&first));
    }

    #[test]
    fn clear_confirmation_drops_pending_sender() {
        let state = state_with(config_with_env_key(None));
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<HumanResponse>();

        state.register_confirmation("sess-2", tx);
        state.clear_confirmation("sess-2");
        assert!(state.take_confirmation("sess-2").is_none());
        // Clearing an unknown session is a harmless no-op.
        state.clear_confirmation("sess-unknown");
    }
}