smooth_operator_server/state.rs
1//! Server + per-connection state.
2//!
3//! [`AppState`] is shared across every connection + every admin HTTP request
4//! (cloneable `Arc` handles): the storage adapter, the resolved
5//! [`ServerConfig`], the session registry, and — for the admin API (Phase 12) —
6//! the [`AuthVerifier`], an [`IndexingStore`], and the document-set registry.
7//!
8//! Sessions live in an in-memory map keyed by `sessionId` so `get_session` and
9//! reconnects work across connections (mirrors the protocol's "connection →
10//! session" / "session → connections" state model, simplified for the reference
11//! single-process server). On AWS this map would be DynamoDB; on k8s, Redis or
12//! Postgres.
13
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17use smooth_operator_core::HumanResponse;
18use tokio::sync::mpsc::UnboundedSender;
19
20use smooth_operator::adapter::StorageAdapter;
21use smooth_operator::agent_config::{AgentConfigResolver, StaticAgentConfigResolver};
22use smooth_operator::auth::{AuthVerifier, NoAuthVerifier};
23use smooth_operator::backplane::{Backplane, InMemoryBackplane};
24use smooth_operator::connector_config::{ConnectorConfigStore, InMemoryConnectorConfigStore};
25use smooth_operator::domain::Session;
26use smooth_operator::gateway_key::{EnvGatewayKeyResolver, GatewayKeyResolver};
27use smooth_operator::otp::{OtpContact, OtpService};
28use smooth_operator::settings::{InMemorySettingsStore, SettingsStore};
29use smooth_operator::tool_provider::ToolProvider;
30use smooth_operator::widget_auth::{PermissiveWidgetAuth, WidgetAuthProvider};
31use tokio_util::sync::CancellationToken;
32
33use smooth_operator_core::llm_provider::LlmProvider;
34use smooth_operator_ingestion::indexing::{InMemoryIndexingStore, IndexingStore};
35
36use crate::config::ServerConfig;
37
38/// Shared, cloneable application state handed to every WebSocket connection +
39/// every admin HTTP request.
40#[derive(Clone)]
41pub struct AppState {
42 /// The single storage seam (conversations / participants / messages /
43 /// sessions / checkpoints / knowledge).
44 pub storage: Arc<dyn StorageAdapter>,
45 /// Resolved server configuration (gateway, model, limits).
46 pub config: Arc<ServerConfig>,
47 /// The configured auth verifier (jwt / smoo / none). Used by the admin API's
48 /// `require_role` extractor to turn a bearer token into a `Principal`.
49 pub auth: Arc<dyn AuthVerifier>,
50 /// Indexing-run status store, surfaced by `GET /admin/indexing/runs`.
51 pub indexing: Arc<dyn IndexingStore>,
52 /// Connector-configuration store, CRUD'd by the admin write API
53 /// (`/admin/connectors`). Org-scoped; holds an `auth_ref` (secret name), not
54 /// the secret itself.
55 pub connector_configs: Arc<dyn ConnectorConfigStore>,
56 /// Per-org agent settings store, read/written by `/admin/settings`.
57 pub settings: Arc<dyn SettingsStore>,
58 /// **Host tool-injection seam.** When `Some`, the runner asks this provider
59 /// for EXTRA tools and merges them into every turn's `ToolRegistry`
60 /// alongside the built-ins. Defaults to `None` (built-ins only); a host
61 /// installs one via [`with_tools`](Self::with_tools) to contribute its own
62 /// per-org tool catalog without forking the runner.
63 pub tool_provider: Option<Arc<dyn ToolProvider>>,
64 /// Embeddable-widget auth hook: resolves an agent's origin-allowlist +
65 /// public-key policy for `<smooth-agent-chat>` connections. Defaults to
66 /// [`PermissiveWidgetAuth`] (no enforcement) until a host installs a real
67 /// provider via [`with_widget_auth`](Self::with_widget_auth).
68 pub widget_auth: Arc<dyn WidgetAuthProvider>,
69 /// **Per-agent behavior config hook.** Resolves an agent's `instructions`
70 /// (system prompt), `personality`, `greeting`, and `conversation_workflow`
71 /// from its `agent_id` so a public chat agent behaves as its owner configured
72 /// — not as the generic org-default persona. Defaults to
73 /// [`StaticAgentConfigResolver`](smooth_operator::agent_config::StaticAgentConfigResolver) (empty ⇒ no
74 /// per-agent config → the org default persona is used, unchanged); a host
75 /// installs a real provider (backed by the monorepo `agents` table) via
76 /// [`with_agent_config`](Self::with_agent_config).
77 pub agent_config: Arc<dyn AgentConfigResolver>,
78 /// Connection backplane: per-pod sink registry + cross-pod event delivery.
79 /// Defaults to [`InMemoryBackplane`] (single-process); a host installs a
80 /// Redis/NATS impl via [`with_backplane`](Self::with_backplane) to scale out
81 /// and to let non-AI publishers push realtime events to connected clients.
82 pub backplane: Arc<dyn Backplane>,
83 /// Test-only injected LLM surface. When `Some`, every `send_message` turn
84 /// runs the engine against this provider (a
85 /// [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient))
86 /// instead of building a live gateway client from `config` — exactly the
87 /// `ServerState(chat_client=mock)` seam the Python reference uses to drive the
88 /// scenario-parity corpus deterministically offline. **`None` in production**
89 /// (a live client is built from the gateway config), so the `/ws` path is
90 /// byte-for-byte unchanged for real deployments. Installed via
91 /// [`with_chat_provider`](Self::with_chat_provider).
92 pub chat_provider: Option<Arc<dyn LlmProvider>>,
93 /// Per-org LLM gateway-key resolver: maps a turn's `org_id` to the gateway
94 /// key it should bill/scope to. Defaults to [`EnvGatewayKeyResolver`] (the
95 /// single `SMOOAI_GATEWAY_KEY` for every org — unchanged local behavior); a
96 /// multi-tenant host installs a per-org resolver via
97 /// [`with_gateway_key_resolver`](Self::with_gateway_key_resolver) so each
98 /// tenant's usage is attributed to its own key. The per-turn LLM-config build
99 /// falls back to the env key whenever the resolver returns `None`.
100 pub gateway_key_resolver: Arc<dyn GatewayKeyResolver>,
101 /// **End-user OTP identity-verification seam.** When `Some`, a turn whose
102 /// auth gate refuses an `end_user` tool on an unverified session triggers the
103 /// OTP flow: the server emits `otp_verification_required`, calls
104 /// [`send_otp`](smooth_operator::otp::OtpService::send_otp), and emits
105 /// `otp_sent`; a later `verify_otp` action calls
106 /// [`verify_otp`](smooth_operator::otp::OtpService::verify_otp) and, on
107 /// success, marks the session authenticated. `None` (the default) keeps the
108 /// current fail-closed behavior — the `end_user` tool is refused and no OTP is
109 /// offered. Installed via [`with_otp_service`](Self::with_otp_service). The
110 /// reference server never holds a code; the host owns generation/expiry.
111 pub otp_service: Option<Arc<dyn OtpService>>,
112 /// Graceful-shutdown signal, shared across every per-connection clone of this
113 /// state. On SIGTERM/ctrl_c the serve loop cancels this token; each
114 /// connection's reader loop selects on [`CancellationToken::cancelled`] so it
115 /// finishes its in-flight turn, exits, and detaches from the [`Backplane`] —
116 /// no in-flight turn dropped, no stale registry entry left behind. A fresh
117 /// token from [`new`](Self::new) is never cancelled, so the `/ws` path and
118 /// tests are unaffected until a `run`/serve path wires the signal.
119 pub shutdown: CancellationToken,
120 /// Session registry: `sessionId` → session blob. Shared across connections.
121 sessions: Arc<RwLock<HashMap<String, Session>>>,
122 /// Document-set registry, **org-scoped**: `org_id` → (set name → document
123 /// count). The in-memory knowledge backend drops document metadata on
124 /// ingest, so the admin API reads document-set membership from this side
125 /// registry. Keyed by org so org A's document sets are never reported to an
126 /// org-B caller (cross-org leak fix — SMOODEV access-control hardening).
127 doc_sets: Arc<RwLock<HashMap<String, HashMap<String, usize>>>>,
128 /// Connector registry, **org-scoped**: `org_id` → set of connector names
129 /// whose indexing runs should be listed. Keyed by org so a same-named
130 /// connector in two orgs does not collide, and `GET /admin/indexing/runs`
131 /// only ever lists the caller's org's connectors.
132 connectors: Arc<RwLock<HashMap<String, Vec<String>>>>,
133 /// **Human-in-the-loop pending confirmations**: `sessionId` →
134 /// [`HumanResponse`] sender for a turn currently parked on a write-tool
135 /// confirmation. When an agent turn calls a tool that requires human
136 /// approval, the runner installs a `ConfirmationHook` (smooth-operator-core)
137 /// that parks the loop and registers its response sender here. A subsequent
138 /// `confirm_tool_action` frame looks the session up, takes the sender, and
139 /// feeds it [`HumanResponse::Approved`] / [`HumanResponse::Denied`] to resume
140 /// the parked turn (execute or reject the tool). Keyed by session so each
141 /// session has at most one outstanding confirmation; an empty map means no
142 /// turn is parked (the default, byte-for-byte unchanged from before HITL).
143 pending_confirmations: Arc<RwLock<HashMap<String, UnboundedSender<HumanResponse>>>>,
144 /// When `true`, the router mounts the embedded widget host page at `/` and
145 /// the widget bundle at `/chat-widget.iife.js`. Off by default (the
146 /// K8s/Lambda flavors never serve the widget); the local flavor opts in via
147 /// [`with_widget`](Self::with_widget).
148 pub serve_widget: bool,
149 /// The auth token injected into the served widget host page (same-origin), so
150 /// the embedded widget connects to this server's `/ws?token=…`. `None` ⇒ no
151 /// token injected (a no-auth local server).
152 pub widget_token: Option<String>,
153 /// **Strict auth.** When `true`, the `/ws` connect path **rejects** a
154 /// missing/invalid token (HTTP 401) instead of degrading to an anonymous
155 /// connection. Off by default (K8s/widget anonymous flows unchanged); a
156 /// single-tenant local/tailnet deployment opts in via
157 /// [`with_strict_auth`](Self::with_strict_auth) so a tokenless peer can't
158 /// drive the agent.
159 pub strict_auth: bool,
160 /// **Default agent persona / system prompt.** When `Some`, it is used as the
161 /// turn's system prompt whenever the per-org [`AgentSettings::persona`] is
162 /// `None` — i.e. a host-supplied default that replaces the built-in
163 /// customer-support [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`](crate::runner) when no
164 /// per-org override exists. The single-tenant local daemon installs its
165 /// "Big Smooth" personal-assistant persona here via
166 /// [`with_default_persona`](Self::with_default_persona). `None` (the default)
167 /// keeps the const prompt, so the cloud flavor is byte-for-byte unchanged.
168 pub default_persona: Option<String>,
169 /// **Model-pricing cache** for `GET /admin/model-costs`. The gateway's
170 /// `/v1/model/info` pricing is stable, so it's fetched at most once per
171 /// process and reused for every subsequent request (the admin handler sets
172 /// this on the first successful fetch; a gateway error is NOT cached, so a
173 /// transient failure is retried on the next request). Shared across clones so
174 /// every connection/request sees the same cached map.
175 pub model_costs_cache: Arc<tokio::sync::OnceCell<serde_json::Value>>,
176}
177
178/// Namespace a connector name by org for the [`IndexingStore`] key, so two orgs
179/// with a same-named connector (`"docs"`) record + list **separate** runs. The
180/// `\u{1}` separator can't appear in a user-supplied connector name, so it can't
181/// be spoofed to cross an org boundary.
182#[must_use]
183pub fn scoped_connector_key(org_id: &str, connector_name: &str) -> String {
184 format!("IXCONN#{org_id}\u{1}{connector_name}")
185}
186
187impl AppState {
188 /// Construct shared state over a storage adapter and config.
189 ///
190 /// Defaults the admin-API collaborators: a [`NoAuthVerifier`] (overridden via
191 /// [`with_auth`](Self::with_auth)) and an empty [`InMemoryIndexingStore`]
192 /// (overridden via [`with_indexing`](Self::with_indexing)). The `/ws` path
193 /// uses none of these, so existing callers are unaffected.
194 #[must_use]
195 pub fn new(storage: Arc<dyn StorageAdapter>, config: ServerConfig) -> Self {
196 // Default resolver returns the single env gateway key for every org, so
197 // the local/default flavor is unchanged until a host installs a per-org
198 // resolver via `with_gateway_key_resolver`.
199 let gateway_key_resolver: Arc<dyn GatewayKeyResolver> =
200 Arc::new(EnvGatewayKeyResolver::new(config.gateway_key.clone()));
201 Self {
202 storage,
203 config: Arc::new(config),
204 auth: Arc::new(NoAuthVerifier::default()),
205 indexing: Arc::new(InMemoryIndexingStore::new()),
206 connector_configs: Arc::new(InMemoryConnectorConfigStore::new()),
207 settings: Arc::new(InMemorySettingsStore::new()),
208 tool_provider: None,
209 widget_auth: Arc::new(PermissiveWidgetAuth),
210 agent_config: Arc::new(StaticAgentConfigResolver::default()),
211 backplane: Arc::new(InMemoryBackplane::new()),
212 chat_provider: None,
213 gateway_key_resolver,
214 otp_service: None,
215 // A fresh, never-cancelled token: every clone of this state shares
216 // its cancellation state, so the serve loop cancelling once fans out
217 // to every connection. Defaulting here (rather than at each call
218 // site) keeps construction ripple-free.
219 shutdown: CancellationToken::new(),
220 sessions: Arc::new(RwLock::new(HashMap::new())),
221 doc_sets: Arc::new(RwLock::new(HashMap::new())),
222 connectors: Arc::new(RwLock::new(HashMap::new())),
223 pending_confirmations: Arc::new(RwLock::new(HashMap::new())),
224 serve_widget: false,
225 widget_token: None,
226 strict_auth: false,
227 default_persona: None,
228 model_costs_cache: Arc::new(tokio::sync::OnceCell::new()),
229 }
230 }
231
232 /// Install the configured auth verifier (builder).
233 #[must_use]
234 pub fn with_auth(mut self, auth: Arc<dyn AuthVerifier>) -> Self {
235 self.auth = auth;
236 self
237 }
238
239 /// Replace the storage adapter (builder).
240 ///
241 /// Lets an embedder (e.g. the local-flavor daemon) swap the default
242 /// in-memory store for a **durable local adapter** — the seam an always-on,
243 /// self-hosted deployment needs so conversations/sessions/checkpoints
244 /// survive a restart without standing up Postgres.
245 #[must_use]
246 pub fn with_storage(mut self, storage: Arc<dyn StorageAdapter>) -> Self {
247 self.storage = storage;
248 self
249 }
250
251 /// Install the indexing store (builder).
252 #[must_use]
253 pub fn with_indexing(mut self, indexing: Arc<dyn IndexingStore>) -> Self {
254 self.indexing = indexing;
255 self
256 }
257
258 /// Install the connector-configuration store (builder).
259 #[must_use]
260 pub fn with_connector_configs(mut self, store: Arc<dyn ConnectorConfigStore>) -> Self {
261 self.connector_configs = store;
262 self
263 }
264
265 /// Install the agent-settings store (builder).
266 #[must_use]
267 pub fn with_settings(mut self, store: Arc<dyn SettingsStore>) -> Self {
268 self.settings = store;
269 self
270 }
271
272 /// Install a host [`ToolProvider`] (builder). The runner merges the
273 /// provider's per-turn tools into every turn's registry alongside the
274 /// built-ins. Without this, the registry is exactly the built-ins, so the
275 /// default/local flavor is unaffected.
276 #[must_use]
277 pub fn with_tools(mut self, provider: Arc<dyn ToolProvider>) -> Self {
278 self.tool_provider = Some(provider);
279 self
280 }
281
282 /// Enable **strict auth** (builder): reject `/ws` connections with a
283 /// missing/invalid token (HTTP 401) instead of degrading to anonymous. Pair
284 /// with a real [`with_auth`](Self::with_auth) verifier. Off by default.
285 #[must_use]
286 pub fn with_strict_auth(mut self, strict: bool) -> Self {
287 self.strict_auth = strict;
288 self
289 }
290
291 /// Install a **default agent persona** (builder): the system prompt used for
292 /// a turn when the per-org [`AgentSettings::persona`] is unset. A single-tenant
293 /// host (the local daemon) installs its own personality here so every turn
294 /// runs as that agent rather than the built-in customer-support prompt. `None`
295 /// (the default) keeps the const prompt, so the cloud flavor is unchanged. An
296 /// empty/whitespace-only string is treated as no default.
297 #[must_use]
298 pub fn with_default_persona(mut self, persona: impl Into<String>) -> Self {
299 let persona = persona.into();
300 self.default_persona = if persona.trim().is_empty() {
301 None
302 } else {
303 Some(persona)
304 };
305 self
306 }
307
308 /// Serve the embedded official widget (host page at `/`, bundle at
309 /// `/chat-widget.iife.js`), injecting `token` into the page so the widget
310 /// connects to this server's `/ws?token=…` (builder). The local deployment
311 /// flavor opts in; other flavors never mount the widget routes.
312 #[must_use]
313 pub fn with_widget(mut self, token: Option<String>) -> Self {
314 self.serve_widget = true;
315 self.widget_token = token;
316 self
317 }
318
319 /// Install the embeddable-widget auth provider (builder). A host backs this
320 /// with its agent store so embed origins + public keys are enforced.
321 #[must_use]
322 pub fn with_widget_auth(mut self, provider: Arc<dyn WidgetAuthProvider>) -> Self {
323 self.widget_auth = provider;
324 self
325 }
326
327 /// Install the per-agent behavior-config provider (builder). A host backs
328 /// this with its `agents` store so each agent's `instructions` /
329 /// `conversation_workflow` drive its conversations. Without it, the runner
330 /// falls back to the org-default persona (unchanged behavior).
331 #[must_use]
332 pub fn with_agent_config(mut self, provider: Arc<dyn AgentConfigResolver>) -> Self {
333 self.agent_config = provider;
334 self
335 }
336
337 /// Install the connection backplane (builder). A host installs a Redis/NATS
338 /// impl to scale the WS service horizontally and to let other services push
339 /// realtime events to connected clients via [`Backplane::publish`].
340 #[must_use]
341 pub fn with_backplane(mut self, backplane: Arc<dyn Backplane>) -> Self {
342 self.backplane = backplane;
343 self
344 }
345
346 /// Install a test-injected LLM provider (builder). Every `send_message` turn
347 /// then runs the engine against this provider instead of a live gateway
348 /// client — the [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient)
349 /// seam the scenario-parity corpus drives. Production never calls this, so the
350 /// live path is unchanged. See [`chat_provider`](Self::chat_provider).
351 #[must_use]
352 pub fn with_chat_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
353 self.chat_provider = Some(provider);
354 self
355 }
356
357 /// Install a per-org gateway-key resolver (builder). A multi-tenant host
358 /// installs a resolver backed by its per-org key store (e.g. one LiteLLM
359 /// virtual key per tenant) so each org's turns are billed/scoped to its own
360 /// key. The per-turn LLM-config build falls back to the env key whenever the
361 /// resolver returns `None`, so a resolver covering only some orgs is safe.
362 /// Leaving this unset keeps the default [`EnvGatewayKeyResolver`] (single env
363 /// key for every org — unchanged local behavior).
364 #[must_use]
365 pub fn with_gateway_key_resolver(mut self, resolver: Arc<dyn GatewayKeyResolver>) -> Self {
366 self.gateway_key_resolver = resolver;
367 self
368 }
369
370 /// Install the end-user OTP identity-verification service (builder). Wires the
371 /// `end_user` auth gate to an OTP flow (see [`otp_service`](Self::otp_service));
372 /// leaving it unset keeps the fail-closed default (refuse, no OTP offered).
373 #[must_use]
374 pub fn with_otp_service(mut self, service: Arc<dyn OtpService>) -> Self {
375 self.otp_service = Some(service);
376 self
377 }
378
379 /// Install the graceful-shutdown signal (builder). The serve loop owns a
380 /// clone of this token and cancels it on SIGTERM/ctrl_c; every per-connection
381 /// clone observes the cancellation and drains. Defaulted to a fresh token in
382 /// [`new`](Self::new), so this is only needed when a caller wants to drive
383 /// shutdown from its own token.
384 #[must_use]
385 pub fn with_shutdown(mut self, shutdown: CancellationToken) -> Self {
386 self.shutdown = shutdown;
387 self
388 }
389
390 /// Register a freshly created session.
391 pub fn insert_session(&self, session: Session) {
392 if let Ok(mut map) = self.sessions.write() {
393 map.insert(session.session_id.clone(), session);
394 }
395 }
396
397 /// Look up a session by id.
398 #[must_use]
399 pub fn get_session(&self, session_id: &str) -> Option<Session> {
400 self.sessions.read().ok()?.get(session_id).cloned()
401 }
402
403 /// The conversation-workflow step this session is currently on, read from the
404 /// session's `metadata.currentStepId`. `None` = no workflow / fresh start (the
405 /// runner then resolves to the workflow's first step).
406 #[must_use]
407 pub fn session_current_step(&self, session_id: &str) -> Option<String> {
408 self.sessions
409 .read()
410 .ok()?
411 .get(session_id)?
412 .metadata
413 .as_ref()?
414 .get("currentStepId")?
415 .as_str()
416 .map(str::to_string)
417 }
418
419 /// Persist the workflow step pointer onto the in-memory session's
420 /// `metadata.currentStepId`. Matches the session registry's durability (the
421 /// pointer lives as long as the session does, on the pod that owns it). A
422 /// `None` step clears the pointer. No-op for an unknown session.
423 pub fn set_session_current_step(&self, session_id: &str, step_id: Option<&str>) {
424 if let Ok(mut map) = self.sessions.write() {
425 if let Some(session) = map.get_mut(session_id) {
426 let mut meta = session.metadata.take().unwrap_or_default();
427 match step_id {
428 Some(id) => {
429 meta.insert("currentStepId".to_string(), serde_json::Value::from(id));
430 }
431 None => {
432 meta.remove("currentStepId");
433 }
434 }
435 session.metadata = Some(meta);
436 }
437 }
438 }
439
440 /// Whether this session's caller has completed OTP identity verification,
441 /// read from the session's `metadata.otpVerified`. `false` for an unknown or
442 /// unverified session. Threaded into the `end_user` auth gate so a verified
443 /// session's gated tools run. Same durability as the session registry (lives
444 /// as long as the session, on the pod that owns it).
445 #[must_use]
446 pub fn session_authenticated(&self, session_id: &str) -> bool {
447 self.sessions
448 .read()
449 .ok()
450 .and_then(|map| {
451 map.get(session_id)?
452 .metadata
453 .as_ref()?
454 .get("otpVerified")?
455 .as_bool()
456 })
457 .unwrap_or(false)
458 }
459
460 /// Mark this session identity-verified (or clear it) by setting
461 /// `metadata.otpVerified`. Called after a successful `verify_otp`. No-op for
462 /// an unknown session. Coexists with the workflow step pointer (both live in
463 /// the session's metadata map).
464 pub fn set_session_authenticated(&self, session_id: &str, verified: bool) {
465 if let Ok(mut map) = self.sessions.write() {
466 if let Some(session) = map.get_mut(session_id) {
467 let mut meta = session.metadata.take().unwrap_or_default();
468 meta.insert("otpVerified".to_string(), serde_json::Value::from(verified));
469 session.metadata = Some(meta);
470 }
471 }
472 }
473
474 /// The caller's OTP contact points for this session, read from the session's
475 /// `metadata.contactEmail` / `metadata.contactPhone` (stashed at
476 /// create-session time). Empty when the session is unknown or captured no
477 /// contact — the server then can't offer OTP. The reference create-session
478 /// path captures only an email.
479 #[must_use]
480 pub fn session_contact(&self, session_id: &str) -> OtpContact {
481 let Ok(map) = self.sessions.read() else {
482 return OtpContact::default();
483 };
484 let Some(meta) = map.get(session_id).and_then(|s| s.metadata.as_ref()) else {
485 return OtpContact::default();
486 };
487 OtpContact {
488 email: meta
489 .get("contactEmail")
490 .and_then(|v| v.as_str())
491 .map(str::to_string),
492 phone: meta
493 .get("contactPhone")
494 .and_then(|v| v.as_str())
495 .map(str::to_string),
496 }
497 }
498
499 /// Record that a document was added to a named document set **within an org**
500 /// (increments its count). Used by seeding + the ingest path so
501 /// `GET /admin/document-sets` can report set names + counts despite the
502 /// in-memory backend dropping document metadata. Org-scoped so org A's sets
503 /// are never reported to an org-B caller.
504 pub fn record_document_set(&self, org_id: impl Into<String>, set: impl Into<String>) {
505 if let Ok(mut map) = self.doc_sets.write() {
506 *map.entry(org_id.into())
507 .or_default()
508 .entry(set.into())
509 .or_insert(0) += 1;
510 }
511 }
512
513 /// Snapshot **one org's** document-set registry as `(name, count)` pairs,
514 /// sorted by name for a stable response. Never returns another org's sets.
515 #[must_use]
516 pub fn document_sets(&self, org_id: &str) -> Vec<(String, usize)> {
517 let Ok(map) = self.doc_sets.read() else {
518 return Vec::new();
519 };
520 let Some(org_sets) = map.get(org_id) else {
521 return Vec::new();
522 };
523 let mut out: Vec<(String, usize)> = org_sets.iter().map(|(k, v)| (k.clone(), *v)).collect();
524 out.sort_by(|a, b| a.0.cmp(&b.0));
525 out
526 }
527
528 /// Record a connector (within an org) whose indexing runs should be listed
529 /// (idempotent). Org-scoped so a same-named connector in two orgs records
530 /// separately and `GET /admin/indexing/runs` only lists the caller's org's.
531 pub fn record_connector(&self, org_id: impl Into<String>, name: impl Into<String>) {
532 let name = name.into();
533 if let Ok(mut map) = self.connectors.write() {
534 let v = map.entry(org_id.into()).or_default();
535 if !v.iter().any(|c| c == &name) {
536 v.push(name);
537 }
538 }
539 }
540
541 /// Snapshot **one org's** recorded connector names (sorted, stable). Never
542 /// returns another org's connectors.
543 #[must_use]
544 pub fn connectors(&self, org_id: &str) -> Vec<String> {
545 let Ok(map) = self.connectors.read() else {
546 return Vec::new();
547 };
548 let mut out = map.get(org_id).cloned().unwrap_or_default();
549 out.sort();
550 out
551 }
552
553 /// Register a parked turn's [`HumanResponse`] sender for `session_id`, so a
554 /// later `confirm_tool_action` can resume it. Any prior pending sender for
555 /// the same session is replaced (one outstanding confirmation per session).
556 /// Called by the runner's confirmation bridge when a write tool emits a
557 /// `HumanRequest::Confirm`.
558 pub fn register_confirmation(
559 &self,
560 session_id: impl Into<String>,
561 responder: UnboundedSender<HumanResponse>,
562 ) {
563 if let Ok(mut map) = self.pending_confirmations.write() {
564 map.insert(session_id.into(), responder);
565 }
566 }
567
568 /// Take (remove + return) the pending [`HumanResponse`] sender for
569 /// `session_id`, if a turn is parked on a confirmation. Returns `None` when
570 /// no turn awaits confirmation for that session (the common case). Taking it
571 /// out — rather than cloning — guarantees a single confirmation resolves a
572 /// single parked tool call, and a duplicate `confirm_tool_action` is a no-op.
573 #[must_use]
574 pub fn take_confirmation(&self, session_id: &str) -> Option<UnboundedSender<HumanResponse>> {
575 self.pending_confirmations.write().ok()?.remove(session_id)
576 }
577
578 /// Drop any pending confirmation registered for `session_id` without
579 /// resolving it. Called when a parked turn ends (the bridge task finishes)
580 /// so a stale sender can't linger and mis-route a later confirmation.
581 pub fn clear_confirmation(&self, session_id: &str) {
582 if let Ok(mut map) = self.pending_confirmations.write() {
583 map.remove(session_id);
584 }
585 }
586}
587
588#[cfg(test)]
589mod tests {
590 use super::*;
591 use async_trait::async_trait;
592 use smooth_operator::gateway_key::resolve_gateway_key;
593 use smooth_operator_adapter_memory::InMemoryStorageAdapter;
594
595 use crate::config::{ServerConfig, StorageBackend, DEFAULT_GATEWAY_URL, DEFAULT_MODEL};
596
597 /// Build a config with an explicit env gateway key for the resolver tests.
598 fn config_with_env_key(env_key: Option<&str>) -> ServerConfig {
599 ServerConfig {
600 bind: "127.0.0.1".to_string(),
601 port: 0,
602 gateway_url: DEFAULT_GATEWAY_URL.to_string(),
603 gateway_key: env_key.map(str::to_string),
604 model: DEFAULT_MODEL.to_string(),
605 seed_kb: false,
606 max_iterations: 6,
607 max_tokens: 512,
608 storage: StorageBackend::Memory,
609 widget_auth_strict: false,
610 confirm_tools: Vec::new(),
611 judge_model: "claude-haiku-4-5".to_string(),
612 }
613 }
614
615 fn state_with(config: ServerConfig) -> AppState {
616 AppState::new(Arc::new(InMemoryStorageAdapter::new()), config)
617 }
618
619 #[test]
620 fn default_persona_unset_by_default() {
621 let state = state_with(config_with_env_key(None));
622 assert_eq!(
623 state.default_persona, None,
624 "no default persona unless a host installs one"
625 );
626 }
627
628 #[test]
629 fn with_default_persona_installs_and_trims_empty() {
630 let state =
631 state_with(config_with_env_key(None)).with_default_persona("You are Big Smooth.");
632 assert_eq!(
633 state.default_persona.as_deref(),
634 Some("You are Big Smooth.")
635 );
636 // An empty / whitespace-only persona is treated as "no default".
637 let blank = state_with(config_with_env_key(None)).with_default_persona(" ");
638 assert_eq!(blank.default_persona, None, "blank persona is ignored");
639 }
640
641 /// Minimal session for the step-tracking tests.
642 fn test_session(session_id: &str) -> Session {
643 Session {
644 session_id: session_id.to_string(),
645 conversation_id: "conv".to_string(),
646 organization_id: "org".to_string(),
647 agent_id: "agent".to_string(),
648 agent_name: "Agent".to_string(),
649 user_participant_id: "u".to_string(),
650 agent_participant_id: "a".to_string(),
651 thread_id: "conv".to_string(),
652 status: Some(smooth_operator::domain::SessionStatus::Active),
653 token_count: Some(0),
654 message_count: Some(0),
655 metadata: None,
656 created_at: None,
657 updated_at: None,
658 ended_at: None,
659 last_activity_at: None,
660 }
661 }
662
663 #[test]
664 fn session_step_tracking_round_trips_and_clears() {
665 let state = state_with(config_with_env_key(None));
666 state.insert_session(test_session("s1"));
667
668 // Fresh session: no step pointer.
669 assert_eq!(state.session_current_step("s1"), None);
670
671 // Set → read back.
672 state.set_session_current_step("s1", Some("collect"));
673 assert_eq!(
674 state.session_current_step("s1"),
675 Some("collect".to_string())
676 );
677
678 // Overwrite.
679 state.set_session_current_step("s1", Some("summary"));
680 assert_eq!(
681 state.session_current_step("s1"),
682 Some("summary".to_string())
683 );
684
685 // Clear.
686 state.set_session_current_step("s1", None);
687 assert_eq!(state.session_current_step("s1"), None);
688
689 // Unknown session is a no-op, not a panic.
690 state.set_session_current_step("missing", Some("x"));
691 assert_eq!(state.session_current_step("missing"), None);
692 }
693
694 #[test]
695 fn session_step_is_isolated_per_session() {
696 let state = state_with(config_with_env_key(None));
697 state.insert_session(test_session("s1"));
698 state.insert_session(test_session("s2"));
699 state.set_session_current_step("s1", Some("greet"));
700 assert_eq!(state.session_current_step("s1"), Some("greet".to_string()));
701 assert_eq!(state.session_current_step("s2"), None);
702 }
703
704 #[test]
705 fn session_authenticated_round_trips_and_defaults_false() {
706 let state = state_with(config_with_env_key(None));
707 state.insert_session(test_session("s1"));
708
709 // Fresh session: not verified.
710 assert!(!state.session_authenticated("s1"));
711 // Unknown session: not verified (no panic).
712 assert!(!state.session_authenticated("missing"));
713
714 state.set_session_authenticated("s1", true);
715 assert!(state.session_authenticated("s1"));
716
717 state.set_session_authenticated("s1", false);
718 assert!(!state.session_authenticated("s1"));
719
720 // Verified bit coexists with the workflow step pointer.
721 state.set_session_authenticated("s1", true);
722 state.set_session_current_step("s1", Some("collect"));
723 assert!(state.session_authenticated("s1"));
724 assert_eq!(
725 state.session_current_step("s1"),
726 Some("collect".to_string())
727 );
728 }
729
730 #[test]
731 fn session_contact_reads_stashed_email() {
732 let state = state_with(config_with_env_key(None));
733 let mut session = test_session("s1");
734 let mut meta = std::collections::HashMap::new();
735 meta.insert("contactEmail".to_string(), "a@example.com".into());
736 session.metadata = Some(meta);
737 state.insert_session(session);
738
739 let contact = state.session_contact("s1");
740 assert_eq!(contact.email.as_deref(), Some("a@example.com"));
741 assert_eq!(contact.phone, None);
742
743 // Unknown / contact-less sessions yield an empty contact.
744 assert!(state.session_contact("missing").is_empty());
745 state.insert_session(test_session("s2"));
746 assert!(state.session_contact("s2").is_empty());
747 }
748
749 /// Per-org resolver covering exactly one org; `None` (→ env fallback) for any
750 /// other org. Mirrors what a multi-tenant host installs.
751 struct OneOrgResolver {
752 org: String,
753 key: String,
754 }
755
756 #[async_trait]
757 impl GatewayKeyResolver for OneOrgResolver {
758 async fn resolve(&self, org_id: &str) -> Option<String> {
759 (org_id == self.org).then(|| self.key.clone())
760 }
761 }
762
763 #[tokio::test]
764 async fn default_state_resolves_env_key_for_every_org() {
765 // No resolver injected: the default `EnvGatewayKeyResolver` returns the
766 // single env key for every org — unchanged local behavior.
767 let state = state_with(config_with_env_key(Some("env-key")));
768 let env = state.config.gateway_key.as_deref();
769 assert_eq!(
770 resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
771 Some("env-key".to_string())
772 );
773 assert_eq!(
774 resolve_gateway_key(&state.gateway_key_resolver, "org-z", env).await,
775 Some("env-key".to_string())
776 );
777 }
778
779 #[tokio::test]
780 async fn injected_resolver_overrides_per_org_and_falls_back_to_env() {
781 let config = config_with_env_key(Some("env-key"));
782 let state = state_with(config).with_gateway_key_resolver(Arc::new(OneOrgResolver {
783 org: "org-a".to_string(),
784 key: "org-a-key".to_string(),
785 }));
786 let env = state.config.gateway_key.as_deref();
787
788 // Covered org → its own key.
789 assert_eq!(
790 resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
791 Some("org-a-key".to_string())
792 );
793 // Uncovered org → env fallback.
794 assert_eq!(
795 resolve_gateway_key(&state.gateway_key_resolver, "org-b", env).await,
796 Some("env-key".to_string())
797 );
798 }
799
800 #[tokio::test]
801 async fn no_env_key_and_no_resolver_match_resolves_to_none() {
802 // Env key absent + default resolver → no key (turn is unavailable). Same
803 // behavior as today's `llm_config()` returning `None`.
804 let state = state_with(config_with_env_key(None));
805 let env = state.config.gateway_key.as_deref();
806 assert_eq!(
807 resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
808 None
809 );
810 }
811}