smooai-smooth-operator-server 1.2.0

Reference WebSocket service for smooth-operator — speaks the schema-driven protocol over a smooth-operator KnowledgeChatRuntime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
//! The streaming, memory-carrying agent runner used by the WS service.
//!
//! `smooth-operator`'s [`KnowledgeChatRuntime`] proves the engine ↔ gateway
//! path but (a) is non-streaming (`run_turn` returns only after the turn
//! completes) and (b) has no cross-turn memory — each `run_turn` builds a fresh
//! [`Agent`] with a random id and no prior messages, so turn 2 forgets turn 1
//! (documented in `core/tests/e2e_llm_smoo_ai.rs`).
//!
//! The service needs both, so this module builds the agent itself, wiring the
//! same knowledge-grounding as core PLUS:
//!
//! 1. **Streaming** via [`Agent::run_with_channel`], translating the engine's
//!    [`AgentEvent`] stream into protocol events (`stream_token`,
//!    `stream_chunk`, `eventual_response`).
//! 2. **Per-session memory** via [`AgentConfig::with_prior_messages`]: before
//!    each turn the session's persisted message log is loaded from the storage
//!    adapter and replayed into the conversation, so the model sees turn 1 when
//!    answering turn 2. (`Agent::new` randomizes the agent id every time, so the
//!    checkpoint-resume path can't be keyed stably — replaying the persisted log
//!    is the robust, backend-agnostic way to carry memory. The log is the source
//!    of truth the adapter already persists.)

use std::sync::{Arc, Mutex};

use anyhow::Result;
use serde_json::json;
use smooth_operator_core::llm_provider::LlmProvider;
use smooth_operator_core::{
    Agent, AgentConfig, AgentEvent, KnowledgeBase, KnowledgeResult, LlmConfig,
    Message as EngineMessage, Role, ToolRegistry,
};
use tokio::sync::mpsc::UnboundedSender;

use smooth_operator::access_control::AccessContext;
use smooth_operator::adapter::{MessageQuery, StorageAdapter};
use smooth_operator::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
use smooth_operator::rerank::Reranker;
use smooth_operator::tools::{KnowledgeResultSink, KnowledgeSearchTool};
use smooth_operator::MAX_CITATIONS;

/// How many auto-injected knowledge results the engine prepends as
/// `[Relevant knowledge]` context. Mirrors smooth-operator-core's `Agent`
/// auto-injection (a top-3 query) so the citations we collect match the sources
/// that grounded the first LLM call.
///
/// This value only sizes the runner's own mirror query in
/// `run_streaming_turn`; it is not passed to the engine, so it has to be kept
/// in step with the engine's limit by hand.
const AUTO_CONTEXT_LIMIT: usize = 3;

/// System prompt for the knowledge-chat agent. Mirrors core's prompt: ground
/// answers in the knowledge base and search it before answering anything
/// organization-specific.
///
/// The prompt names the `knowledge_search` tool literally; the event
/// translator in `run_streaming_turn` matches on the same string to set
/// `TurnResult::invoked_knowledge_search`.
/// NOTE(review): presumably `KnowledgeSearchTool` registers under that exact
/// name — confirm if the tool is ever renamed.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
    "You are a helpful customer-support agent for the organization. \
    Answer the user's question accurately and concisely. When a question depends on \
    organization-specific facts (policies, products, documentation), call the \
    `knowledge_search` tool to retrieve them before answering, and ground your answer \
    in what you retrieve. If the knowledge base has no relevant information, say so. \
    Remember facts the user tells you within the conversation and use them when asked.";

/// Max prior *messages* to replay into the conversation for memory (not
/// turns: a completed turn persists one inbound and one outbound message, so
/// this covers roughly half as many turns). Passed as the second argument of
/// `MessageQuery::new` in `load_prior_messages`. Bounds context growth on long
/// sessions; the in-memory log is small but a real backend could be large.
///
/// NOTE(review): whether the adapter returns the oldest or the most recent
/// messages once a conversation exceeds this cap is not visible from this
/// file — confirm it is the most recent, otherwise long sessions would replay
/// stale history and drop the latest turns.
const MAX_PRIOR_MESSAGES: usize = 50;

/// The terminal outcome of a streamed turn, returned by [`run_streaming_turn`]
/// after the agent loop has finished and the event stream has been drained.
pub struct TurnResult {
    /// The agent's final natural-language reply. Empty when the finished
    /// conversation carries no assistant content.
    pub reply: String,
    /// The id of the persisted outbound (agent) message, for `eventual_response`.
    ///
    /// When `reply` is empty nothing is persisted; in that case this is a
    /// freshly generated UUID that does not correspond to any stored message.
    pub message_id: String,
    /// True if any `knowledge_search` tool call ran this turn (diagnostics).
    ///
    /// Set when a tool-call-start event named `knowledge_search` is observed
    /// on the event stream; falls back to `false` if the event-translator
    /// task could not be joined.
    pub invoked_knowledge_search: bool,
    /// The sources that grounded this turn (the auto-injected context + every
    /// `knowledge_search` result), deduped by id and capped. Carried onto the
    /// `eventual_response`'s `citations`. Empty when nothing was retrieved.
    ///
    /// Auto-injected sources come first, so they win both the dedup and the
    /// cap over tool-surfaced sources.
    pub citations: Vec<Citation>,
}

/// Everything one streaming turn needs. Bundled into a struct so the call sites
/// (the reference server's `handle_send_message` and the lambda's
/// `send_message`) stay readable and the security-critical [`access`](Self::access)
/// field can never be silently dropped from a positional argument list.
///
/// Consumed by value by [`run_streaming_turn`], which destructures every field
/// by name.
pub struct TurnRequest<'a> {
    /// The storage seam (conversations / messages / sessions / knowledge).
    /// Also supplies the checkpoint store handed to the agent.
    pub storage: Arc<dyn StorageAdapter>,
    /// The resolved LLM config for this turn.
    pub llm: LlmConfig,
    /// Agent-loop iteration cap, forwarded to `AgentConfig::with_max_iterations`.
    pub max_iterations: u32,
    /// The conversation this turn belongs to. Prior messages are loaded from
    /// it and both the inbound and outbound messages are appended to it.
    pub conversation_id: &'a str,
    /// The protocol request id (streaming correlation). Stamped on every
    /// `stream_token` / `stream_chunk` event emitted for this turn.
    pub request_id: &'a str,
    /// The inbound user message. Persisted as an inbound message before the
    /// agent runs, and also used as the auto-context retrieval query.
    pub user_message: &'a str,
    /// **The requester's document-level entitlements.** Retrieval (the
    /// auto-injected `[Relevant knowledge]` context AND the `knowledge_search`
    /// tool) reads through `storage.knowledge_for_access(&access)`, so a
    /// restricted document is never surfaced to a requester who lacks the
    /// entitlement. An [`AccessContext::anonymous`] sees only org-public docs
    /// (fail closed for ACL'd content).
    pub access: AccessContext,
    /// Optional test-injected LLM surface (a `MockLlmClient`) so the turn runs
    /// deterministically offline. `None` in production (a live client is built
    /// from `llm`).
    pub llm_provider: Option<Arc<dyn LlmProvider>>,
    /// Optional post-retrieval reranker (feature gap G8). When `Some`, the
    /// `knowledge_search` tool overfetches candidates and reorders the top-K with
    /// this reranker before they reach the model. `None` (the default) keeps the
    /// retrieval order unchanged, so default behavior is byte-for-byte the same.
    /// Selected by [`build_reranker`](crate::reranker::build_reranker).
    pub reranker: Option<Arc<dyn Reranker>>,
}

/// Runs one knowledge-grounded, streaming turn for a session's conversation and
/// emits protocol-shaped events through `sink` as they happen.
///
/// `sink` receives ready-to-send `serde_json::Value` event envelopes (built by
/// [`crate::protocol`]). The caller forwards them over the WebSocket.
///
/// ## Access control (security-critical)
///
/// Both retrieval paths — the engine's auto-injected `[Relevant knowledge]`
/// context and the agent's `knowledge_search` tool — read through
/// [`StorageAdapter::knowledge_for_access`] bound to [`TurnRequest::access`],
/// so a document the requester is not entitled to (e.g. a private-repo doc
/// scoped to a group the requester is not in) is dropped before it can reach the
/// model or a citation. See `docs/ACCESS-CONTROL.md`.
///
/// # Errors
/// Returns an error if message persistence or the agent loop fails fatally. The
/// caller converts this into a protocol `error` event.
pub async fn run_streaming_turn(
    req: TurnRequest<'_>,
    sink: &UnboundedSender<serde_json::Value>,
) -> Result<TurnResult> {
    let TurnRequest {
        storage,
        llm,
        max_iterations,
        conversation_id,
        request_id,
        user_message,
        access,
        llm_provider,
        reranker,
    } = req;

    // The ONE ACL-enforcing knowledge handle both retrieval paths read through.
    // Built once from the requester's `AccessContext` so the auto-injected
    // context query, the agent's `knowledge_search` tool, and the citation
    // mirror all hit the SAME filtered view — a restricted doc can't leak in
    // through one path while being dropped on another.
    let knowledge: Arc<dyn KnowledgeBase> = storage.knowledge_for_access(&access);

    // 0. Mirror the engine's auto-injected `[Relevant knowledge]` query so the
    //    citations include the sources the FIRST LLM call was grounded with.
    //    Same query smooth-operator-core's `Agent` runs (`query(msg, 3)`),
    //    against the same ACL-filtered handle. Best-effort: a KB error yields no
    //    auto-context citations.
    let auto_sources: Vec<KnowledgeResult> = knowledge
        .query(user_message, AUTO_CONTEXT_LIMIT)
        .unwrap_or_default();
    // Sink the knowledge_search tool records its structured results into, for
    // citations built from the sources the agent's searches surfaced.
    let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));

    // 1. Load prior turns for memory BEFORE persisting the new inbound message,
    //    so prior_messages is exactly the history-up-to-now.
    let prior = load_prior_messages(storage.as_ref(), conversation_id).await?;

    // 2. Persist the inbound user message.
    persist_message(
        storage.as_ref(),
        conversation_id,
        Direction::Inbound,
        user_message,
    )
    .await?;

    // 3. Build the agent: ACL-grounded config + knowledge_search tool (over the
    //    SAME ACL-filtered handle) + replayed prior messages for memory.
    let config = AgentConfig::new("smooth-agent-chat", KNOWLEDGE_CHAT_SYSTEM_PROMPT, llm)
        .with_max_iterations(max_iterations)
        .with_knowledge(Arc::clone(&knowledge))
        .with_prior_messages(prior);

    let mut tools = ToolRegistry::new();
    // Build the knowledge_search tool over the SAME ACL-filtered handle, with the
    // citation sink and — when a reranker was selected (opt-in, G8) — the rerank
    // stage. With `None` (the default) the tool fetches exactly `limit` and
    // returns the retrieval order unchanged.
    let mut knowledge_search = KnowledgeSearchTool::new(Arc::clone(&knowledge))
        .with_result_sink(Arc::clone(&tool_sources));
    if let Some(reranker) = reranker {
        knowledge_search = knowledge_search.with_reranker(reranker);
    }
    tools.register(knowledge_search);

    let agent = {
        let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
        // Inject the mock LLM provider for offline/deterministic tests; in
        // production a live client is built from `llm`.
        match llm_provider {
            Some(provider) => agent.with_llm_provider(provider),
            None => agent,
        }
    };

    // 4. Run with the streaming channel and translate events as they arrive.
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<AgentEvent>();
    let request_id_owned = request_id.to_string();
    let sink_clone = sink.clone();

    // Spawn the event translator so we forward tokens to the client in real
    // time while the agent loop runs concurrently.
    let translator = tokio::spawn(async move {
        let mut invoked_knowledge_search = false;
        while let Some(event) = rx.recv().await {
            match event {
                AgentEvent::TokenDelta { content } => {
                    if !content.is_empty() {
                        let _ = sink_clone
                            .send(crate::protocol::stream_token(&request_id_owned, &content));
                    }
                }
                AgentEvent::ToolCallStart {
                    tool_name,
                    arguments,
                    ..
                } => {
                    if tool_name == "knowledge_search" {
                        invoked_knowledge_search = true;
                    }
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &tool_name,
                        json!({
                            "rawResponse": json!({ "toolCall": { "name": tool_name, "arguments": arguments } }),
                        }),
                    ));
                }
                AgentEvent::ToolCallComplete {
                    tool_name,
                    result,
                    is_error,
                    ..
                } => {
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &tool_name,
                        json!({
                            "rawResponse": json!({
                                "toolResult": { "name": tool_name, "isError": is_error, "result": result }
                            }),
                        }),
                    ));
                }
                AgentEvent::PhaseStart { phase, .. } => {
                    let _ = sink_clone.send(crate::protocol::stream_chunk(
                        &request_id_owned,
                        &phase,
                        json!({}),
                    ));
                }
                // Started / Completed / token-accounting events are terminal or
                // structural; the protocol carries those via immediate/eventual
                // responses, so they're intentionally not re-emitted here.
                _ => {}
            }
        }
        invoked_knowledge_search
    });

    // Drive the agent loop. `run_with_channel` consumes `tx`; when it returns,
    // the channel closes and the translator task drains and finishes.
    let conversation = agent.run_with_channel(user_message, tx).await?;

    let invoked_knowledge_search = translator.await.unwrap_or(false);

    let reply = conversation
        .last_assistant_content()
        .unwrap_or_default()
        .to_string();

    // 5. Persist the outbound reply and capture its id for eventual_response.
    let message_id = if reply.is_empty() {
        uuid::Uuid::new_v4().to_string()
    } else {
        persist_message(
            storage.as_ref(),
            conversation_id,
            Direction::Outbound,
            &reply,
        )
        .await?
        .id
    };

    // Build citations from the sources that grounded this turn: auto-injected
    // context first (it grounded the first LLM call), then the agent's
    // knowledge_search results. Dedup by document id, cap at MAX_CITATIONS.
    let tool_sources = match Arc::try_unwrap(tool_sources) {
        Ok(mutex) => mutex
            .into_inner()
            .unwrap_or_else(std::sync::PoisonError::into_inner),
        Err(arc) => arc.lock().unwrap_or_else(|p| p.into_inner()).clone(),
    };
    let citations = collect_citations(&auto_sources, &tool_sources);

    Ok(TurnResult {
        reply,
        message_id,
        invoked_knowledge_search,
        citations,
    })
}

/// Turn the knowledge sources that grounded a turn into its [`Citation`]s.
///
/// Sources are visited in grounding order: first the mirrored auto-injected
/// `[Relevant knowledge]` context (`auto`), then whatever the agent's
/// `knowledge_search` calls surfaced (`tool`). A document id is cited only the
/// first time it is seen, and at most [`MAX_CITATIONS`] citations are
/// produced. Returns an empty vector when nothing was retrieved.
fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
    let mut cited_ids = std::collections::HashSet::new();
    let mut citations = Vec::new();

    for source in auto.iter().chain(tool) {
        // Stop as soon as the cap is reached; later sources are ignored.
        if citations.len() >= MAX_CITATIONS {
            break;
        }
        // `insert` returns false for an id that was already cited.
        if cited_ids.insert(source.document_id.clone()) {
            citations.push(Citation::from_knowledge_result(source));
        }
    }

    citations
}

/// Fetch the conversation's persisted log (at most [`MAX_PRIOR_MESSAGES`]
/// entries, in the order the adapter returns them) and map it to engine
/// `Message`s for replay.
///
/// Each stored message contributes its top-level text, or failing that the
/// first content item that has text. Messages with no text at all are
/// skipped. Inbound messages replay as `Role::User`, outbound ones as
/// `Role::Assistant`; ids and creation timestamps are carried over unchanged.
///
/// # Errors
/// Propagates the adapter's error if the message listing fails.
async fn load_prior_messages(
    storage: &dyn StorageAdapter,
    conversation_id: &str,
) -> Result<Vec<EngineMessage>> {
    let query = MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES);
    let page = storage.list_messages_by_conversation(query).await?;

    let replay = page
        .messages
        .into_iter()
        .filter_map(|stored| {
            let content = match stored.content.text.clone() {
                Some(text) => text,
                None => stored
                    .content
                    .items
                    .iter()
                    .find_map(|item| item.text.clone())
                    .unwrap_or_default(),
            };
            // Nothing to replay for a message without any text.
            if content.is_empty() {
                return None;
            }

            let role = match stored.direction {
                Direction::Inbound => Role::User,
                Direction::Outbound => Role::Assistant,
            };

            Some(EngineMessage {
                id: stored.id,
                role,
                content,
                tool_call_id: None,
                tool_name: None,
                tool_calls: vec![],
                reasoning_content: None,
                timestamp: stored.created_at,
            })
        })
        .collect();

    Ok(replay)
}

/// Store one plain-text message on the conversation's log through the adapter
/// and return the message as the adapter hands it back.
///
/// The message gets a fresh UUID v4 id and the current UTC time as its
/// creation timestamp; every optional field other than the conversation id is
/// left unset.
///
/// # Errors
/// Propagates the adapter's error if the append fails.
async fn persist_message(
    storage: &dyn StorageAdapter,
    conversation_id: &str,
    direction: Direction,
    text: &str,
) -> Result<DomainMessage> {
    let created_at = chrono::Utc::now();
    let id = uuid::Uuid::new_v4().to_string();
    let conversation_id = Some(conversation_id.to_owned());
    let content = MessageContent::from_text(text);

    storage
        .append_message(DomainMessage {
            id,
            external_id: None,
            organization_id: None,
            conversation_id,
            direction,
            content,
            from: None,
            to: None,
            metadata_json: None,
            analytics_json: None,
            created_at,
            updated_at: None,
        })
        .await
}

/// Wrap a reply in the `GeneralAgentResponse`-shaped payload that the
/// protocol's `eventual_response` carries.
///
/// The reference runtime has no structured analytics to report, so only
/// `responseParts` carries real data (the reply text, as a single part). The
/// analytic fields are filled with neutral placeholders: mid-range scores, an
/// empty summary, an `in_progress` status and no suggested actions. Clients
/// render `responseParts`.
#[must_use]
pub fn general_agent_response(reply: &str) -> serde_json::Value {
    // Placeholder score: neither satisfied nor dissatisfied.
    const NEUTRAL_SCORE: f64 = 0.5;
    const UNRESOLVED_STATUS: &str = "in_progress";

    let response_parts = [reply];

    json!({
        "responseParts": response_parts,
        "customerHappinessScore": NEUTRAL_SCORE,
        "needsSatisfactionScore": NEUTRAL_SCORE,
        "requestSummary": "",
        "resolutionStatus": UNRESOLVED_STATUS,
        "suggestedNextActions": [],
    })
}