// smooth_operator_server/state.rs

//! Server + per-connection state.
//!
//! [`AppState`] is shared across every connection + every admin HTTP request
//! (cloneable `Arc` handles): the storage adapter, the resolved
//! [`ServerConfig`], the session registry, and — for the admin API (Phase 12) —
//! the [`AuthVerifier`], an [`IndexingStore`], and the document-set registry.
//!
//! Sessions live in an in-memory map keyed by `sessionId` so `get_session` and
//! reconnects work across connections (mirrors the protocol's "connection →
//! session" / "session → connections" state model, simplified for the reference
//! single-process server). On AWS this map would be DynamoDB; on k8s, Redis or
//! Postgres.

use std::collections::HashMap;
use std::sync::{Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard};

use smooth_operator::adapter::StorageAdapter;
use smooth_operator::auth::{AuthVerifier, NoAuthVerifier};
use smooth_operator::backplane::{Backplane, InMemoryBackplane};
use smooth_operator::connector_config::{ConnectorConfigStore, InMemoryConnectorConfigStore};
use smooth_operator::domain::Session;
use smooth_operator::settings::{InMemorySettingsStore, SettingsStore};
use smooth_operator::widget_auth::{PermissiveWidgetAuth, WidgetAuthProvider};

use smooth_operator_ingestion::indexing::{InMemoryIndexingStore, IndexingStore};

use crate::config::ServerConfig;

29/// Shared, cloneable application state handed to every WebSocket connection +
30/// every admin HTTP request.
31#[derive(Clone)]
32pub struct AppState {
33    /// The single storage seam (conversations / participants / messages /
34    /// sessions / checkpoints / knowledge).
35    pub storage: Arc<dyn StorageAdapter>,
36    /// Resolved server configuration (gateway, model, limits).
37    pub config: Arc<ServerConfig>,
38    /// The configured auth verifier (jwt / smoo / none). Used by the admin API's
39    /// `require_role` extractor to turn a bearer token into a `Principal`.
40    pub auth: Arc<dyn AuthVerifier>,
41    /// Indexing-run status store, surfaced by `GET /admin/indexing/runs`.
42    pub indexing: Arc<dyn IndexingStore>,
43    /// Connector-configuration store, CRUD'd by the admin write API
44    /// (`/admin/connectors`). Org-scoped; holds an `auth_ref` (secret name), not
45    /// the secret itself.
46    pub connector_configs: Arc<dyn ConnectorConfigStore>,
47    /// Per-org agent settings store, read/written by `/admin/settings`.
48    pub settings: Arc<dyn SettingsStore>,
49    /// Embeddable-widget auth hook: resolves an agent's origin-allowlist +
50    /// public-key policy for `<smooth-agent-chat>` connections. Defaults to
51    /// [`PermissiveWidgetAuth`] (no enforcement) until a host installs a real
52    /// provider via [`with_widget_auth`](Self::with_widget_auth).
53    pub widget_auth: Arc<dyn WidgetAuthProvider>,
54    /// Connection backplane: per-pod sink registry + cross-pod event delivery.
55    /// Defaults to [`InMemoryBackplane`] (single-process); a host installs a
56    /// Redis/NATS impl via [`with_backplane`](Self::with_backplane) to scale out
57    /// and to let non-AI publishers push realtime events to connected clients.
58    pub backplane: Arc<dyn Backplane>,
59    /// Session registry: `sessionId` → session blob. Shared across connections.
60    sessions: Arc<RwLock<HashMap<String, Session>>>,
61    /// Document-set registry, **org-scoped**: `org_id` → (set name → document
62    /// count). The in-memory knowledge backend drops document metadata on
63    /// ingest, so the admin API reads document-set membership from this side
64    /// registry. Keyed by org so org A's document sets are never reported to an
65    /// org-B caller (cross-org leak fix — SMOODEV access-control hardening).
66    doc_sets: Arc<RwLock<HashMap<String, HashMap<String, usize>>>>,
67    /// Connector registry, **org-scoped**: `org_id` → set of connector names
68    /// whose indexing runs should be listed. Keyed by org so a same-named
69    /// connector in two orgs does not collide, and `GET /admin/indexing/runs`
70    /// only ever lists the caller's org's connectors.
71    connectors: Arc<RwLock<HashMap<String, Vec<String>>>>,
72}
73
/// Build the [`IndexingStore`] key for a connector, namespaced by org.
///
/// The key has the shape `IXCONN#<org_id>\u{1}<connector_name>`, so two orgs
/// with a same-named connector (`"docs"`) record + list **separate** runs.
///
/// The `\u{1}` (START OF HEADING) separator is what keeps the two components
/// unambiguous. This function does not validate its inputs: it assumes neither
/// `org_id` nor `connector_name` can contain `\u{1}` (i.e. that user-supplied
/// names are rejected upstream if they contain control characters).
/// NOTE(review): confirm that validation exists at the admin write API.
#[must_use]
pub fn scoped_connector_key(org_id: &str, connector_name: &str) -> String {
    format!("IXCONN#{org_id}\u{1}{connector_name}")
}

83impl AppState {
84    /// Construct shared state over a storage adapter and config.
85    ///
86    /// Defaults the admin-API collaborators: a [`NoAuthVerifier`] (overridden via
87    /// [`with_auth`](Self::with_auth)) and an empty [`InMemoryIndexingStore`]
88    /// (overridden via [`with_indexing`](Self::with_indexing)). The `/ws` path
89    /// uses none of these, so existing callers are unaffected.
90    #[must_use]
91    pub fn new(storage: Arc<dyn StorageAdapter>, config: ServerConfig) -> Self {
92        Self {
93            storage,
94            config: Arc::new(config),
95            auth: Arc::new(NoAuthVerifier::default()),
96            indexing: Arc::new(InMemoryIndexingStore::new()),
97            connector_configs: Arc::new(InMemoryConnectorConfigStore::new()),
98            settings: Arc::new(InMemorySettingsStore::new()),
99            widget_auth: Arc::new(PermissiveWidgetAuth),
100            backplane: Arc::new(InMemoryBackplane::new()),
101            sessions: Arc::new(RwLock::new(HashMap::new())),
102            doc_sets: Arc::new(RwLock::new(HashMap::new())),
103            connectors: Arc::new(RwLock::new(HashMap::new())),
104        }
105    }
106
107    /// Install the configured auth verifier (builder).
108    #[must_use]
109    pub fn with_auth(mut self, auth: Arc<dyn AuthVerifier>) -> Self {
110        self.auth = auth;
111        self
112    }
113
114    /// Install the indexing store (builder).
115    #[must_use]
116    pub fn with_indexing(mut self, indexing: Arc<dyn IndexingStore>) -> Self {
117        self.indexing = indexing;
118        self
119    }
120
121    /// Install the connector-configuration store (builder).
122    #[must_use]
123    pub fn with_connector_configs(mut self, store: Arc<dyn ConnectorConfigStore>) -> Self {
124        self.connector_configs = store;
125        self
126    }
127
128    /// Install the agent-settings store (builder).
129    #[must_use]
130    pub fn with_settings(mut self, store: Arc<dyn SettingsStore>) -> Self {
131        self.settings = store;
132        self
133    }
134
135    /// Install the embeddable-widget auth provider (builder). A host backs this
136    /// with its agent store so embed origins + public keys are enforced.
137    #[must_use]
138    pub fn with_widget_auth(mut self, provider: Arc<dyn WidgetAuthProvider>) -> Self {
139        self.widget_auth = provider;
140        self
141    }
142
143    /// Install the connection backplane (builder). A host installs a Redis/NATS
144    /// impl to scale the WS service horizontally and to let other services push
145    /// realtime events to connected clients via [`Backplane::publish`].
146    #[must_use]
147    pub fn with_backplane(mut self, backplane: Arc<dyn Backplane>) -> Self {
148        self.backplane = backplane;
149        self
150    }
151
152    /// Register a freshly created session.
153    pub fn insert_session(&self, session: Session) {
154        if let Ok(mut map) = self.sessions.write() {
155            map.insert(session.session_id.clone(), session);
156        }
157    }
158
159    /// Look up a session by id.
160    #[must_use]
161    pub fn get_session(&self, session_id: &str) -> Option<Session> {
162        self.sessions.read().ok()?.get(session_id).cloned()
163    }
164
165    /// Record that a document was added to a named document set **within an org**
166    /// (increments its count). Used by seeding + the ingest path so
167    /// `GET /admin/document-sets` can report set names + counts despite the
168    /// in-memory backend dropping document metadata. Org-scoped so org A's sets
169    /// are never reported to an org-B caller.
170    pub fn record_document_set(&self, org_id: impl Into<String>, set: impl Into<String>) {
171        if let Ok(mut map) = self.doc_sets.write() {
172            *map.entry(org_id.into())
173                .or_default()
174                .entry(set.into())
175                .or_insert(0) += 1;
176        }
177    }
178
179    /// Snapshot **one org's** document-set registry as `(name, count)` pairs,
180    /// sorted by name for a stable response. Never returns another org's sets.
181    #[must_use]
182    pub fn document_sets(&self, org_id: &str) -> Vec<(String, usize)> {
183        let Ok(map) = self.doc_sets.read() else {
184            return Vec::new();
185        };
186        let Some(org_sets) = map.get(org_id) else {
187            return Vec::new();
188        };
189        let mut out: Vec<(String, usize)> = org_sets.iter().map(|(k, v)| (k.clone(), *v)).collect();
190        out.sort_by(|a, b| a.0.cmp(&b.0));
191        out
192    }
193
194    /// Record a connector (within an org) whose indexing runs should be listed
195    /// (idempotent). Org-scoped so a same-named connector in two orgs records
196    /// separately and `GET /admin/indexing/runs` only lists the caller's org's.
197    pub fn record_connector(&self, org_id: impl Into<String>, name: impl Into<String>) {
198        let name = name.into();
199        if let Ok(mut map) = self.connectors.write() {
200            let v = map.entry(org_id.into()).or_default();
201            if !v.iter().any(|c| c == &name) {
202                v.push(name);
203            }
204        }
205    }
206
207    /// Snapshot **one org's** recorded connector names (sorted, stable). Never
208    /// returns another org's connectors.
209    #[must_use]
210    pub fn connectors(&self, org_id: &str) -> Vec<String> {
211        let Ok(map) = self.connectors.read() else {
212            return Vec::new();
213        };
214        let mut out = map.get(org_id).cloned().unwrap_or_default();
215        out.sort();
216        out
217    }
218}