smooth_operator_server/state.rs
1//! Server + per-connection state.
2//!
3//! [`AppState`] is shared across every connection + every admin HTTP request
4//! (cloneable `Arc` handles): the storage adapter, the resolved
5//! [`ServerConfig`], the session registry, and — for the admin API (Phase 12) —
6//! the [`AuthVerifier`], an [`IndexingStore`], and the document-set registry.
7//!
8//! Sessions live in an in-memory map keyed by `sessionId` so `get_session` and
9//! reconnects work across connections (mirrors the protocol's "connection →
10//! session" / "session → connections" state model, simplified for the reference
11//! single-process server). On AWS this map would be DynamoDB; on k8s, Redis or
12//! Postgres.
13
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17use smooth_operator::adapter::StorageAdapter;
18use smooth_operator::auth::{AuthVerifier, NoAuthVerifier};
19use smooth_operator::backplane::{Backplane, InMemoryBackplane};
20use smooth_operator::connector_config::{ConnectorConfigStore, InMemoryConnectorConfigStore};
21use smooth_operator::domain::Session;
22use smooth_operator::settings::{InMemorySettingsStore, SettingsStore};
23use smooth_operator::widget_auth::{PermissiveWidgetAuth, WidgetAuthProvider};
24
25use smooth_operator_ingestion::indexing::{InMemoryIndexingStore, IndexingStore};
26
27use crate::config::ServerConfig;
28
29/// Shared, cloneable application state handed to every WebSocket connection +
30/// every admin HTTP request.
31#[derive(Clone)]
32pub struct AppState {
33 /// The single storage seam (conversations / participants / messages /
34 /// sessions / checkpoints / knowledge).
35 pub storage: Arc<dyn StorageAdapter>,
36 /// Resolved server configuration (gateway, model, limits).
37 pub config: Arc<ServerConfig>,
38 /// The configured auth verifier (jwt / smoo / none). Used by the admin API's
39 /// `require_role` extractor to turn a bearer token into a `Principal`.
40 pub auth: Arc<dyn AuthVerifier>,
41 /// Indexing-run status store, surfaced by `GET /admin/indexing/runs`.
42 pub indexing: Arc<dyn IndexingStore>,
43 /// Connector-configuration store, CRUD'd by the admin write API
44 /// (`/admin/connectors`). Org-scoped; holds an `auth_ref` (secret name), not
45 /// the secret itself.
46 pub connector_configs: Arc<dyn ConnectorConfigStore>,
47 /// Per-org agent settings store, read/written by `/admin/settings`.
48 pub settings: Arc<dyn SettingsStore>,
49 /// Embeddable-widget auth hook: resolves an agent's origin-allowlist +
50 /// public-key policy for `<smooth-agent-chat>` connections. Defaults to
51 /// [`PermissiveWidgetAuth`] (no enforcement) until a host installs a real
52 /// provider via [`with_widget_auth`](Self::with_widget_auth).
53 pub widget_auth: Arc<dyn WidgetAuthProvider>,
54 /// Connection backplane: per-pod sink registry + cross-pod event delivery.
55 /// Defaults to [`InMemoryBackplane`] (single-process); a host installs a
56 /// Redis/NATS impl via [`with_backplane`](Self::with_backplane) to scale out
57 /// and to let non-AI publishers push realtime events to connected clients.
58 pub backplane: Arc<dyn Backplane>,
59 /// Session registry: `sessionId` → session blob. Shared across connections.
60 sessions: Arc<RwLock<HashMap<String, Session>>>,
61 /// Document-set registry, **org-scoped**: `org_id` → (set name → document
62 /// count). The in-memory knowledge backend drops document metadata on
63 /// ingest, so the admin API reads document-set membership from this side
64 /// registry. Keyed by org so org A's document sets are never reported to an
65 /// org-B caller (cross-org leak fix — SMOODEV access-control hardening).
66 doc_sets: Arc<RwLock<HashMap<String, HashMap<String, usize>>>>,
67 /// Connector registry, **org-scoped**: `org_id` → set of connector names
68 /// whose indexing runs should be listed. Keyed by org so a same-named
69 /// connector in two orgs does not collide, and `GET /admin/indexing/runs`
70 /// only ever lists the caller's org's connectors.
71 connectors: Arc<RwLock<HashMap<String, Vec<String>>>>,
72}
73
/// Build the [`IndexingStore`] key for a connector, namespaced by its org.
///
/// The key is the literal prefix `IXCONN#`, then `org_id`, then a single
/// U+0001 separator, then `connector_name`. Two orgs that both own a connector
/// called `"docs"` therefore record and list **separate** runs.
///
/// NOTE(review): the org boundary relies on U+0001 never appearing in a
/// user-supplied connector name. This function does not check for it, so that
/// guarantee must come from input validation at the call sites — confirm.
#[must_use]
pub fn scoped_connector_key(org_id: &str, connector_name: &str) -> String {
    const PREFIX: &str = "IXCONN#";
    const SEPARATOR: char = '\u{1}';

    // Size the buffer once so the pushes below never reallocate.
    let capacity = PREFIX.len() + org_id.len() + SEPARATOR.len_utf8() + connector_name.len();
    let mut key = String::with_capacity(capacity);
    key.push_str(PREFIX);
    key.push_str(org_id);
    key.push(SEPARATOR);
    key.push_str(connector_name);
    key
}
82
83impl AppState {
84 /// Construct shared state over a storage adapter and config.
85 ///
86 /// Defaults the admin-API collaborators: a [`NoAuthVerifier`] (overridden via
87 /// [`with_auth`](Self::with_auth)) and an empty [`InMemoryIndexingStore`]
88 /// (overridden via [`with_indexing`](Self::with_indexing)). The `/ws` path
89 /// uses none of these, so existing callers are unaffected.
90 #[must_use]
91 pub fn new(storage: Arc<dyn StorageAdapter>, config: ServerConfig) -> Self {
92 Self {
93 storage,
94 config: Arc::new(config),
95 auth: Arc::new(NoAuthVerifier::default()),
96 indexing: Arc::new(InMemoryIndexingStore::new()),
97 connector_configs: Arc::new(InMemoryConnectorConfigStore::new()),
98 settings: Arc::new(InMemorySettingsStore::new()),
99 widget_auth: Arc::new(PermissiveWidgetAuth),
100 backplane: Arc::new(InMemoryBackplane::new()),
101 sessions: Arc::new(RwLock::new(HashMap::new())),
102 doc_sets: Arc::new(RwLock::new(HashMap::new())),
103 connectors: Arc::new(RwLock::new(HashMap::new())),
104 }
105 }
106
107 /// Install the configured auth verifier (builder).
108 #[must_use]
109 pub fn with_auth(mut self, auth: Arc<dyn AuthVerifier>) -> Self {
110 self.auth = auth;
111 self
112 }
113
114 /// Install the indexing store (builder).
115 #[must_use]
116 pub fn with_indexing(mut self, indexing: Arc<dyn IndexingStore>) -> Self {
117 self.indexing = indexing;
118 self
119 }
120
121 /// Install the connector-configuration store (builder).
122 #[must_use]
123 pub fn with_connector_configs(mut self, store: Arc<dyn ConnectorConfigStore>) -> Self {
124 self.connector_configs = store;
125 self
126 }
127
128 /// Install the agent-settings store (builder).
129 #[must_use]
130 pub fn with_settings(mut self, store: Arc<dyn SettingsStore>) -> Self {
131 self.settings = store;
132 self
133 }
134
135 /// Install the embeddable-widget auth provider (builder). A host backs this
136 /// with its agent store so embed origins + public keys are enforced.
137 #[must_use]
138 pub fn with_widget_auth(mut self, provider: Arc<dyn WidgetAuthProvider>) -> Self {
139 self.widget_auth = provider;
140 self
141 }
142
143 /// Install the connection backplane (builder). A host installs a Redis/NATS
144 /// impl to scale the WS service horizontally and to let other services push
145 /// realtime events to connected clients via [`Backplane::publish`].
146 #[must_use]
147 pub fn with_backplane(mut self, backplane: Arc<dyn Backplane>) -> Self {
148 self.backplane = backplane;
149 self
150 }
151
152 /// Register a freshly created session.
153 pub fn insert_session(&self, session: Session) {
154 if let Ok(mut map) = self.sessions.write() {
155 map.insert(session.session_id.clone(), session);
156 }
157 }
158
159 /// Look up a session by id.
160 #[must_use]
161 pub fn get_session(&self, session_id: &str) -> Option<Session> {
162 self.sessions.read().ok()?.get(session_id).cloned()
163 }
164
165 /// Record that a document was added to a named document set **within an org**
166 /// (increments its count). Used by seeding + the ingest path so
167 /// `GET /admin/document-sets` can report set names + counts despite the
168 /// in-memory backend dropping document metadata. Org-scoped so org A's sets
169 /// are never reported to an org-B caller.
170 pub fn record_document_set(&self, org_id: impl Into<String>, set: impl Into<String>) {
171 if let Ok(mut map) = self.doc_sets.write() {
172 *map.entry(org_id.into())
173 .or_default()
174 .entry(set.into())
175 .or_insert(0) += 1;
176 }
177 }
178
179 /// Snapshot **one org's** document-set registry as `(name, count)` pairs,
180 /// sorted by name for a stable response. Never returns another org's sets.
181 #[must_use]
182 pub fn document_sets(&self, org_id: &str) -> Vec<(String, usize)> {
183 let Ok(map) = self.doc_sets.read() else {
184 return Vec::new();
185 };
186 let Some(org_sets) = map.get(org_id) else {
187 return Vec::new();
188 };
189 let mut out: Vec<(String, usize)> = org_sets.iter().map(|(k, v)| (k.clone(), *v)).collect();
190 out.sort_by(|a, b| a.0.cmp(&b.0));
191 out
192 }
193
194 /// Record a connector (within an org) whose indexing runs should be listed
195 /// (idempotent). Org-scoped so a same-named connector in two orgs records
196 /// separately and `GET /admin/indexing/runs` only lists the caller's org's.
197 pub fn record_connector(&self, org_id: impl Into<String>, name: impl Into<String>) {
198 let name = name.into();
199 if let Ok(mut map) = self.connectors.write() {
200 let v = map.entry(org_id.into()).or_default();
201 if !v.iter().any(|c| c == &name) {
202 v.push(name);
203 }
204 }
205 }
206
207 /// Snapshot **one org's** recorded connector names (sorted, stable). Never
208 /// returns another org's connectors.
209 #[must_use]
210 pub fn connectors(&self, org_id: &str) -> Vec<String> {
211 let Ok(map) = self.connectors.read() else {
212 return Vec::new();
213 };
214 let mut out = map.get(org_id).cloned().unwrap_or_default();
215 out.sort();
216 out
217 }
218}