Skip to main content

smooth_operator_server/
state.rs

//! Server + per-connection state.
//!
//! [`AppState`] is shared across every connection + every admin HTTP request
//! (cloneable `Arc` handles): the storage adapter, the resolved
//! [`ServerConfig`], the session registry, and — for the admin API (Phase 12) —
//! the [`AuthVerifier`], an [`IndexingStore`], and the document-set registry.
//!
//! Sessions live in an in-memory map keyed by `sessionId` so `get_session` and
//! reconnects work across connections (mirrors the protocol's "connection →
//! session" / "session → connections" state model, simplified for the reference
//! single-process server). On AWS this map would be DynamoDB; on k8s, Redis or
//! Postgres.
13
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17use smooth_operator_core::HumanResponse;
18use tokio::sync::mpsc::UnboundedSender;
19
20use smooth_operator::adapter::StorageAdapter;
21use smooth_operator::agent_config::{AgentConfigResolver, StaticAgentConfigResolver};
22use smooth_operator::auth::{AuthVerifier, NoAuthVerifier};
23use smooth_operator::backplane::{Backplane, InMemoryBackplane};
24use smooth_operator::connector_config::{ConnectorConfigStore, InMemoryConnectorConfigStore};
25use smooth_operator::domain::Session;
26use smooth_operator::gateway_key::{EnvGatewayKeyResolver, GatewayKeyResolver};
27use smooth_operator::settings::{InMemorySettingsStore, SettingsStore};
28use smooth_operator::tool_provider::ToolProvider;
29use smooth_operator::widget_auth::{PermissiveWidgetAuth, WidgetAuthProvider};
30use tokio_util::sync::CancellationToken;
31
32use smooth_operator_core::llm_provider::LlmProvider;
33use smooth_operator_ingestion::indexing::{InMemoryIndexingStore, IndexingStore};
34
35use crate::config::ServerConfig;
36
37/// Shared, cloneable application state handed to every WebSocket connection +
38/// every admin HTTP request.
39#[derive(Clone)]
40pub struct AppState {
41    /// The single storage seam (conversations / participants / messages /
42    /// sessions / checkpoints / knowledge).
43    pub storage: Arc<dyn StorageAdapter>,
44    /// Resolved server configuration (gateway, model, limits).
45    pub config: Arc<ServerConfig>,
46    /// The configured auth verifier (jwt / smoo / none). Used by the admin API's
47    /// `require_role` extractor to turn a bearer token into a `Principal`.
48    pub auth: Arc<dyn AuthVerifier>,
49    /// Indexing-run status store, surfaced by `GET /admin/indexing/runs`.
50    pub indexing: Arc<dyn IndexingStore>,
51    /// Connector-configuration store, CRUD'd by the admin write API
52    /// (`/admin/connectors`). Org-scoped; holds an `auth_ref` (secret name), not
53    /// the secret itself.
54    pub connector_configs: Arc<dyn ConnectorConfigStore>,
55    /// Per-org agent settings store, read/written by `/admin/settings`.
56    pub settings: Arc<dyn SettingsStore>,
57    /// **Host tool-injection seam.** When `Some`, the runner asks this provider
58    /// for EXTRA tools and merges them into every turn's `ToolRegistry`
59    /// alongside the built-ins. Defaults to `None` (built-ins only); a host
60    /// installs one via [`with_tools`](Self::with_tools) to contribute its own
61    /// per-org tool catalog without forking the runner.
62    pub tool_provider: Option<Arc<dyn ToolProvider>>,
63    /// Embeddable-widget auth hook: resolves an agent's origin-allowlist +
64    /// public-key policy for `<smooth-agent-chat>` connections. Defaults to
65    /// [`PermissiveWidgetAuth`] (no enforcement) until a host installs a real
66    /// provider via [`with_widget_auth`](Self::with_widget_auth).
67    pub widget_auth: Arc<dyn WidgetAuthProvider>,
68    /// **Per-agent behavior config hook.** Resolves an agent's `instructions`
69    /// (system prompt), `personality`, `greeting`, and `conversation_workflow`
70    /// from its `agent_id` so a public chat agent behaves as its owner configured
71    /// — not as the generic org-default persona. Defaults to
72    /// [`StaticAgentConfigResolver`](smooth_operator::agent_config::StaticAgentConfigResolver) (empty ⇒ no
73    /// per-agent config → the org default persona is used, unchanged); a host
74    /// installs a real provider (backed by the monorepo `agents` table) via
75    /// [`with_agent_config`](Self::with_agent_config).
76    pub agent_config: Arc<dyn AgentConfigResolver>,
77    /// Connection backplane: per-pod sink registry + cross-pod event delivery.
78    /// Defaults to [`InMemoryBackplane`] (single-process); a host installs a
79    /// Redis/NATS impl via [`with_backplane`](Self::with_backplane) to scale out
80    /// and to let non-AI publishers push realtime events to connected clients.
81    pub backplane: Arc<dyn Backplane>,
82    /// Test-only injected LLM surface. When `Some`, every `send_message` turn
83    /// runs the engine against this provider (a
84    /// [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient))
85    /// instead of building a live gateway client from `config` — exactly the
86    /// `ServerState(chat_client=mock)` seam the Python reference uses to drive the
87    /// scenario-parity corpus deterministically offline. **`None` in production**
88    /// (a live client is built from the gateway config), so the `/ws` path is
89    /// byte-for-byte unchanged for real deployments. Installed via
90    /// [`with_chat_provider`](Self::with_chat_provider).
91    pub chat_provider: Option<Arc<dyn LlmProvider>>,
92    /// Per-org LLM gateway-key resolver: maps a turn's `org_id` to the gateway
93    /// key it should bill/scope to. Defaults to [`EnvGatewayKeyResolver`] (the
94    /// single `SMOOAI_GATEWAY_KEY` for every org — unchanged local behavior); a
95    /// multi-tenant host installs a per-org resolver via
96    /// [`with_gateway_key_resolver`](Self::with_gateway_key_resolver) so each
97    /// tenant's usage is attributed to its own key. The per-turn LLM-config build
98    /// falls back to the env key whenever the resolver returns `None`.
99    pub gateway_key_resolver: Arc<dyn GatewayKeyResolver>,
100    /// Graceful-shutdown signal, shared across every per-connection clone of this
101    /// state. On SIGTERM/ctrl_c the serve loop cancels this token; each
102    /// connection's reader loop selects on [`CancellationToken::cancelled`] so it
103    /// finishes its in-flight turn, exits, and detaches from the [`Backplane`] —
104    /// no in-flight turn dropped, no stale registry entry left behind. A fresh
105    /// token from [`new`](Self::new) is never cancelled, so the `/ws` path and
106    /// tests are unaffected until a `run`/serve path wires the signal.
107    pub shutdown: CancellationToken,
108    /// Session registry: `sessionId` → session blob. Shared across connections.
109    sessions: Arc<RwLock<HashMap<String, Session>>>,
110    /// Document-set registry, **org-scoped**: `org_id` → (set name → document
111    /// count). The in-memory knowledge backend drops document metadata on
112    /// ingest, so the admin API reads document-set membership from this side
113    /// registry. Keyed by org so org A's document sets are never reported to an
114    /// org-B caller (cross-org leak fix — SMOODEV access-control hardening).
115    doc_sets: Arc<RwLock<HashMap<String, HashMap<String, usize>>>>,
116    /// Connector registry, **org-scoped**: `org_id` → set of connector names
117    /// whose indexing runs should be listed. Keyed by org so a same-named
118    /// connector in two orgs does not collide, and `GET /admin/indexing/runs`
119    /// only ever lists the caller's org's connectors.
120    connectors: Arc<RwLock<HashMap<String, Vec<String>>>>,
121    /// **Human-in-the-loop pending confirmations**: `sessionId` →
122    /// [`HumanResponse`] sender for a turn currently parked on a write-tool
123    /// confirmation. When an agent turn calls a tool that requires human
124    /// approval, the runner installs a `ConfirmationHook` (smooth-operator-core)
125    /// that parks the loop and registers its response sender here. A subsequent
126    /// `confirm_tool_action` frame looks the session up, takes the sender, and
127    /// feeds it [`HumanResponse::Approved`] / [`HumanResponse::Denied`] to resume
128    /// the parked turn (execute or reject the tool). Keyed by session so each
129    /// session has at most one outstanding confirmation; an empty map means no
130    /// turn is parked (the default, byte-for-byte unchanged from before HITL).
131    pending_confirmations: Arc<RwLock<HashMap<String, UnboundedSender<HumanResponse>>>>,
132    /// When `true`, the router mounts the embedded widget host page at `/` and
133    /// the widget bundle at `/chat-widget.iife.js`. Off by default (the
134    /// K8s/Lambda flavors never serve the widget); the local flavor opts in via
135    /// [`with_widget`](Self::with_widget).
136    pub serve_widget: bool,
137    /// The auth token injected into the served widget host page (same-origin), so
138    /// the embedded widget connects to this server's `/ws?token=…`. `None` ⇒ no
139    /// token injected (a no-auth local server).
140    pub widget_token: Option<String>,
141    /// **Strict auth.** When `true`, the `/ws` connect path **rejects** a
142    /// missing/invalid token (HTTP 401) instead of degrading to an anonymous
143    /// connection. Off by default (K8s/widget anonymous flows unchanged); a
144    /// single-tenant local/tailnet deployment opts in via
145    /// [`with_strict_auth`](Self::with_strict_auth) so a tokenless peer can't
146    /// drive the agent.
147    pub strict_auth: bool,
148    /// **Default agent persona / system prompt.** When `Some`, it is used as the
149    /// turn's system prompt whenever the per-org [`AgentSettings::persona`] is
150    /// `None` — i.e. a host-supplied default that replaces the built-in
151    /// customer-support [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`](crate::runner) when no
152    /// per-org override exists. The single-tenant local daemon installs its
153    /// "Big Smooth" personal-assistant persona here via
154    /// [`with_default_persona`](Self::with_default_persona). `None` (the default)
155    /// keeps the const prompt, so the cloud flavor is byte-for-byte unchanged.
156    pub default_persona: Option<String>,
157    /// **Model-pricing cache** for `GET /admin/model-costs`. The gateway's
158    /// `/v1/model/info` pricing is stable, so it's fetched at most once per
159    /// process and reused for every subsequent request (the admin handler sets
160    /// this on the first successful fetch; a gateway error is NOT cached, so a
161    /// transient failure is retried on the next request). Shared across clones so
162    /// every connection/request sees the same cached map.
163    pub model_costs_cache: Arc<tokio::sync::OnceCell<serde_json::Value>>,
164}
165
/// Namespace a connector name by org for the [`IndexingStore`] key, so two orgs
/// with a same-named connector (`"docs"`) record + list **separate** runs.
///
/// The key has the shape `IXCONN#<org_id>\u{1}<connector_name>`. The `\u{1}`
/// separator can't appear in a user-supplied connector name, so it can't be
/// spoofed to cross an org boundary.
#[must_use]
pub fn scoped_connector_key(org_id: &str, connector_name: &str) -> String {
    format!("IXCONN#{org_id}\u{1}{connector_name}")
}
174
175impl AppState {
176    /// Construct shared state over a storage adapter and config.
177    ///
178    /// Defaults the admin-API collaborators: a [`NoAuthVerifier`] (overridden via
179    /// [`with_auth`](Self::with_auth)) and an empty [`InMemoryIndexingStore`]
180    /// (overridden via [`with_indexing`](Self::with_indexing)). The `/ws` path
181    /// uses none of these, so existing callers are unaffected.
182    #[must_use]
183    pub fn new(storage: Arc<dyn StorageAdapter>, config: ServerConfig) -> Self {
184        // Default resolver returns the single env gateway key for every org, so
185        // the local/default flavor is unchanged until a host installs a per-org
186        // resolver via `with_gateway_key_resolver`.
187        let gateway_key_resolver: Arc<dyn GatewayKeyResolver> =
188            Arc::new(EnvGatewayKeyResolver::new(config.gateway_key.clone()));
189        Self {
190            storage,
191            config: Arc::new(config),
192            auth: Arc::new(NoAuthVerifier::default()),
193            indexing: Arc::new(InMemoryIndexingStore::new()),
194            connector_configs: Arc::new(InMemoryConnectorConfigStore::new()),
195            settings: Arc::new(InMemorySettingsStore::new()),
196            tool_provider: None,
197            widget_auth: Arc::new(PermissiveWidgetAuth),
198            agent_config: Arc::new(StaticAgentConfigResolver::default()),
199            backplane: Arc::new(InMemoryBackplane::new()),
200            chat_provider: None,
201            gateway_key_resolver,
202            // A fresh, never-cancelled token: every clone of this state shares
203            // its cancellation state, so the serve loop cancelling once fans out
204            // to every connection. Defaulting here (rather than at each call
205            // site) keeps construction ripple-free.
206            shutdown: CancellationToken::new(),
207            sessions: Arc::new(RwLock::new(HashMap::new())),
208            doc_sets: Arc::new(RwLock::new(HashMap::new())),
209            connectors: Arc::new(RwLock::new(HashMap::new())),
210            pending_confirmations: Arc::new(RwLock::new(HashMap::new())),
211            serve_widget: false,
212            widget_token: None,
213            strict_auth: false,
214            default_persona: None,
215            model_costs_cache: Arc::new(tokio::sync::OnceCell::new()),
216        }
217    }
218
219    /// Install the configured auth verifier (builder).
220    #[must_use]
221    pub fn with_auth(mut self, auth: Arc<dyn AuthVerifier>) -> Self {
222        self.auth = auth;
223        self
224    }
225
226    /// Replace the storage adapter (builder).
227    ///
228    /// Lets an embedder (e.g. the local-flavor daemon) swap the default
229    /// in-memory store for a **durable local adapter** — the seam an always-on,
230    /// self-hosted deployment needs so conversations/sessions/checkpoints
231    /// survive a restart without standing up Postgres.
232    #[must_use]
233    pub fn with_storage(mut self, storage: Arc<dyn StorageAdapter>) -> Self {
234        self.storage = storage;
235        self
236    }
237
238    /// Install the indexing store (builder).
239    #[must_use]
240    pub fn with_indexing(mut self, indexing: Arc<dyn IndexingStore>) -> Self {
241        self.indexing = indexing;
242        self
243    }
244
245    /// Install the connector-configuration store (builder).
246    #[must_use]
247    pub fn with_connector_configs(mut self, store: Arc<dyn ConnectorConfigStore>) -> Self {
248        self.connector_configs = store;
249        self
250    }
251
252    /// Install the agent-settings store (builder).
253    #[must_use]
254    pub fn with_settings(mut self, store: Arc<dyn SettingsStore>) -> Self {
255        self.settings = store;
256        self
257    }
258
259    /// Install a host [`ToolProvider`] (builder). The runner merges the
260    /// provider's per-turn tools into every turn's registry alongside the
261    /// built-ins. Without this, the registry is exactly the built-ins, so the
262    /// default/local flavor is unaffected.
263    #[must_use]
264    pub fn with_tools(mut self, provider: Arc<dyn ToolProvider>) -> Self {
265        self.tool_provider = Some(provider);
266        self
267    }
268
269    /// Enable **strict auth** (builder): reject `/ws` connections with a
270    /// missing/invalid token (HTTP 401) instead of degrading to anonymous. Pair
271    /// with a real [`with_auth`](Self::with_auth) verifier. Off by default.
272    #[must_use]
273    pub fn with_strict_auth(mut self, strict: bool) -> Self {
274        self.strict_auth = strict;
275        self
276    }
277
278    /// Install a **default agent persona** (builder): the system prompt used for
279    /// a turn when the per-org [`AgentSettings::persona`] is unset. A single-tenant
280    /// host (the local daemon) installs its own personality here so every turn
281    /// runs as that agent rather than the built-in customer-support prompt. `None`
282    /// (the default) keeps the const prompt, so the cloud flavor is unchanged. An
283    /// empty/whitespace-only string is treated as no default.
284    #[must_use]
285    pub fn with_default_persona(mut self, persona: impl Into<String>) -> Self {
286        let persona = persona.into();
287        self.default_persona = if persona.trim().is_empty() {
288            None
289        } else {
290            Some(persona)
291        };
292        self
293    }
294
295    /// Serve the embedded official widget (host page at `/`, bundle at
296    /// `/chat-widget.iife.js`), injecting `token` into the page so the widget
297    /// connects to this server's `/ws?token=…` (builder). The local deployment
298    /// flavor opts in; other flavors never mount the widget routes.
299    #[must_use]
300    pub fn with_widget(mut self, token: Option<String>) -> Self {
301        self.serve_widget = true;
302        self.widget_token = token;
303        self
304    }
305
306    /// Install the embeddable-widget auth provider (builder). A host backs this
307    /// with its agent store so embed origins + public keys are enforced.
308    #[must_use]
309    pub fn with_widget_auth(mut self, provider: Arc<dyn WidgetAuthProvider>) -> Self {
310        self.widget_auth = provider;
311        self
312    }
313
314    /// Install the per-agent behavior-config provider (builder). A host backs
315    /// this with its `agents` store so each agent's `instructions` /
316    /// `conversation_workflow` drive its conversations. Without it, the runner
317    /// falls back to the org-default persona (unchanged behavior).
318    #[must_use]
319    pub fn with_agent_config(mut self, provider: Arc<dyn AgentConfigResolver>) -> Self {
320        self.agent_config = provider;
321        self
322    }
323
324    /// Install the connection backplane (builder). A host installs a Redis/NATS
325    /// impl to scale the WS service horizontally and to let other services push
326    /// realtime events to connected clients via [`Backplane::publish`].
327    #[must_use]
328    pub fn with_backplane(mut self, backplane: Arc<dyn Backplane>) -> Self {
329        self.backplane = backplane;
330        self
331    }
332
333    /// Install a test-injected LLM provider (builder). Every `send_message` turn
334    /// then runs the engine against this provider instead of a live gateway
335    /// client — the [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient)
336    /// seam the scenario-parity corpus drives. Production never calls this, so the
337    /// live path is unchanged. See [`chat_provider`](Self::chat_provider).
338    #[must_use]
339    pub fn with_chat_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
340        self.chat_provider = Some(provider);
341        self
342    }
343
344    /// Install a per-org gateway-key resolver (builder). A multi-tenant host
345    /// installs a resolver backed by its per-org key store (e.g. one LiteLLM
346    /// virtual key per tenant) so each org's turns are billed/scoped to its own
347    /// key. The per-turn LLM-config build falls back to the env key whenever the
348    /// resolver returns `None`, so a resolver covering only some orgs is safe.
349    /// Leaving this unset keeps the default [`EnvGatewayKeyResolver`] (single env
350    /// key for every org — unchanged local behavior).
351    #[must_use]
352    pub fn with_gateway_key_resolver(mut self, resolver: Arc<dyn GatewayKeyResolver>) -> Self {
353        self.gateway_key_resolver = resolver;
354        self
355    }
356
357    /// Install the graceful-shutdown signal (builder). The serve loop owns a
358    /// clone of this token and cancels it on SIGTERM/ctrl_c; every per-connection
359    /// clone observes the cancellation and drains. Defaulted to a fresh token in
360    /// [`new`](Self::new), so this is only needed when a caller wants to drive
361    /// shutdown from its own token.
362    #[must_use]
363    pub fn with_shutdown(mut self, shutdown: CancellationToken) -> Self {
364        self.shutdown = shutdown;
365        self
366    }
367
368    /// Register a freshly created session.
369    pub fn insert_session(&self, session: Session) {
370        if let Ok(mut map) = self.sessions.write() {
371            map.insert(session.session_id.clone(), session);
372        }
373    }
374
375    /// Look up a session by id.
376    #[must_use]
377    pub fn get_session(&self, session_id: &str) -> Option<Session> {
378        self.sessions.read().ok()?.get(session_id).cloned()
379    }
380
381    /// The conversation-workflow step this session is currently on, read from the
382    /// session's `metadata.currentStepId`. `None` = no workflow / fresh start (the
383    /// runner then resolves to the workflow's first step).
384    #[must_use]
385    pub fn session_current_step(&self, session_id: &str) -> Option<String> {
386        self.sessions
387            .read()
388            .ok()?
389            .get(session_id)?
390            .metadata
391            .as_ref()?
392            .get("currentStepId")?
393            .as_str()
394            .map(str::to_string)
395    }
396
397    /// Persist the workflow step pointer onto the in-memory session's
398    /// `metadata.currentStepId`. Matches the session registry's durability (the
399    /// pointer lives as long as the session does, on the pod that owns it). A
400    /// `None` step clears the pointer. No-op for an unknown session.
401    pub fn set_session_current_step(&self, session_id: &str, step_id: Option<&str>) {
402        if let Ok(mut map) = self.sessions.write() {
403            if let Some(session) = map.get_mut(session_id) {
404                let mut meta = session.metadata.take().unwrap_or_default();
405                match step_id {
406                    Some(id) => {
407                        meta.insert("currentStepId".to_string(), serde_json::Value::from(id));
408                    }
409                    None => {
410                        meta.remove("currentStepId");
411                    }
412                }
413                session.metadata = Some(meta);
414            }
415        }
416    }
417
418    /// Record that a document was added to a named document set **within an org**
419    /// (increments its count). Used by seeding + the ingest path so
420    /// `GET /admin/document-sets` can report set names + counts despite the
421    /// in-memory backend dropping document metadata. Org-scoped so org A's sets
422    /// are never reported to an org-B caller.
423    pub fn record_document_set(&self, org_id: impl Into<String>, set: impl Into<String>) {
424        if let Ok(mut map) = self.doc_sets.write() {
425            *map.entry(org_id.into())
426                .or_default()
427                .entry(set.into())
428                .or_insert(0) += 1;
429        }
430    }
431
432    /// Snapshot **one org's** document-set registry as `(name, count)` pairs,
433    /// sorted by name for a stable response. Never returns another org's sets.
434    #[must_use]
435    pub fn document_sets(&self, org_id: &str) -> Vec<(String, usize)> {
436        let Ok(map) = self.doc_sets.read() else {
437            return Vec::new();
438        };
439        let Some(org_sets) = map.get(org_id) else {
440            return Vec::new();
441        };
442        let mut out: Vec<(String, usize)> = org_sets.iter().map(|(k, v)| (k.clone(), *v)).collect();
443        out.sort_by(|a, b| a.0.cmp(&b.0));
444        out
445    }
446
447    /// Record a connector (within an org) whose indexing runs should be listed
448    /// (idempotent). Org-scoped so a same-named connector in two orgs records
449    /// separately and `GET /admin/indexing/runs` only lists the caller's org's.
450    pub fn record_connector(&self, org_id: impl Into<String>, name: impl Into<String>) {
451        let name = name.into();
452        if let Ok(mut map) = self.connectors.write() {
453            let v = map.entry(org_id.into()).or_default();
454            if !v.iter().any(|c| c == &name) {
455                v.push(name);
456            }
457        }
458    }
459
460    /// Snapshot **one org's** recorded connector names (sorted, stable). Never
461    /// returns another org's connectors.
462    #[must_use]
463    pub fn connectors(&self, org_id: &str) -> Vec<String> {
464        let Ok(map) = self.connectors.read() else {
465            return Vec::new();
466        };
467        let mut out = map.get(org_id).cloned().unwrap_or_default();
468        out.sort();
469        out
470    }
471
472    /// Register a parked turn's [`HumanResponse`] sender for `session_id`, so a
473    /// later `confirm_tool_action` can resume it. Any prior pending sender for
474    /// the same session is replaced (one outstanding confirmation per session).
475    /// Called by the runner's confirmation bridge when a write tool emits a
476    /// `HumanRequest::Confirm`.
477    pub fn register_confirmation(
478        &self,
479        session_id: impl Into<String>,
480        responder: UnboundedSender<HumanResponse>,
481    ) {
482        if let Ok(mut map) = self.pending_confirmations.write() {
483            map.insert(session_id.into(), responder);
484        }
485    }
486
487    /// Take (remove + return) the pending [`HumanResponse`] sender for
488    /// `session_id`, if a turn is parked on a confirmation. Returns `None` when
489    /// no turn awaits confirmation for that session (the common case). Taking it
490    /// out — rather than cloning — guarantees a single confirmation resolves a
491    /// single parked tool call, and a duplicate `confirm_tool_action` is a no-op.
492    #[must_use]
493    pub fn take_confirmation(&self, session_id: &str) -> Option<UnboundedSender<HumanResponse>> {
494        self.pending_confirmations.write().ok()?.remove(session_id)
495    }
496
497    /// Drop any pending confirmation registered for `session_id` without
498    /// resolving it. Called when a parked turn ends (the bridge task finishes)
499    /// so a stale sender can't linger and mis-route a later confirmation.
500    pub fn clear_confirmation(&self, session_id: &str) {
501        if let Ok(mut map) = self.pending_confirmations.write() {
502            map.remove(session_id);
503        }
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use smooth_operator::gateway_key::resolve_gateway_key;
    use smooth_operator_adapter_memory::InMemoryStorageAdapter;

    use crate::config::{ServerConfig, StorageBackend, DEFAULT_GATEWAY_URL, DEFAULT_MODEL};

    /// Build a config with an explicit env gateway key for the resolver tests.
    fn config_with_env_key(env_key: Option<&str>) -> ServerConfig {
        ServerConfig {
            bind: "127.0.0.1".to_string(),
            port: 0,
            gateway_url: DEFAULT_GATEWAY_URL.to_string(),
            gateway_key: env_key.map(str::to_string),
            model: DEFAULT_MODEL.to_string(),
            seed_kb: false,
            max_iterations: 6,
            max_tokens: 512,
            storage: StorageBackend::Memory,
            widget_auth_strict: false,
            confirm_tools: Vec::new(),
            judge_model: "claude-haiku-4-5".to_string(),
        }
    }

    /// Fresh state over an in-memory storage adapter.
    fn state_with(config: ServerConfig) -> AppState {
        AppState::new(Arc::new(InMemoryStorageAdapter::new()), config)
    }

    #[test]
    fn default_persona_unset_by_default() {
        let state = state_with(config_with_env_key(None));
        assert_eq!(
            state.default_persona, None,
            "no default persona unless a host installs one"
        );
    }

    #[test]
    fn with_default_persona_installs_and_trims_empty() {
        let state =
            state_with(config_with_env_key(None)).with_default_persona("You are Big Smooth.");
        assert_eq!(
            state.default_persona.as_deref(),
            Some("You are Big Smooth.")
        );
        // An empty / whitespace-only persona is treated as "no default".
        let blank = state_with(config_with_env_key(None)).with_default_persona("   ");
        assert_eq!(blank.default_persona, None, "blank persona is ignored");
    }

    /// Minimal session for the step-tracking tests.
    fn test_session(session_id: &str) -> Session {
        Session {
            session_id: session_id.to_string(),
            conversation_id: "conv".to_string(),
            organization_id: "org".to_string(),
            agent_id: "agent".to_string(),
            agent_name: "Agent".to_string(),
            user_participant_id: "u".to_string(),
            agent_participant_id: "a".to_string(),
            thread_id: "conv".to_string(),
            status: Some(smooth_operator::domain::SessionStatus::Active),
            token_count: Some(0),
            message_count: Some(0),
            metadata: None,
            created_at: None,
            updated_at: None,
            ended_at: None,
            last_activity_at: None,
        }
    }

    #[test]
    fn session_round_trips_through_registry() {
        let state = state_with(config_with_env_key(None));
        assert!(state.get_session("s1").is_none());

        state.insert_session(test_session("s1"));
        assert_eq!(
            state.get_session("s1").map(|s| s.session_id),
            Some("s1".to_string())
        );
        assert!(state.get_session("missing").is_none());
    }

    #[test]
    fn session_step_tracking_round_trips_and_clears() {
        let state = state_with(config_with_env_key(None));
        state.insert_session(test_session("s1"));

        // Fresh session: no step pointer.
        assert_eq!(state.session_current_step("s1"), None);

        // Set → read back.
        state.set_session_current_step("s1", Some("collect"));
        assert_eq!(
            state.session_current_step("s1"),
            Some("collect".to_string())
        );

        // Overwrite.
        state.set_session_current_step("s1", Some("summary"));
        assert_eq!(
            state.session_current_step("s1"),
            Some("summary".to_string())
        );

        // Clear.
        state.set_session_current_step("s1", None);
        assert_eq!(state.session_current_step("s1"), None);

        // Unknown session is a no-op, not a panic.
        state.set_session_current_step("missing", Some("x"));
        assert_eq!(state.session_current_step("missing"), None);
    }

    #[test]
    fn session_step_is_isolated_per_session() {
        let state = state_with(config_with_env_key(None));
        state.insert_session(test_session("s1"));
        state.insert_session(test_session("s2"));
        state.set_session_current_step("s1", Some("greet"));
        assert_eq!(state.session_current_step("s1"), Some("greet".to_string()));
        assert_eq!(state.session_current_step("s2"), None);
    }

    #[test]
    fn scoped_connector_key_separates_orgs() {
        assert_eq!(
            scoped_connector_key("org-a", "docs"),
            "IXCONN#org-a\u{1}docs"
        );
        assert_ne!(
            scoped_connector_key("org-a", "docs"),
            scoped_connector_key("org-b", "docs")
        );
    }

    #[test]
    fn document_sets_are_counted_sorted_and_org_scoped() {
        let state = state_with(config_with_env_key(None));
        state.record_document_set("org-a", "zeta");
        state.record_document_set("org-a", "alpha");
        state.record_document_set("org-a", "alpha");
        state.record_document_set("org-b", "beta");

        assert_eq!(
            state.document_sets("org-a"),
            vec![("alpha".to_string(), 2), ("zeta".to_string(), 1)]
        );
        assert_eq!(state.document_sets("org-b"), vec![("beta".to_string(), 1)]);
        // An org with nothing recorded sees nothing — never another org's sets.
        assert!(state.document_sets("org-c").is_empty());
    }

    #[test]
    fn connectors_are_idempotent_sorted_and_org_scoped() {
        let state = state_with(config_with_env_key(None));
        state.record_connector("org-a", "wiki");
        state.record_connector("org-a", "docs");
        state.record_connector("org-a", "docs");
        state.record_connector("org-b", "docs");

        assert_eq!(
            state.connectors("org-a"),
            vec!["docs".to_string(), "wiki".to_string()]
        );
        assert_eq!(state.connectors("org-b"), vec!["docs".to_string()]);
        assert!(state.connectors("org-c").is_empty());
    }

    #[test]
    fn confirmation_is_taken_at_most_once_and_can_be_cleared() {
        let state = state_with(config_with_env_key(None));

        // Nothing parked by default.
        assert!(state.take_confirmation("s1").is_none());

        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<HumanResponse>();
        state.register_confirmation("s1", tx);
        assert!(state.take_confirmation("s1").is_some());
        // A duplicate confirm finds nothing left to resolve.
        assert!(state.take_confirmation("s1").is_none());

        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<HumanResponse>();
        state.register_confirmation("s2", tx);
        state.clear_confirmation("s2");
        assert!(state.take_confirmation("s2").is_none());
    }

    /// Per-org resolver covering exactly one org; `None` (→ env fallback) for any
    /// other org. Mirrors what a multi-tenant host installs.
    struct OneOrgResolver {
        org: String,
        key: String,
    }

    #[async_trait]
    impl GatewayKeyResolver for OneOrgResolver {
        async fn resolve(&self, org_id: &str) -> Option<String> {
            (org_id == self.org).then(|| self.key.clone())
        }
    }

    #[tokio::test]
    async fn default_state_resolves_env_key_for_every_org() {
        // No resolver injected: the default `EnvGatewayKeyResolver` returns the
        // single env key for every org — unchanged local behavior.
        let state = state_with(config_with_env_key(Some("env-key")));
        let env = state.config.gateway_key.as_deref();
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            Some("env-key".to_string())
        );
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-z", env).await,
            Some("env-key".to_string())
        );
    }

    #[tokio::test]
    async fn injected_resolver_overrides_per_org_and_falls_back_to_env() {
        let config = config_with_env_key(Some("env-key"));
        let state = state_with(config).with_gateway_key_resolver(Arc::new(OneOrgResolver {
            org: "org-a".to_string(),
            key: "org-a-key".to_string(),
        }));
        let env = state.config.gateway_key.as_deref();

        // Covered org → its own key.
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            Some("org-a-key".to_string())
        );
        // Uncovered org → env fallback.
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-b", env).await,
            Some("env-key".to_string())
        );
    }

    #[tokio::test]
    async fn no_env_key_and_no_resolver_match_resolves_to_none() {
        // Env key absent + default resolver → no key (turn is unavailable). Same
        // behavior as today's `llm_config()` returning `None`.
        let state = state_with(config_with_env_key(None));
        let env = state.config.gateway_key.as_deref();
        assert_eq!(
            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
            None
        );
    }
}