Skip to main content

smooth_operator_server/
state.rs

1//! Server + per-connection state.
2//!
3//! [`AppState`] is shared across every connection + every admin HTTP request
4//! (cloneable `Arc` handles): the storage adapter, the resolved
5//! [`ServerConfig`], the session registry, and — for the admin API (Phase 12) —
6//! the [`AuthVerifier`], an [`IndexingStore`], and the document-set registry.
7//!
8//! Sessions live in an in-memory map keyed by `sessionId` so `get_session` and
9//! reconnects work across connections (mirrors the protocol's "connection →
10//! session" / "session → connections" state model, simplified for the reference
11//! single-process server). On AWS this map would be DynamoDB; on k8s, Redis or
12//! Postgres.
13
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17use smooth_operator_core::HumanResponse;
18use tokio::sync::mpsc::UnboundedSender;
19
20use smooth_operator::adapter::StorageAdapter;
21use smooth_operator::agent_config::{AgentConfigResolver, StaticAgentConfigResolver};
22use smooth_operator::auth::{AuthVerifier, NoAuthVerifier};
23use smooth_operator::backplane::{Backplane, InMemoryBackplane};
24use smooth_operator::connector_config::{ConnectorConfigStore, InMemoryConnectorConfigStore};
25use smooth_operator::domain::Session;
26use smooth_operator::gateway_key::{EnvGatewayKeyResolver, GatewayKeyResolver};
27use smooth_operator::otp::{OtpContact, OtpService};
28use smooth_operator::settings::{InMemorySettingsStore, SettingsStore};
29use smooth_operator::tool_provider::ToolProvider;
30use smooth_operator::widget_auth::{PermissiveWidgetAuth, WidgetAuthProvider};
31use tokio_util::sync::CancellationToken;
32
33use smooth_operator_core::llm_provider::LlmProvider;
34use smooth_operator_ingestion::indexing::{InMemoryIndexingStore, IndexingStore};
35
36use crate::config::ServerConfig;
37
/// Shared, cloneable application state handed to every WebSocket connection +
/// every admin HTTP request.
///
/// Every collaborator is held behind an `Arc` (or is itself cheaply cloneable),
/// so cloning the state shares the same registries and stores rather than
/// copying them.
#[derive(Clone)]
pub struct AppState {
    /// The single storage seam (conversations / participants / messages /
    /// sessions / checkpoints / knowledge).
    pub storage: Arc<dyn StorageAdapter>,
    /// Resolved server configuration (gateway, model, limits).
    pub config: Arc<ServerConfig>,
    /// The configured auth verifier (jwt / smoo / none). Used by the admin API's
    /// `require_role` extractor to turn a bearer token into a `Principal`.
    pub auth: Arc<dyn AuthVerifier>,
    /// Indexing-run status store, surfaced by `GET /admin/indexing/runs`.
    pub indexing: Arc<dyn IndexingStore>,
    /// Connector-configuration store, CRUD'd by the admin write API
    /// (`/admin/connectors`). Org-scoped; holds an `auth_ref` (secret name), not
    /// the secret itself.
    pub connector_configs: Arc<dyn ConnectorConfigStore>,
    /// Per-org agent settings store, read/written by `/admin/settings`.
    pub settings: Arc<dyn SettingsStore>,
    /// **Host tool-injection seam.** When `Some`, the runner asks this provider
    /// for EXTRA tools and merges them into every turn's `ToolRegistry`
    /// alongside the built-ins. Defaults to `None` (built-ins only); a host
    /// installs one via [`with_tools`](Self::with_tools) to contribute its own
    /// per-org tool catalog without forking the runner.
    pub tool_provider: Option<Arc<dyn ToolProvider>>,
    /// Embeddable-widget auth hook: resolves an agent's origin-allowlist +
    /// public-key policy for `<smooth-agent-chat>` connections. Defaults to
    /// [`PermissiveWidgetAuth`] (no enforcement) until a host installs a real
    /// provider via [`with_widget_auth`](Self::with_widget_auth).
    pub widget_auth: Arc<dyn WidgetAuthProvider>,
    /// **Per-agent behavior config hook.** Resolves an agent's `instructions`
    /// (system prompt), `personality`, `greeting`, and `conversation_workflow`
    /// from its `agent_id` so a public chat agent behaves as its owner configured
    /// — not as the generic org-default persona. Defaults to
    /// [`StaticAgentConfigResolver`](smooth_operator::agent_config::StaticAgentConfigResolver) (empty ⇒ no
    /// per-agent config → the org default persona is used, unchanged); a host
    /// installs a real provider (backed by the monorepo `agents` table) via
    /// [`with_agent_config`](Self::with_agent_config).
    pub agent_config: Arc<dyn AgentConfigResolver>,
    /// Connection backplane: per-pod sink registry + cross-pod event delivery.
    /// Defaults to [`InMemoryBackplane`] (single-process); a host installs a
    /// Redis/NATS impl via [`with_backplane`](Self::with_backplane) to scale out
    /// and to let non-AI publishers push realtime events to connected clients.
    pub backplane: Arc<dyn Backplane>,
    /// Test-only injected LLM surface. When `Some`, every `send_message` turn
    /// runs the engine against this provider (a
    /// [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient))
    /// instead of building a live gateway client from `config` — exactly the
    /// `ServerState(chat_client=mock)` seam the Python reference uses to drive the
    /// scenario-parity corpus deterministically offline. **`None` in production**
    /// (a live client is built from the gateway config), so the `/ws` path is
    /// byte-for-byte unchanged for real deployments. Installed via
    /// [`with_chat_provider`](Self::with_chat_provider).
    pub chat_provider: Option<Arc<dyn LlmProvider>>,
    /// Per-org LLM gateway-key resolver: maps a turn's `org_id` to the gateway
    /// key it should bill/scope to. Defaults to [`EnvGatewayKeyResolver`] (the
    /// single `SMOOAI_GATEWAY_KEY` for every org — unchanged local behavior); a
    /// multi-tenant host installs a per-org resolver via
    /// [`with_gateway_key_resolver`](Self::with_gateway_key_resolver) so each
    /// tenant's usage is attributed to its own key. The per-turn LLM-config build
    /// falls back to the env key whenever the resolver returns `None`.
    pub gateway_key_resolver: Arc<dyn GatewayKeyResolver>,
    /// **End-user OTP identity-verification seam.** When `Some`, a turn whose
    /// auth gate refuses an `end_user` tool on an unverified session triggers the
    /// OTP flow: the server emits `otp_verification_required`, calls
    /// [`send_otp`](smooth_operator::otp::OtpService::send_otp), and emits
    /// `otp_sent`; a later `verify_otp` action calls
    /// [`verify_otp`](smooth_operator::otp::OtpService::verify_otp) and, on
    /// success, marks the session authenticated. `None` (the default) keeps the
    /// current fail-closed behavior — the `end_user` tool is refused and no OTP is
    /// offered. Installed via [`with_otp_service`](Self::with_otp_service). The
    /// reference server never holds a code; the host owns generation/expiry.
    pub otp_service: Option<Arc<dyn OtpService>>,
    /// Graceful-shutdown signal, shared across every per-connection clone of this
    /// state. On SIGTERM/ctrl_c the serve loop cancels this token; each
    /// connection's reader loop selects on [`CancellationToken::cancelled`] so it
    /// finishes its in-flight turn, exits, and detaches from the [`Backplane`] —
    /// no in-flight turn dropped, no stale registry entry left behind. A fresh
    /// token from [`new`](Self::new) is never cancelled, so the `/ws` path and
    /// tests are unaffected until a `run`/serve path wires the signal.
    pub shutdown: CancellationToken,
    /// Session registry: `sessionId` → session blob. Shared across connections.
    /// The accessors on this type treat a poisoned lock as "no data": reads
    /// return `None`/defaults and writes are skipped.
    sessions: Arc<RwLock<HashMap<String, Session>>>,
    /// Document-set registry, **org-scoped**: `org_id` → (set name → document
    /// count). The in-memory knowledge backend drops document metadata on
    /// ingest, so the admin API reads document-set membership from this side
    /// registry. Keyed by org so org A's document sets are never reported to an
    /// org-B caller (cross-org leak fix — SMOODEV access-control hardening).
    doc_sets: Arc<RwLock<HashMap<String, HashMap<String, usize>>>>,
    /// Connector registry, **org-scoped**: `org_id` → list of connector names
    /// whose indexing runs should be listed. Stored as a `Vec` that
    /// [`record_connector`](Self::record_connector) keeps duplicate-free. Keyed
    /// by org so a same-named connector in two orgs does not collide, and
    /// `GET /admin/indexing/runs` only ever lists the caller's org's connectors.
    connectors: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// **Human-in-the-loop pending confirmations**: `sessionId` →
    /// [`HumanResponse`] sender for a turn currently parked on a write-tool
    /// confirmation. When an agent turn calls a tool that requires human
    /// approval, the runner installs a `ConfirmationHook` (smooth-operator-core)
    /// that parks the loop and registers its response sender here. A subsequent
    /// `confirm_tool_action` frame looks the session up, takes the sender, and
    /// feeds it [`HumanResponse::Approved`] / [`HumanResponse::Denied`] to resume
    /// the parked turn (execute or reject the tool). Keyed by session so each
    /// session has at most one outstanding confirmation; an empty map means no
    /// turn is parked (the default, byte-for-byte unchanged from before HITL).
    pending_confirmations: Arc<RwLock<HashMap<String, UnboundedSender<HumanResponse>>>>,
    /// When `true`, the router mounts the embedded widget host page at `/` and
    /// the widget bundle at `/chat-widget.iife.js`. Off by default (the
    /// K8s/Lambda flavors never serve the widget); the local flavor opts in via
    /// [`with_widget`](Self::with_widget).
    pub serve_widget: bool,
    /// The auth token injected into the served widget host page (same-origin), so
    /// the embedded widget connects to this server's `/ws?token=…`. `None` ⇒ no
    /// token injected (a no-auth local server).
    pub widget_token: Option<String>,
    /// **Strict auth.** When `true`, the `/ws` connect path **rejects** a
    /// missing/invalid token (HTTP 401) instead of degrading to an anonymous
    /// connection. Off by default (K8s/widget anonymous flows unchanged); a
    /// single-tenant local/tailnet deployment opts in via
    /// [`with_strict_auth`](Self::with_strict_auth) so a tokenless peer can't
    /// drive the agent.
    pub strict_auth: bool,
    /// **Default agent persona / system prompt.** When `Some`, it is used as the
    /// turn's system prompt whenever the per-org `AgentSettings::persona` is
    /// `None` — i.e. a host-supplied default that replaces the built-in
    /// customer-support [`KNOWLEDGE_CHAT_SYSTEM_PROMPT`](crate::runner) when no
    /// per-org override exists. The single-tenant local daemon installs its
    /// "Big Smooth" personal-assistant persona here via
    /// [`with_default_persona`](Self::with_default_persona). `None` (the default)
    /// keeps the const prompt, so the cloud flavor is byte-for-byte unchanged.
    pub default_persona: Option<String>,
    /// **Model-pricing cache** for `GET /admin/model-costs`. The gateway's
    /// `/v1/model/info` pricing is stable, so it's fetched at most once per
    /// process and reused for every subsequent request (the admin handler sets
    /// this on the first successful fetch; a gateway error is NOT cached, so a
    /// transient failure is retried on the next request). Shared across clones so
    /// every connection/request sees the same cached map.
    pub model_costs_cache: Arc<tokio::sync::OnceCell<serde_json::Value>>,
}
177
/// Build the [`IndexingStore`] key for a connector, namespaced by its org.
///
/// The key has the shape `IXCONN#<org_id>\u{1}<connector_name>`, so two orgs
/// that each own a connector called `"docs"` record + list **separate** runs.
/// The scheme relies on the `\u{1}` separator never appearing in a
/// user-supplied connector name (presumably rejected by input validation
/// elsewhere — this function does not check it), which is what keeps a crafted
/// name from crossing an org boundary.
#[must_use]
pub fn scoped_connector_key(org_id: &str, connector_name: &str) -> String {
    // Prefix, org, separator, name — joined in one allocation.
    ["IXCONN#", org_id, "\u{1}", connector_name].concat()
}
186
187impl AppState {
188    /// Construct shared state over a storage adapter and config.
189    ///
190    /// Defaults the admin-API collaborators: a [`NoAuthVerifier`] (overridden via
191    /// [`with_auth`](Self::with_auth)) and an empty [`InMemoryIndexingStore`]
192    /// (overridden via [`with_indexing`](Self::with_indexing)). The `/ws` path
193    /// uses none of these, so existing callers are unaffected.
194    #[must_use]
195    pub fn new(storage: Arc<dyn StorageAdapter>, config: ServerConfig) -> Self {
196        // Default resolver returns the single env gateway key for every org, so
197        // the local/default flavor is unchanged until a host installs a per-org
198        // resolver via `with_gateway_key_resolver`.
199        let gateway_key_resolver: Arc<dyn GatewayKeyResolver> =
200            Arc::new(EnvGatewayKeyResolver::new(config.gateway_key.clone()));
201        Self {
202            storage,
203            config: Arc::new(config),
204            auth: Arc::new(NoAuthVerifier::default()),
205            indexing: Arc::new(InMemoryIndexingStore::new()),
206            connector_configs: Arc::new(InMemoryConnectorConfigStore::new()),
207            settings: Arc::new(InMemorySettingsStore::new()),
208            tool_provider: None,
209            widget_auth: Arc::new(PermissiveWidgetAuth),
210            agent_config: Arc::new(StaticAgentConfigResolver::default()),
211            backplane: Arc::new(InMemoryBackplane::new()),
212            chat_provider: None,
213            gateway_key_resolver,
214            otp_service: None,
215            // A fresh, never-cancelled token: every clone of this state shares
216            // its cancellation state, so the serve loop cancelling once fans out
217            // to every connection. Defaulting here (rather than at each call
218            // site) keeps construction ripple-free.
219            shutdown: CancellationToken::new(),
220            sessions: Arc::new(RwLock::new(HashMap::new())),
221            doc_sets: Arc::new(RwLock::new(HashMap::new())),
222            connectors: Arc::new(RwLock::new(HashMap::new())),
223            pending_confirmations: Arc::new(RwLock::new(HashMap::new())),
224            serve_widget: false,
225            widget_token: None,
226            strict_auth: false,
227            default_persona: None,
228            model_costs_cache: Arc::new(tokio::sync::OnceCell::new()),
229        }
230    }
231
232    /// Install the configured auth verifier (builder).
233    #[must_use]
234    pub fn with_auth(mut self, auth: Arc<dyn AuthVerifier>) -> Self {
235        self.auth = auth;
236        self
237    }
238
239    /// Replace the storage adapter (builder).
240    ///
241    /// Lets an embedder (e.g. the local-flavor daemon) swap the default
242    /// in-memory store for a **durable local adapter** — the seam an always-on,
243    /// self-hosted deployment needs so conversations/sessions/checkpoints
244    /// survive a restart without standing up Postgres.
245    #[must_use]
246    pub fn with_storage(mut self, storage: Arc<dyn StorageAdapter>) -> Self {
247        self.storage = storage;
248        self
249    }
250
251    /// Install the indexing store (builder).
252    #[must_use]
253    pub fn with_indexing(mut self, indexing: Arc<dyn IndexingStore>) -> Self {
254        self.indexing = indexing;
255        self
256    }
257
258    /// Install the connector-configuration store (builder).
259    #[must_use]
260    pub fn with_connector_configs(mut self, store: Arc<dyn ConnectorConfigStore>) -> Self {
261        self.connector_configs = store;
262        self
263    }
264
265    /// Install the agent-settings store (builder).
266    #[must_use]
267    pub fn with_settings(mut self, store: Arc<dyn SettingsStore>) -> Self {
268        self.settings = store;
269        self
270    }
271
272    /// Install a host [`ToolProvider`] (builder). The runner merges the
273    /// provider's per-turn tools into every turn's registry alongside the
274    /// built-ins. Without this, the registry is exactly the built-ins, so the
275    /// default/local flavor is unaffected.
276    #[must_use]
277    pub fn with_tools(mut self, provider: Arc<dyn ToolProvider>) -> Self {
278        self.tool_provider = Some(provider);
279        self
280    }
281
282    /// Enable **strict auth** (builder): reject `/ws` connections with a
283    /// missing/invalid token (HTTP 401) instead of degrading to anonymous. Pair
284    /// with a real [`with_auth`](Self::with_auth) verifier. Off by default.
285    #[must_use]
286    pub fn with_strict_auth(mut self, strict: bool) -> Self {
287        self.strict_auth = strict;
288        self
289    }
290
291    /// Install a **default agent persona** (builder): the system prompt used for
292    /// a turn when the per-org [`AgentSettings::persona`] is unset. A single-tenant
293    /// host (the local daemon) installs its own personality here so every turn
294    /// runs as that agent rather than the built-in customer-support prompt. `None`
295    /// (the default) keeps the const prompt, so the cloud flavor is unchanged. An
296    /// empty/whitespace-only string is treated as no default.
297    #[must_use]
298    pub fn with_default_persona(mut self, persona: impl Into<String>) -> Self {
299        let persona = persona.into();
300        self.default_persona = if persona.trim().is_empty() {
301            None
302        } else {
303            Some(persona)
304        };
305        self
306    }
307
308    /// Serve the embedded official widget (host page at `/`, bundle at
309    /// `/chat-widget.iife.js`), injecting `token` into the page so the widget
310    /// connects to this server's `/ws?token=…` (builder). The local deployment
311    /// flavor opts in; other flavors never mount the widget routes.
312    #[must_use]
313    pub fn with_widget(mut self, token: Option<String>) -> Self {
314        self.serve_widget = true;
315        self.widget_token = token;
316        self
317    }
318
319    /// Install the embeddable-widget auth provider (builder). A host backs this
320    /// with its agent store so embed origins + public keys are enforced.
321    #[must_use]
322    pub fn with_widget_auth(mut self, provider: Arc<dyn WidgetAuthProvider>) -> Self {
323        self.widget_auth = provider;
324        self
325    }
326
327    /// Install the per-agent behavior-config provider (builder). A host backs
328    /// this with its `agents` store so each agent's `instructions` /
329    /// `conversation_workflow` drive its conversations. Without it, the runner
330    /// falls back to the org-default persona (unchanged behavior).
331    #[must_use]
332    pub fn with_agent_config(mut self, provider: Arc<dyn AgentConfigResolver>) -> Self {
333        self.agent_config = provider;
334        self
335    }
336
337    /// Install the connection backplane (builder). A host installs a Redis/NATS
338    /// impl to scale the WS service horizontally and to let other services push
339    /// realtime events to connected clients via [`Backplane::publish`].
340    #[must_use]
341    pub fn with_backplane(mut self, backplane: Arc<dyn Backplane>) -> Self {
342        self.backplane = backplane;
343        self
344    }
345
346    /// Install a test-injected LLM provider (builder). Every `send_message` turn
347    /// then runs the engine against this provider instead of a live gateway
348    /// client — the [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient)
349    /// seam the scenario-parity corpus drives. Production never calls this, so the
350    /// live path is unchanged. See [`chat_provider`](Self::chat_provider).
351    #[must_use]
352    pub fn with_chat_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
353        self.chat_provider = Some(provider);
354        self
355    }
356
357    /// Install a per-org gateway-key resolver (builder). A multi-tenant host
358    /// installs a resolver backed by its per-org key store (e.g. one LiteLLM
359    /// virtual key per tenant) so each org's turns are billed/scoped to its own
360    /// key. The per-turn LLM-config build falls back to the env key whenever the
361    /// resolver returns `None`, so a resolver covering only some orgs is safe.
362    /// Leaving this unset keeps the default [`EnvGatewayKeyResolver`] (single env
363    /// key for every org — unchanged local behavior).
364    #[must_use]
365    pub fn with_gateway_key_resolver(mut self, resolver: Arc<dyn GatewayKeyResolver>) -> Self {
366        self.gateway_key_resolver = resolver;
367        self
368    }
369
370    /// Install the end-user OTP identity-verification service (builder). Wires the
371    /// `end_user` auth gate to an OTP flow (see [`otp_service`](Self::otp_service));
372    /// leaving it unset keeps the fail-closed default (refuse, no OTP offered).
373    #[must_use]
374    pub fn with_otp_service(mut self, service: Arc<dyn OtpService>) -> Self {
375        self.otp_service = Some(service);
376        self
377    }
378
379    /// Install the graceful-shutdown signal (builder). The serve loop owns a
380    /// clone of this token and cancels it on SIGTERM/ctrl_c; every per-connection
381    /// clone observes the cancellation and drains. Defaulted to a fresh token in
382    /// [`new`](Self::new), so this is only needed when a caller wants to drive
383    /// shutdown from its own token.
384    #[must_use]
385    pub fn with_shutdown(mut self, shutdown: CancellationToken) -> Self {
386        self.shutdown = shutdown;
387        self
388    }
389
390    /// Register a freshly created session.
391    pub fn insert_session(&self, session: Session) {
392        if let Ok(mut map) = self.sessions.write() {
393            map.insert(session.session_id.clone(), session);
394        }
395    }
396
397    /// Look up a session by id.
398    #[must_use]
399    pub fn get_session(&self, session_id: &str) -> Option<Session> {
400        self.sessions.read().ok()?.get(session_id).cloned()
401    }
402
403    /// The conversation-workflow step this session is currently on, read from the
404    /// session's `metadata.currentStepId`. `None` = no workflow / fresh start (the
405    /// runner then resolves to the workflow's first step).
406    #[must_use]
407    pub fn session_current_step(&self, session_id: &str) -> Option<String> {
408        self.sessions
409            .read()
410            .ok()?
411            .get(session_id)?
412            .metadata
413            .as_ref()?
414            .get("currentStepId")?
415            .as_str()
416            .map(str::to_string)
417    }
418
419    /// Persist the workflow step pointer onto the in-memory session's
420    /// `metadata.currentStepId`. Matches the session registry's durability (the
421    /// pointer lives as long as the session does, on the pod that owns it). A
422    /// `None` step clears the pointer. No-op for an unknown session.
423    pub fn set_session_current_step(&self, session_id: &str, step_id: Option<&str>) {
424        if let Ok(mut map) = self.sessions.write() {
425            if let Some(session) = map.get_mut(session_id) {
426                let mut meta = session.metadata.take().unwrap_or_default();
427                match step_id {
428                    Some(id) => {
429                        meta.insert("currentStepId".to_string(), serde_json::Value::from(id));
430                    }
431                    None => {
432                        meta.remove("currentStepId");
433                    }
434                }
435                session.metadata = Some(meta);
436            }
437        }
438    }
439
440    /// Whether this session's caller has completed OTP identity verification,
441    /// read from the session's `metadata.otpVerified`. `false` for an unknown or
442    /// unverified session. Threaded into the `end_user` auth gate so a verified
443    /// session's gated tools run. Same durability as the session registry (lives
444    /// as long as the session, on the pod that owns it).
445    #[must_use]
446    pub fn session_authenticated(&self, session_id: &str) -> bool {
447        self.sessions
448            .read()
449            .ok()
450            .and_then(|map| {
451                map.get(session_id)?
452                    .metadata
453                    .as_ref()?
454                    .get("otpVerified")?
455                    .as_bool()
456            })
457            .unwrap_or(false)
458    }
459
460    /// Mark this session identity-verified (or clear it) by setting
461    /// `metadata.otpVerified`. Called after a successful `verify_otp`. No-op for
462    /// an unknown session. Coexists with the workflow step pointer (both live in
463    /// the session's metadata map).
464    pub fn set_session_authenticated(&self, session_id: &str, verified: bool) {
465        if let Ok(mut map) = self.sessions.write() {
466            if let Some(session) = map.get_mut(session_id) {
467                let mut meta = session.metadata.take().unwrap_or_default();
468                meta.insert("otpVerified".to_string(), serde_json::Value::from(verified));
469                session.metadata = Some(meta);
470            }
471        }
472    }
473
474    /// The caller's OTP contact points for this session, read from the session's
475    /// `metadata.contactEmail` / `metadata.contactPhone` (stashed at
476    /// create-session time). Empty when the session is unknown or captured no
477    /// contact — the server then can't offer OTP. The reference create-session
478    /// path captures only an email.
479    #[must_use]
480    pub fn session_contact(&self, session_id: &str) -> OtpContact {
481        let Ok(map) = self.sessions.read() else {
482            return OtpContact::default();
483        };
484        let Some(meta) = map.get(session_id).and_then(|s| s.metadata.as_ref()) else {
485            return OtpContact::default();
486        };
487        OtpContact {
488            email: meta
489                .get("contactEmail")
490                .and_then(|v| v.as_str())
491                .map(str::to_string),
492            phone: meta
493                .get("contactPhone")
494                .and_then(|v| v.as_str())
495                .map(str::to_string),
496        }
497    }
498
499    /// Record that a document was added to a named document set **within an org**
500    /// (increments its count). Used by seeding + the ingest path so
501    /// `GET /admin/document-sets` can report set names + counts despite the
502    /// in-memory backend dropping document metadata. Org-scoped so org A's sets
503    /// are never reported to an org-B caller.
504    pub fn record_document_set(&self, org_id: impl Into<String>, set: impl Into<String>) {
505        if let Ok(mut map) = self.doc_sets.write() {
506            *map.entry(org_id.into())
507                .or_default()
508                .entry(set.into())
509                .or_insert(0) += 1;
510        }
511    }
512
513    /// Snapshot **one org's** document-set registry as `(name, count)` pairs,
514    /// sorted by name for a stable response. Never returns another org's sets.
515    #[must_use]
516    pub fn document_sets(&self, org_id: &str) -> Vec<(String, usize)> {
517        let Ok(map) = self.doc_sets.read() else {
518            return Vec::new();
519        };
520        let Some(org_sets) = map.get(org_id) else {
521            return Vec::new();
522        };
523        let mut out: Vec<(String, usize)> = org_sets.iter().map(|(k, v)| (k.clone(), *v)).collect();
524        out.sort_by(|a, b| a.0.cmp(&b.0));
525        out
526    }
527
528    /// Record a connector (within an org) whose indexing runs should be listed
529    /// (idempotent). Org-scoped so a same-named connector in two orgs records
530    /// separately and `GET /admin/indexing/runs` only lists the caller's org's.
531    pub fn record_connector(&self, org_id: impl Into<String>, name: impl Into<String>) {
532        let name = name.into();
533        if let Ok(mut map) = self.connectors.write() {
534            let v = map.entry(org_id.into()).or_default();
535            if !v.iter().any(|c| c == &name) {
536                v.push(name);
537            }
538        }
539    }
540
541    /// Snapshot **one org's** recorded connector names (sorted, stable). Never
542    /// returns another org's connectors.
543    #[must_use]
544    pub fn connectors(&self, org_id: &str) -> Vec<String> {
545        let Ok(map) = self.connectors.read() else {
546            return Vec::new();
547        };
548        let mut out = map.get(org_id).cloned().unwrap_or_default();
549        out.sort();
550        out
551    }
552
553    /// Register a parked turn's [`HumanResponse`] sender for `session_id`, so a
554    /// later `confirm_tool_action` can resume it. Any prior pending sender for
555    /// the same session is replaced (one outstanding confirmation per session).
556    /// Called by the runner's confirmation bridge when a write tool emits a
557    /// `HumanRequest::Confirm`.
558    pub fn register_confirmation(
559        &self,
560        session_id: impl Into<String>,
561        responder: UnboundedSender<HumanResponse>,
562    ) {
563        if let Ok(mut map) = self.pending_confirmations.write() {
564            map.insert(session_id.into(), responder);
565        }
566    }
567
568    /// Take (remove + return) the pending [`HumanResponse`] sender for
569    /// `session_id`, if a turn is parked on a confirmation. Returns `None` when
570    /// no turn awaits confirmation for that session (the common case). Taking it
571    /// out — rather than cloning — guarantees a single confirmation resolves a
572    /// single parked tool call, and a duplicate `confirm_tool_action` is a no-op.
573    #[must_use]
574    pub fn take_confirmation(&self, session_id: &str) -> Option<UnboundedSender<HumanResponse>> {
575        self.pending_confirmations.write().ok()?.remove(session_id)
576    }
577
578    /// Drop any pending confirmation registered for `session_id` without
579    /// resolving it. Called when a parked turn ends (the bridge task finishes)
580    /// so a stale sender can't linger and mis-route a later confirmation.
581    pub fn clear_confirmation(&self, session_id: &str) {
582        if let Ok(mut map) = self.pending_confirmations.write() {
583            map.remove(session_id);
584        }
585    }
586}
587
588#[cfg(test)]
589mod tests {
590    use super::*;
591    use async_trait::async_trait;
592    use smooth_operator::gateway_key::resolve_gateway_key;
593    use smooth_operator_adapter_memory::InMemoryStorageAdapter;
594
595    use crate::config::{ServerConfig, StorageBackend, DEFAULT_GATEWAY_URL, DEFAULT_MODEL};
596
597    /// Build a config with an explicit env gateway key for the resolver tests.
598    fn config_with_env_key(env_key: Option<&str>) -> ServerConfig {
599        ServerConfig {
600            bind: "127.0.0.1".to_string(),
601            port: 0,
602            gateway_url: DEFAULT_GATEWAY_URL.to_string(),
603            gateway_key: env_key.map(str::to_string),
604            model: DEFAULT_MODEL.to_string(),
605            seed_kb: false,
606            max_iterations: 6,
607            max_tokens: 512,
608            storage: StorageBackend::Memory,
609            widget_auth_strict: false,
610            confirm_tools: Vec::new(),
611            judge_model: "claude-haiku-4-5".to_string(),
612        }
613    }
614
615    fn state_with(config: ServerConfig) -> AppState {
616        AppState::new(Arc::new(InMemoryStorageAdapter::new()), config)
617    }
618
619    #[test]
620    fn default_persona_unset_by_default() {
621        let state = state_with(config_with_env_key(None));
622        assert_eq!(
623            state.default_persona, None,
624            "no default persona unless a host installs one"
625        );
626    }
627
628    #[test]
629    fn with_default_persona_installs_and_trims_empty() {
630        let state =
631            state_with(config_with_env_key(None)).with_default_persona("You are Big Smooth.");
632        assert_eq!(
633            state.default_persona.as_deref(),
634            Some("You are Big Smooth.")
635        );
636        // An empty / whitespace-only persona is treated as "no default".
637        let blank = state_with(config_with_env_key(None)).with_default_persona("   ");
638        assert_eq!(blank.default_persona, None, "blank persona is ignored");
639    }
640
641    /// Minimal session for the step-tracking tests.
642    fn test_session(session_id: &str) -> Session {
643        Session {
644            session_id: session_id.to_string(),
645            conversation_id: "conv".to_string(),
646            organization_id: "org".to_string(),
647            agent_id: "agent".to_string(),
648            agent_name: "Agent".to_string(),
649            user_participant_id: "u".to_string(),
650            agent_participant_id: "a".to_string(),
651            thread_id: "conv".to_string(),
652            status: Some(smooth_operator::domain::SessionStatus::Active),
653            token_count: Some(0),
654            message_count: Some(0),
655            metadata: None,
656            created_at: None,
657            updated_at: None,
658            ended_at: None,
659            last_activity_at: None,
660        }
661    }
662
663    #[test]
664    fn session_step_tracking_round_trips_and_clears() {
665        let state = state_with(config_with_env_key(None));
666        state.insert_session(test_session("s1"));
667
668        // Fresh session: no step pointer.
669        assert_eq!(state.session_current_step("s1"), None);
670
671        // Set → read back.
672        state.set_session_current_step("s1", Some("collect"));
673        assert_eq!(
674            state.session_current_step("s1"),
675            Some("collect".to_string())
676        );
677
678        // Overwrite.
679        state.set_session_current_step("s1", Some("summary"));
680        assert_eq!(
681            state.session_current_step("s1"),
682            Some("summary".to_string())
683        );
684
685        // Clear.
686        state.set_session_current_step("s1", None);
687        assert_eq!(state.session_current_step("s1"), None);
688
689        // Unknown session is a no-op, not a panic.
690        state.set_session_current_step("missing", Some("x"));
691        assert_eq!(state.session_current_step("missing"), None);
692    }
693
694    #[test]
695    fn session_step_is_isolated_per_session() {
696        let state = state_with(config_with_env_key(None));
697        state.insert_session(test_session("s1"));
698        state.insert_session(test_session("s2"));
699        state.set_session_current_step("s1", Some("greet"));
700        assert_eq!(state.session_current_step("s1"), Some("greet".to_string()));
701        assert_eq!(state.session_current_step("s2"), None);
702    }
703
704    #[test]
705    fn session_authenticated_round_trips_and_defaults_false() {
706        let state = state_with(config_with_env_key(None));
707        state.insert_session(test_session("s1"));
708
709        // Fresh session: not verified.
710        assert!(!state.session_authenticated("s1"));
711        // Unknown session: not verified (no panic).
712        assert!(!state.session_authenticated("missing"));
713
714        state.set_session_authenticated("s1", true);
715        assert!(state.session_authenticated("s1"));
716
717        state.set_session_authenticated("s1", false);
718        assert!(!state.session_authenticated("s1"));
719
720        // Verified bit coexists with the workflow step pointer.
721        state.set_session_authenticated("s1", true);
722        state.set_session_current_step("s1", Some("collect"));
723        assert!(state.session_authenticated("s1"));
724        assert_eq!(
725            state.session_current_step("s1"),
726            Some("collect".to_string())
727        );
728    }
729
730    #[test]
731    fn session_contact_reads_stashed_email() {
732        let state = state_with(config_with_env_key(None));
733        let mut session = test_session("s1");
734        let mut meta = std::collections::HashMap::new();
735        meta.insert("contactEmail".to_string(), "a@example.com".into());
736        session.metadata = Some(meta);
737        state.insert_session(session);
738
739        let contact = state.session_contact("s1");
740        assert_eq!(contact.email.as_deref(), Some("a@example.com"));
741        assert_eq!(contact.phone, None);
742
743        // Unknown / contact-less sessions yield an empty contact.
744        assert!(state.session_contact("missing").is_empty());
745        state.insert_session(test_session("s2"));
746        assert!(state.session_contact("s2").is_empty());
747    }
748
749    /// Per-org resolver covering exactly one org; `None` (→ env fallback) for any
750    /// other org. Mirrors what a multi-tenant host installs.
751    struct OneOrgResolver {
752        org: String,
753        key: String,
754    }
755
756    #[async_trait]
757    impl GatewayKeyResolver for OneOrgResolver {
758        async fn resolve(&self, org_id: &str) -> Option<String> {
759            (org_id == self.org).then(|| self.key.clone())
760        }
761    }
762
763    #[tokio::test]
764    async fn default_state_resolves_env_key_for_every_org() {
765        // No resolver injected: the default `EnvGatewayKeyResolver` returns the
766        // single env key for every org — unchanged local behavior.
767        let state = state_with(config_with_env_key(Some("env-key")));
768        let env = state.config.gateway_key.as_deref();
769        assert_eq!(
770            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
771            Some("env-key".to_string())
772        );
773        assert_eq!(
774            resolve_gateway_key(&state.gateway_key_resolver, "org-z", env).await,
775            Some("env-key".to_string())
776        );
777    }
778
779    #[tokio::test]
780    async fn injected_resolver_overrides_per_org_and_falls_back_to_env() {
781        let config = config_with_env_key(Some("env-key"));
782        let state = state_with(config).with_gateway_key_resolver(Arc::new(OneOrgResolver {
783            org: "org-a".to_string(),
784            key: "org-a-key".to_string(),
785        }));
786        let env = state.config.gateway_key.as_deref();
787
788        // Covered org → its own key.
789        assert_eq!(
790            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
791            Some("org-a-key".to_string())
792        );
793        // Uncovered org → env fallback.
794        assert_eq!(
795            resolve_gateway_key(&state.gateway_key_resolver, "org-b", env).await,
796            Some("env-key".to_string())
797        );
798    }
799
800    #[tokio::test]
801    async fn no_env_key_and_no_resolver_match_resolves_to_none() {
802        // Env key absent + default resolver → no key (turn is unavailable). Same
803        // behavior as today's `llm_config()` returning `None`.
804        let state = state_with(config_with_env_key(None));
805        let env = state.config.gateway_key.as_deref();
806        assert_eq!(
807            resolve_gateway_key(&state.gateway_key_resolver, "org-a", env).await,
808            None
809        );
810    }
811}