smooth_operator_server/state.rs
1//! Server + per-connection state.
2//!
3//! [`AppState`] is shared across every connection + every admin HTTP request
4//! (cloneable `Arc` handles): the storage adapter, the resolved
5//! [`ServerConfig`], the session registry, and — for the admin API (Phase 12) —
6//! the [`AuthVerifier`], an [`IndexingStore`], and the document-set registry.
7//!
8//! Sessions live in an in-memory map keyed by `sessionId` so `get_session` and
9//! reconnects work across connections (mirrors the protocol's "connection →
10//! session" / "session → connections" state model, simplified for the reference
11//! single-process server). On AWS this map would be DynamoDB; on k8s, Redis or
12//! Postgres.
13
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17use smooth_operator_core::HumanResponse;
18use tokio::sync::mpsc::UnboundedSender;
19
20use smooth_operator::adapter::StorageAdapter;
21use smooth_operator::agent_config::{AgentConfigResolver, StaticAgentConfigResolver};
22use smooth_operator::auth::{AuthVerifier, NoAuthVerifier};
23use smooth_operator::backplane::{Backplane, InMemoryBackplane};
24use smooth_operator::connector_config::{ConnectorConfigStore, InMemoryConnectorConfigStore};
25use smooth_operator::domain::Session;
26use smooth_operator::gateway_key::{EnvGatewayKeyResolver, GatewayKeyResolver};
27use smooth_operator::settings::{InMemorySettingsStore, SettingsStore};
28use smooth_operator::tool_provider::ToolProvider;
29use smooth_operator::widget_auth::{PermissiveWidgetAuth, WidgetAuthProvider};
30use tokio_util::sync::CancellationToken;
31
32use smooth_operator_core::llm_provider::LlmProvider;
33use smooth_operator_ingestion::indexing::{InMemoryIndexingStore, IndexingStore};
34
35use crate::config::ServerConfig;
36
37/// Shared, cloneable application state handed to every WebSocket connection +
38/// every admin HTTP request.
39#[derive(Clone)]
40pub struct AppState {
41 /// The single storage seam (conversations / participants / messages /
42 /// sessions / checkpoints / knowledge).
43 pub storage: Arc<dyn StorageAdapter>,
44 /// Resolved server configuration (gateway, model, limits).
45 pub config: Arc<ServerConfig>,
46 /// The configured auth verifier (jwt / smoo / none). Used by the admin API's
47 /// `require_role` extractor to turn a bearer token into a `Principal`.
48 pub auth: Arc<dyn AuthVerifier>,
49 /// Indexing-run status store, surfaced by `GET /admin/indexing/runs`.
50 pub indexing: Arc<dyn IndexingStore>,
51 /// Connector-configuration store, CRUD'd by the admin write API
52 /// (`/admin/connectors`). Org-scoped; holds an `auth_ref` (secret name), not
53 /// the secret itself.
54 pub connector_configs: Arc<dyn ConnectorConfigStore>,
55 /// Per-org agent settings store, read/written by `/admin/settings`.
56 pub settings: Arc<dyn SettingsStore>,
57 /// **Host tool-injection seam.** When `Some`, the runner asks this provider
58 /// for EXTRA tools and merges them into every turn's `ToolRegistry`
59 /// alongside the built-ins. Defaults to `None` (built-ins only); a host
60 /// installs one via [`with_tools`](Self::with_tools) to contribute its own
61 /// per-org tool catalog without forking the runner.
62 pub tool_provider: Option<Arc<dyn ToolProvider>>,
63 /// Embeddable-widget auth hook: resolves an agent's origin-allowlist +
64 /// public-key policy for `<smooth-agent-chat>` connections. Defaults to
65 /// [`PermissiveWidgetAuth`] (no enforcement) until a host installs a real
66 /// provider via [`with_widget_auth`](Self::with_widget_auth).
67 pub widget_auth: Arc<dyn WidgetAuthProvider>,
68 /// **Per-agent behavior config hook.** Resolves an agent's `instructions`
69 /// (system prompt), `personality`, `greeting`, and `conversation_workflow`
70 /// from its `agent_id` so a public chat agent behaves as its owner configured
71 /// — not as the generic org-default persona. Defaults to
72 /// [`StaticAgentConfigResolver`](smooth_operator::agent_config::StaticAgentConfigResolver) (empty ⇒ no
73 /// per-agent config → the org default persona is used, unchanged); a host
74 /// installs a real provider (backed by the monorepo `agents` table) via
75 /// [`with_agent_config`](Self::with_agent_config).
76 pub agent_config: Arc<dyn AgentConfigResolver>,
77 /// Connection backplane: per-pod sink registry + cross-pod event delivery.
78 /// Defaults to [`InMemoryBackplane`] (single-process); a host installs a
79 /// Redis/NATS impl via [`with_backplane`](Self::with_backplane) to scale out
80 /// and to let non-AI publishers push realtime events to connected clients.
81 pub backplane: Arc<dyn Backplane>,
82 /// Test-only injected LLM surface. When `Some`, every `send_message` turn
83 /// runs the engine against this provider (a
84 /// [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient))
85 /// instead of building a live gateway client from `config` — exactly the
86 /// `ServerState(chat_client=mock)` seam the Python reference uses to drive the
87 /// scenario-parity corpus deterministically offline. **`None` in production**
88 /// (a live client is built from the gateway config), so the `/ws` path is
89 /// byte-for-byte unchanged for real deployments. Installed via
90 /// [`with_chat_provider`](Self::with_chat_provider).
91 pub chat_provider: Option<Arc<dyn LlmProvider>>,
92 /// Per-org LLM gateway-key resolver: maps a turn's `org_id` to the gateway
93 /// key it should bill/scope to. Defaults to [`EnvGatewayKeyResolver`] (the
94 /// single `SMOOAI_GATEWAY_KEY` for every org — unchanged local behavior); a
95 /// multi-tenant host installs a per-org resolver via
96 /// [`with_gateway_key_resolver`](Self::with_gateway_key_resolver) so each
97 /// tenant's usage is attributed to its own key. The per-turn LLM-config build
98 /// falls back to the env key whenever the resolver returns `None`.
99 pub gateway_key_resolver: Arc<dyn GatewayKeyResolver>,
100 /// Graceful-shutdown signal, shared across every per-connection clone of this
101 /// state. On SIGTERM/ctrl_c the serve loop cancels this token; each
102 /// connection's reader loop selects on [`CancellationToken::cancelled`] so it
103 /// finishes its in-flight turn, exits, and detaches from the [`Backplane`] —
104 /// no in-flight turn dropped, no stale registry entry left behind. A fresh
105 /// token from [`new`](Self::new) is never cancelled, so the `/ws` path and
106 /// tests are unaffected until a `run`/serve path wires the signal.
107 pub shutdown: CancellationToken,
108 /// Session registry: `sessionId` → session blob. Shared across connections.
109 sessions: Arc<RwLock<HashMap<String, Session>>>,
110 /// Document-set registry, **org-scoped**: `org_id` → (set name → document
111 /// count). The in-memory knowledge backend drops document metadata on
112 /// ingest, so the admin API reads document-set membership from this side
113 /// registry. Keyed by org so org A's document sets are never reported to an
114 /// org-B caller (cross-org leak fix — SMOODEV access-control hardening).
115 doc_sets: Arc<RwLock<HashMap<String, HashMap<String, usize>>>>,
116 /// Connector registry, **org-scoped**: `org_id` → set of connector names
117 /// whose indexing runs should be listed. Keyed by org so a same-named
118 /// connector in two orgs does not collide, and `GET /admin/indexing/runs`
119 /// only ever lists the caller's org's connectors.
120 connectors: Arc<RwLock<HashMap<String, Vec<String>>>>,
121 /// **Human-in-the-loop pending confirmations**: `sessionId` →
122 /// [`HumanResponse`] sender for a turn currently parked on a write-tool
123 /// confirmation. When an agent turn calls a tool that requires human
124 /// approval, the runner installs a `ConfirmationHook` (smooth-operator-core)
125 /// that parks the loop and registers its response sender here. A subsequent
126 /// `confirm_tool_action` frame looks the session up, takes the sender, and
127 /// feeds it [`HumanResponse::Approved`] / [`HumanResponse::Denied`] to resume
128 /// the parked turn (execute or reject the tool). Keyed by session so each
129 /// session has at most one outstanding confirmation; an empty map means no
130 /// turn is parked (the default, byte-for-byte unchanged from before HITL).
131 pending_confirmations: Arc<RwLock<HashMap<String, UnboundedSender<HumanResponse>>>>,
132 /// When `true`, the router mounts the embedded widget host page at `/` and
133 /// the widget bundle at `/chat-widget.iife.js`. Off by default (the
134 /// K8s/Lambda flavors never serve the widget); the local flavor opts in via
135 /// [`with_widget`](Self::with_widget).
136 pub serve_widget: bool,
137 /// The auth token injected into the served widget host page (same-origin), so
138 /// the embedded widget connects to this server's `/ws?token=…`. `None` ⇒ no
139 /// token injected (a no-auth local server).
140 pub widget_token: Option<String>,
141 /// **Strict auth.** When `true`, the `/ws` connect path **rejects** a
142 /// missing/invalid token (HTTP 401) instead of degrading to an anonymous
143 /// connection. Off by default (K8s/widget anonymous flows unchanged); a
144 /// single-tenant local/tailnet deployment opts in via
145 /// [`with_strict_auth`](Self::with_strict_auth) so a tokenless peer can't
146 /// drive the agent.
147 pub strict_auth: bool,
148 /// **Default agent persona / system prompt.** When `Some`, it is used as the
149 /// turn's system prompt whenever the per-org [`AgentSettings::persona`] is
150 /// `None` — i.e. a host-supplied default that replaces the built-in
151 /// customer-support [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`](crate::runner) when no
152 /// per-org override exists. The single-tenant local daemon installs its
153 /// "Big Smooth" personal-assistant persona here via
154 /// [`with_default_persona`](Self::with_default_persona). `None` (the default)
155 /// keeps the const prompt, so the cloud flavor is byte-for-byte unchanged.
156 pub default_persona: Option<String>,
157 /// **Model-pricing cache** for `GET /admin/model-costs`. The gateway's
158 /// `/v1/model/info` pricing is stable, so it's fetched at most once per
159 /// process and reused for every subsequent request (the admin handler sets
160 /// this on the first successful fetch; a gateway error is NOT cached, so a
161 /// transient failure is retried on the next request). Shared across clones so
162 /// every connection/request sees the same cached map.
163 pub model_costs_cache: Arc<tokio::sync::OnceCell<serde_json::Value>>,
164}
165
/// Build the org-namespaced [`IndexingStore`] key for a connector.
///
/// Two orgs that each own a connector called `"docs"` must record and list
/// **separate** indexing runs, so the key is `IXCONN#` + `org_id` + `\u{1}` +
/// `connector_name`. The org id always comes first and is terminated by the
/// `\u{1}` control character. Provided org ids never contain that character,
/// no connector name (even one embedding `\u{1}`) can yield a key under
/// another org's prefix. This function does not validate either input —
/// NOTE(review): confirm org ids are `\u{1}`-free at the auth boundary.
#[must_use]
pub fn scoped_connector_key(org_id: &str, connector_name: &str) -> String {
    const PREFIX: &str = "IXCONN#";
    const SEPARATOR: char = '\u{1}';

    let mut key = String::with_capacity(
        PREFIX.len() + org_id.len() + SEPARATOR.len_utf8() + connector_name.len(),
    );
    key.push_str(PREFIX);
    key.push_str(org_id);
    key.push(SEPARATOR);
    key.push_str(connector_name);
    key
}
174
175impl AppState {
176 /// Construct shared state over a storage adapter and config.
177 ///
178 /// Defaults the admin-API collaborators: a [`NoAuthVerifier`] (overridden via
179 /// [`with_auth`](Self::with_auth)) and an empty [`InMemoryIndexingStore`]
180 /// (overridden via [`with_indexing`](Self::with_indexing)). The `/ws` path
181 /// uses none of these, so existing callers are unaffected.
182 #[must_use]
183 pub fn new(storage: Arc<dyn StorageAdapter>, config: ServerConfig) -> Self {
184 // Default resolver returns the single env gateway key for every org, so
185 // the local/default flavor is unchanged until a host installs a per-org
186 // resolver via `with_gateway_key_resolver`.
187 let gateway_key_resolver: Arc<dyn GatewayKeyResolver> =
188 Arc::new(EnvGatewayKeyResolver::new(config.gateway_key.clone()));
189 Self {
190 storage,
191 config: Arc::new(config),
192 auth: Arc::new(NoAuthVerifier::default()),
193 indexing: Arc::new(InMemoryIndexingStore::new()),
194 connector_configs: Arc::new(InMemoryConnectorConfigStore::new()),
195 settings: Arc::new(InMemorySettingsStore::new()),
196 tool_provider: None,
197 widget_auth: Arc::new(PermissiveWidgetAuth),
198 agent_config: Arc::new(StaticAgentConfigResolver::default()),
199 backplane: Arc::new(InMemoryBackplane::new()),
200 chat_provider: None,
201 gateway_key_resolver,
202 // A fresh, never-cancelled token: every clone of this state shares
203 // its cancellation state, so the serve loop cancelling once fans out
204 // to every connection. Defaulting here (rather than at each call
205 // site) keeps construction ripple-free.
206 shutdown: CancellationToken::new(),
207 sessions: Arc::new(RwLock::new(HashMap::new())),
208 doc_sets: Arc::new(RwLock::new(HashMap::new())),
209 connectors: Arc::new(RwLock::new(HashMap::new())),
210 pending_confirmations: Arc::new(RwLock::new(HashMap::new())),
211 serve_widget: false,
212 widget_token: None,
213 strict_auth: false,
214 default_persona: None,
215 model_costs_cache: Arc::new(tokio::sync::OnceCell::new()),
216 }
217 }
218
219 /// Install the configured auth verifier (builder).
220 #[must_use]
221 pub fn with_auth(mut self, auth: Arc<dyn AuthVerifier>) -> Self {
222 self.auth = auth;
223 self
224 }
225
226 /// Replace the storage adapter (builder).
227 ///
228 /// Lets an embedder (e.g. the local-flavor daemon) swap the default
229 /// in-memory store for a **durable local adapter** — the seam an always-on,
230 /// self-hosted deployment needs so conversations/sessions/checkpoints
231 /// survive a restart without standing up Postgres.
232 #[must_use]
233 pub fn with_storage(mut self, storage: Arc<dyn StorageAdapter>) -> Self {
234 self.storage = storage;
235 self
236 }
237
238 /// Install the indexing store (builder).
239 #[must_use]
240 pub fn with_indexing(mut self, indexing: Arc<dyn IndexingStore>) -> Self {
241 self.indexing = indexing;
242 self
243 }
244
245 /// Install the connector-configuration store (builder).
246 #[must_use]
247 pub fn with_connector_configs(mut self, store: Arc<dyn ConnectorConfigStore>) -> Self {
248 self.connector_configs = store;
249 self
250 }
251
252 /// Install the agent-settings store (builder).
253 #[must_use]
254 pub fn with_settings(mut self, store: Arc<dyn SettingsStore>) -> Self {
255 self.settings = store;
256 self
257 }
258
259 /// Install a host [`ToolProvider`] (builder). The runner merges the
260 /// provider's per-turn tools into every turn's registry alongside the
261 /// built-ins. Without this, the registry is exactly the built-ins, so the
262 /// default/local flavor is unaffected.
263 #[must_use]
264 pub fn with_tools(mut self, provider: Arc<dyn ToolProvider>) -> Self {
265 self.tool_provider = Some(provider);
266 self
267 }
268
269 /// Enable **strict auth** (builder): reject `/ws` connections with a
270 /// missing/invalid token (HTTP 401) instead of degrading to anonymous. Pair
271 /// with a real [`with_auth`](Self::with_auth) verifier. Off by default.
272 #[must_use]
273 pub fn with_strict_auth(mut self, strict: bool) -> Self {
274 self.strict_auth = strict;
275 self
276 }
277
278 /// Install a **default agent persona** (builder): the system prompt used for
279 /// a turn when the per-org [`AgentSettings::persona`] is unset. A single-tenant
280 /// host (the local daemon) installs its own personality here so every turn
281 /// runs as that agent rather than the built-in customer-support prompt. `None`
282 /// (the default) keeps the const prompt, so the cloud flavor is unchanged. An
283 /// empty/whitespace-only string is treated as no default.
284 #[must_use]
285 pub fn with_default_persona(mut self, persona: impl Into<String>) -> Self {
286 let persona = persona.into();
287 self.default_persona = if persona.trim().is_empty() {
288 None
289 } else {
290 Some(persona)
291 };
292 self
293 }
294
295 /// Serve the embedded official widget (host page at `/`, bundle at
296 /// `/chat-widget.iife.js`), injecting `token` into the page so the widget
297 /// connects to this server's `/ws?token=…` (builder). The local deployment
298 /// flavor opts in; other flavors never mount the widget routes.
299 #[must_use]
300 pub fn with_widget(mut self, token: Option<String>) -> Self {
301 self.serve_widget = true;
302 self.widget_token = token;
303 self
304 }
305
306 /// Install the embeddable-widget auth provider (builder). A host backs this
307 /// with its agent store so embed origins + public keys are enforced.
308 #[must_use]
309 pub fn with_widget_auth(mut self, provider: Arc<dyn WidgetAuthProvider>) -> Self {
310 self.widget_auth = provider;
311 self
312 }
313
314 /// Install the per-agent behavior-config provider (builder). A host backs
315 /// this with its `agents` store so each agent's `instructions` /
316 /// `conversation_workflow` drive its conversations. Without it, the runner
317 /// falls back to the org-default persona (unchanged behavior).
318 #[must_use]
319 pub fn with_agent_config(mut self, provider: Arc<dyn AgentConfigResolver>) -> Self {
320 self.agent_config = provider;
321 self
322 }
323
324 /// Install the connection backplane (builder). A host installs a Redis/NATS
325 /// impl to scale the WS service horizontally and to let other services push
326 /// realtime events to connected clients via [`Backplane::publish`].
327 #[must_use]
328 pub fn with_backplane(mut self, backplane: Arc<dyn Backplane>) -> Self {
329 self.backplane = backplane;
330 self
331 }
332
333 /// Install a test-injected LLM provider (builder). Every `send_message` turn
334 /// then runs the engine against this provider instead of a live gateway
335 /// client — the [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient)
336 /// seam the scenario-parity corpus drives. Production never calls this, so the
337 /// live path is unchanged. See [`chat_provider`](Self::chat_provider).
338 #[must_use]
339 pub fn with_chat_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
340 self.chat_provider = Some(provider);
341 self
342 }
343
344 /// Install a per-org gateway-key resolver (builder). A multi-tenant host
345 /// installs a resolver backed by its per-org key store (e.g. one LiteLLM
346 /// virtual key per tenant) so each org's turns are billed/scoped to its own
347 /// key. The per-turn LLM-config build falls back to the env key whenever the
348 /// resolver returns `None`, so a resolver covering only some orgs is safe.
349 /// Leaving this unset keeps the default [`EnvGatewayKeyResolver`] (single env
350 /// key for every org — unchanged local behavior).
351 #[must_use]
352 pub fn with_gateway_key_resolver(mut self, resolver: Arc<dyn GatewayKeyResolver>) -> Self {
353 self.gateway_key_resolver = resolver;
354 self
355 }
356
357 /// Install the graceful-shutdown signal (builder). The serve loop owns a
358 /// clone of this token and cancels it on SIGTERM/ctrl_c; every per-connection
359 /// clone observes the cancellation and drains. Defaulted to a fresh token in
360 /// [`new`](Self::new), so this is only needed when a caller wants to drive
361 /// shutdown from its own token.
362 #[must_use]
363 pub fn with_shutdown(mut self, shutdown: CancellationToken) -> Self {
364 self.shutdown = shutdown;
365 self
366 }
367
368 /// Register a freshly created session.
369 pub fn insert_session(&self, session: Session) {
370 if let Ok(mut map) = self.sessions.write() {
371 map.insert(session.session_id.clone(), session);
372 }
373 }
374
375 /// Look up a session by id.
376 #[must_use]
377 pub fn get_session(&self, session_id: &str) -> Option<Session> {
378 self.sessions.read().ok()?.get(session_id).cloned()
379 }
380
381 /// The conversation-workflow step this session is currently on, read from the
382 /// session's `metadata.currentStepId`. `None` = no workflow / fresh start (the
383 /// runner then resolves to the workflow's first step).
384 #[must_use]
385 pub fn session_current_step(&self, session_id: &str) -> Option<String> {
386 self.sessions
387 .read()
388 .ok()?
389 .get(session_id)?
390 .metadata
391 .as_ref()?
392 .get("currentStepId")?
393 .as_str()
394 .map(str::to_string)
395 }
396
397 /// Persist the workflow step pointer onto the in-memory session's
398 /// `metadata.currentStepId`. Matches the session registry's durability (the
399 /// pointer lives as long as the session does, on the pod that owns it). A
400 /// `None` step clears the pointer. No-op for an unknown session.
401 pub fn set_session_current_step(&self, session_id: &str, step_id: Option<&str>) {
402 if let Ok(mut map) = self.sessions.write() {
403 if let Some(session) = map.get_mut(session_id) {
404 let mut meta = session.metadata.take().unwrap_or_default();
405 match step_id {
406 Some(id) => {
407 meta.insert("currentStepId".to_string(), serde_json::Value::from(id));
408 }
409 None => {
410 meta.remove("currentStepId");
411 }
412 }
413 session.metadata = Some(meta);
414 }
415 }
416 }
417
418 /// Record that a document was added to a named document set **within an org**
419 /// (increments its count). Used by seeding + the ingest path so
420 /// `GET /admin/document-sets` can report set names + counts despite the
421 /// in-memory backend dropping document metadata. Org-scoped so org A's sets
422 /// are never reported to an org-B caller.
423 pub fn record_document_set(&self, org_id: impl Into<String>, set: impl Into<String>) {
424 if let Ok(mut map) = self.doc_sets.write() {
425 *map.entry(org_id.into())
426 .or_default()
427 .entry(set.into())
428 .or_insert(0) += 1;
429 }
430 }
431
432 /// Snapshot **one org's** document-set registry as `(name, count)` pairs,
433 /// sorted by name for a stable response. Never returns another org's sets.
434 #[must_use]
435 pub fn document_sets(&self, org_id: &str) -> Vec<(String, usize)> {
436 let Ok(map) = self.doc_sets.read() else {
437 return Vec::new();
438 };
439 let Some(org_sets) = map.get(org_id) else {
440 return Vec::new();
441 };
442 let mut out: Vec<(String, usize)> = org_sets.iter().map(|(k, v)| (k.clone(), *v)).collect();
443 out.sort_by(|a, b| a.0.cmp(&b.0));
444 out
445 }
446
447 /// Record a connector (within an org) whose indexing runs should be listed
448 /// (idempotent). Org-scoped so a same-named connector in two orgs records
449 /// separately and `GET /admin/indexing/runs` only lists the caller's org's.
450 pub fn record_connector(&self, org_id: impl Into<String>, name: impl Into<String>) {
451 let name = name.into();
452 if let Ok(mut map) = self.connectors.write() {
453 let v = map.entry(org_id.into()).or_default();
454 if !v.iter().any(|c| c == &name) {
455 v.push(name);
456 }
457 }
458 }
459
460 /// Snapshot **one org's** recorded connector names (sorted, stable). Never
461 /// returns another org's connectors.
462 #[must_use]
463 pub fn connectors(&self, org_id: &str) -> Vec<String> {
464 let Ok(map) = self.connectors.read() else {
465 return Vec::new();
466 };
467 let mut out = map.get(org_id).cloned().unwrap_or_default();
468 out.sort();
469 out
470 }
471
472 /// Register a parked turn's [`HumanResponse`] sender for `session_id`, so a
473 /// later `confirm_tool_action` can resume it. Any prior pending sender for
474 /// the same session is replaced (one outstanding confirmation per session).
475 /// Called by the runner's confirmation bridge when a write tool emits a
476 /// `HumanRequest::Confirm`.
477 pub fn register_confirmation(
478 &self,
479 session_id: impl Into<String>,
480 responder: UnboundedSender<HumanResponse>,
481 ) {
482 if let Ok(mut map) = self.pending_confirmations.write() {
483 map.insert(session_id.into(), responder);
484 }
485 }
486
487 /// Take (remove + return) the pending [`HumanResponse`] sender for
488 /// `session_id`, if a turn is parked on a confirmation. Returns `None` when
489 /// no turn awaits confirmation for that session (the common case). Taking it
490 /// out — rather than cloning — guarantees a single confirmation resolves a
491 /// single parked tool call, and a duplicate `confirm_tool_action` is a no-op.
492 #[must_use]
493 pub fn take_confirmation(&self, session_id: &str) -> Option<UnboundedSender<HumanResponse>> {
494 self.pending_confirmations.write().ok()?.remove(session_id)
495 }
496
497 /// Drop any pending confirmation registered for `session_id` without
498 /// resolving it. Called when a parked turn ends (the bridge task finishes)
499 /// so a stale sender can't linger and mis-route a later confirmation.
500 pub fn clear_confirmation(&self, session_id: &str) {
501 if let Ok(mut map) = self.pending_confirmations.write() {
502 map.remove(session_id);
503 }
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use smooth_operator::gateway_key::resolve_gateway_key;
    use smooth_operator_adapter_memory::InMemoryStorageAdapter;

    use crate::config::{ServerConfig, StorageBackend, DEFAULT_GATEWAY_URL, DEFAULT_MODEL};

    /// Build a config with an explicit env gateway key for the resolver tests.
    fn config_with_env_key(env_key: Option<&str>) -> ServerConfig {
        ServerConfig {
            bind: "127.0.0.1".to_string(),
            port: 0,
            gateway_url: DEFAULT_GATEWAY_URL.to_string(),
            gateway_key: env_key.map(str::to_string),
            model: DEFAULT_MODEL.to_string(),
            seed_kb: false,
            max_iterations: 6,
            max_tokens: 512,
            storage: StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: Vec::new(),
            judge_model: "claude-haiku-4-5".to_string(),
        }
    }

    /// Fresh state over an in-memory storage adapter.
    fn state_with(config: ServerConfig) -> AppState {
        AppState::new(Arc::new(InMemoryStorageAdapter::new()), config)
    }

    #[test]
    fn default_persona_unset_by_default() {
        let state = state_with(config_with_env_key(None));
        assert_eq!(
            state.default_persona, None,
            "no default persona unless a host installs one"
        );
    }

    #[test]
    fn with_default_persona_installs_and_ignores_blank() {
        let state =
            state_with(config_with_env_key(None)).with_default_persona("You are Big Smooth.");
        assert_eq!(
            state.default_persona.as_deref(),
            Some("You are Big Smooth.")
        );
        // An empty / whitespace-only persona is treated as "no default".
        let blank = state_with(config_with_env_key(None)).with_default_persona(" ");
        assert_eq!(blank.default_persona, None, "blank persona is ignored");
    }

    /// Minimal session for the step-tracking tests.
    fn test_session(session_id: &str) -> Session {
        Session {
            session_id: session_id.to_string(),
            conversation_id: "conv".to_string(),
            organization_id: "org".to_string(),
            agent_id: "agent".to_string(),
            agent_name: "Agent".to_string(),
            user_participant_id: "u".to_string(),
            agent_participant_id: "a".to_string(),
            thread_id: "conv".to_string(),
            status: Some(smooth_operator::domain::SessionStatus::Active),
            token_count: Some(0),
            message_count: Some(0),
            metadata: None,
            created_at: None,
            updated_at: None,
            ended_at: None,
            last_activity_at: None,
        }
    }

    #[test]
    fn session_step_tracking_round_trips_and_clears() {
        let state = state_with(config_with_env_key(None));
        state.insert_session(test_session("s1"));

        // Fresh session: no step pointer.
        assert_eq!(state.session_current_step("s1"), None);

        // Set → read back.
        state.set_session_current_step("s1", Some("collect"));
        assert_eq!(
            state.session_current_step("s1"),
            Some("collect".to_string())
        );

        // Overwrite.
        state.set_session_current_step("s1", Some("summary"));
        assert_eq!(
            state.session_current_step("s1"),
            Some("summary".to_string())
        );

        // Clear.
        state.set_session_current_step("s1", None);
        assert_eq!(state.session_current_step("s1"), None);

        // Unknown session is a no-op, not a panic.
        state.set_session_current_step("missing", Some("x"));
        assert_eq!(state.session_current_step("missing"), None);
    }

    #[test]
    fn session_step_is_isolated_per_session() {
        let state = state_with(config_with_env_key(None));
        state.insert_session(test_session("s1"));
        state.insert_session(test_session("s2"));
        state.set_session_current_step("s1", Some("greet"));
        assert_eq!(state.session_current_step("s1"), Some("greet".to_string()));
        assert_eq!(state.session_current_step("s2"), None);
    }

    #[test]
    fn scoped_connector_key_namespaces_by_org() {
        assert_eq!(
            scoped_connector_key("org-a", "docs"),
            "IXCONN#org-a\u{1}docs"
        );
        assert_ne!(
            scoped_connector_key("org-a", "docs"),
            scoped_connector_key("org-b", "docs"),
            "same-named connectors in two orgs must not share a key"
        );
    }

    #[test]
    fn document_sets_are_org_scoped_counted_and_sorted() {
        let state = state_with(config_with_env_key(None));
        state.record_document_set("org-a", "faq");
        state.record_document_set("org-a", "faq");
        state.record_document_set("org-a", "billing");
        state.record_document_set("org-b", "internal");

        assert_eq!(
            state.document_sets("org-a"),
            vec![("billing".to_string(), 1), ("faq".to_string(), 2)]
        );
        assert_eq!(
            state.document_sets("org-b"),
            vec![("internal".to_string(), 1)],
            "org B never sees org A's sets"
        );
        assert!(state.document_sets("org-c").is_empty());
    }

    #[test]
    fn connectors_are_org_scoped_deduped_and_sorted() {
        let state = state_with(config_with_env_key(None));
        state.record_connector("org-a", "web");
        state.record_connector("org-a", "docs");
        state.record_connector("org-a", "web");
        state.record_connector("org-b", "docs");

        assert_eq!(
            state.connectors("org-a"),
            vec!["docs".to_string(), "web".to_string()]
        );
        assert_eq!(state.connectors("org-b"), vec!["docs".to_string()]);
        assert!(state.connectors("org-c").is_empty());
    }

    #[test]
    fn confirmation_is_taken_at_most_once() {
        let state = state_with(config_with_env_key(None));
        // Keep the receiver alive for the whole test: a parked turn is waiting.
        let (responder, _parked_turn) = tokio::sync::mpsc::unbounded_channel::<HumanResponse>();
        state.register_confirmation("s1", responder);

        assert!(
            state.take_confirmation("s1").is_some(),
            "a parked turn is resumable"
        );
        assert!(
            state.take_confirmation("s1").is_none(),
            "a duplicate confirm is a no-op"
        );
        assert!(state.take_confirmation("s2").is_none());
    }

    #[test]
    fn clear_confirmation_drops_the_pending_sender() {
        let state = state_with(config_with_env_key(None));
        let (responder, _parked_turn) = tokio::sync::mpsc::unbounded_channel::<HumanResponse>();
        state.register_confirmation("s1", responder);

        state.clear_confirmation("s1");
        assert!(state.take_confirmation("s1").is_none());
        // Clearing an unknown session is a no-op, not a panic.
        state.clear_confirmation("missing");
    }

    /// Per-org resolver covering exactly one org; `None` (→ env fallback) for any
    /// other org. Mirrors what a multi-tenant host installs.
    struct OneOrgResolver {
        org: String,
        key: String,
    }

    #[async_trait]
    impl GatewayKeyResolver for OneOrgResolver {
        async fn resolve(&self, org_id: &str) -> Option<String> {
            (org_id == self.org).then(|| self.key.clone())
        }
    }

    #[tokio::test]
    async fn default_state_resolves_env_key_for_every_org() {
        // No resolver injected: the default `EnvGatewayKeyResolver` returns the
        // single env key for every org — unchanged local behavior.
        let state = state_with(config_with_env_key(Some("env-key")));
        let env = state.config.gateway_key.as_deref();
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            Some("env-key".to_string())
        );
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-z", env).await,
            Some("env-key".to_string())
        );
    }

    #[tokio::test]
    async fn injected_resolver_overrides_per_org_and_falls_back_to_env() {
        let config = config_with_env_key(Some("env-key"));
        let state = state_with(config).with_gateway_key_resolver(Arc::new(OneOrgResolver {
            org: "org-a".to_string(),
            key: "org-a-key".to_string(),
        }));
        let env = state.config.gateway_key.as_deref();

        // Covered org → its own key.
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            Some("org-a-key".to_string())
        );
        // Uncovered org → env fallback.
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-b", env).await,
            Some("env-key".to_string())
        );
    }

    #[tokio::test]
    async fn no_env_key_and_no_resolver_match_resolves_to_none() {
        // Env key absent + default resolver → no key (turn is unavailable). Same
        // behavior as today's `llm_config()` returning `None`.
        let state = state_with(config_with_env_key(None));
        let env = state.config.gateway_key.as_deref();
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            None
        );
    }
}