smooth_operator/runtime.rs
//! Agent runtimes built on the `smooth-operator-core` engine.
//!
//! [`AgentRuntime`] is the minimal seam that proves this crate consumes
//! `smooth-operator-core`: it is where the smooai monorepo's LangGraph pipeline
//! gets re-expressed as a smooth-operator [`Workflow`] / [`Agent`] (see
//! `docs/ARCHITECTURE.md` §2). `AgentRuntime` itself does not perform real
//! inference — it constructs the engine's primitives so the wiring is
//! compile-checked and exercised by tests. Real inference arrives in roadmap
//! Phase 3. [`KnowledgeChatRuntime`] builds on the same engine to run full,
//! knowledge-grounded agent turns.
8
9use std::sync::{Arc, Mutex};
10
11use anyhow::Result;
12use smooth_operator_core::llm_provider::LlmProvider;
13use smooth_operator_core::{
14 Agent, AgentConfig, AgentEvent, FnNode, LlmConfig, Message as EngineMessage, Role,
15 ToolRegistry, Workflow, WorkflowBuilder,
16};
17
18use smooth_operator_core::KnowledgeResult;
19
20use crate::access_control::{AccessContext, AclKnowledgeStore};
21use crate::adapter::{MessageQuery, StorageAdapter};
22use crate::curation::{CuratedKnowledgeStore, RetrievalFilter};
23use crate::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
24use crate::telemetry::{
25 GEN_AI_CONVERSATION_ID, GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM, GEN_AI_TOOL_NAME,
26 GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS, SPAN_CHAT, SPAN_TOOL, SYSTEM_NAME,
27};
28use crate::tools::{KnowledgeResultSink, KnowledgeSearchTool};
29use tracing::Instrument;
30
/// Per-turn state carried through the reference workflow.
///
/// The caller fills in `user_message`; the workflow's node fills in `reply`.
/// It is a miniature stand-in for the state object of a LangGraph `StateGraph`.
#[derive(Debug, Clone, Default)]
pub struct TurnState {
    /// The inbound user text for this turn.
    pub user_message: String,
    /// The agent's answer, or `None` until a node has produced one.
    pub reply: Option<String>,
}
38
39/// A minimal runtime that owns a constructed smooth-operator [`Agent`] and a
40/// trivial single-node [`Workflow`]. Both are real engine objects.
41pub struct AgentRuntime {
42 agent: Agent,
43 workflow: Workflow<TurnState>,
44}
45
46impl AgentRuntime {
47 /// Construct the runtime from an [`LlmConfig`] and a [`ToolRegistry`].
48 ///
49 /// This is the load-bearing proof of consumption: it builds an
50 /// `AgentConfig` + `Agent` from the engine, and compiles a one-`FnNode`
51 /// `Workflow` whose node echoes the user message back as the reply.
52 ///
53 /// # Errors
54 /// Returns an error if the workflow fails to build (misconfigured graph).
55 pub fn new(name: impl Into<String>, llm: LlmConfig, tools: ToolRegistry) -> Result<Self> {
56 let name = name.into();
57
58 // --- construct a real smooth-operator Agent ---
59 let config = AgentConfig::new(&name, "You are a smooth-agent reference runtime.", llm)
60 .with_max_iterations(8);
61 let agent = Agent::new(config, tools);
62
63 // --- construct a real smooth-operator Workflow with one FnNode ---
64 let respond = FnNode::new("respond", |mut state: TurnState| {
65 Box::pin(async move {
66 state.reply = Some(format!("ack: {}", state.user_message));
67 Ok(state)
68 })
69 });
70 let workflow = WorkflowBuilder::new()
71 .add_node(respond)
72 .set_entry("respond")
73 .set_end("respond")
74 .build()?;
75
76 Ok(Self { agent, workflow })
77 }
78
79 /// Construct a runtime and wire the storage adapter's checkpoint store +
80 /// knowledge base into the engine, demonstrating the `StorageAdapter`
81 /// accessors plug straight into smooth-operator.
82 ///
83 /// # Errors
84 /// Returns an error if the workflow fails to build.
85 pub fn with_storage(
86 name: impl Into<String>,
87 llm: LlmConfig,
88 tools: ToolRegistry,
89 storage: &dyn StorageAdapter,
90 ) -> Result<Self> {
91 let name = name.into();
92
93 let config = AgentConfig::new(&name, "You are a smooth-agent reference runtime.", llm)
94 .with_max_iterations(8)
95 // KnowledgeBase from the adapter plugs straight into AgentConfig.
96 .with_knowledge(storage.knowledge());
97
98 // CheckpointStore from the adapter plugs straight into the Agent.
99 let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
100
101 let respond = FnNode::new("respond", |mut state: TurnState| {
102 Box::pin(async move {
103 state.reply = Some(format!("ack: {}", state.user_message));
104 Ok(state)
105 })
106 });
107 let workflow = WorkflowBuilder::new()
108 .add_node(respond)
109 .set_entry("respond")
110 .set_end("respond")
111 .build()?;
112
113 Ok(Self { agent, workflow })
114 }
115
116 /// The engine-generated agent id (proves the `Agent` was constructed).
117 pub fn agent_id(&self) -> &str {
118 &self.agent.id
119 }
120
121 /// Run one turn through the smooth-operator workflow. Returns the reply
122 /// produced by the node. (No LLM call — the node is deterministic.)
123 ///
124 /// # Errors
125 /// Returns an error if the workflow run fails.
126 pub async fn run(&self, message: impl Into<String>) -> Result<String> {
127 let state = TurnState {
128 user_message: message.into(),
129 reply: None,
130 };
131 let out = self.workflow.run(state).await?;
132 Ok(out.reply.unwrap_or_default())
133 }
134
135 /// Borrow the underlying engine agent (e.g. to attach an event handler).
136 pub fn agent(&self) -> &Agent {
137 &self.agent
138 }
139}
140
141/// Convenience: an `Arc`-wrapped runtime.
142pub type SharedRuntime = Arc<AgentRuntime>;
143
/// System prompt for the knowledge-chat agent.
///
/// It keeps the agent grounded: organization-specific questions are answered
/// from the knowledge base, which the agent searches (`knowledge_search`)
/// before replying. It also asks the agent to remember facts stated earlier in
/// the conversation.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str = concat!(
    "You are a helpful customer-support agent for the organization. ",
    "Answer the user's question accurately and concisely. When a question depends on ",
    "organization-specific facts (policies, products, documentation), call the ",
    "`knowledge_search` tool to retrieve them before answering, and ground your answer ",
    "in what you retrieve. If the knowledge base has no relevant information, say so. ",
    "Remember facts the user tells you within the conversation and use them when asked.",
);

/// Upper bound on the number of persisted *messages* (not turns) replayed into
/// a conversation for cross-turn memory.
///
/// It bounds context growth on long sessions. The in-memory log stays small,
/// but a real backend (Postgres/DynamoDB) can hold a long history.
const MAX_PRIOR_MESSAGES: usize = 50;

/// Upper bound on citations attached to one [`TurnOutcome`].
///
/// It keeps the `eventual_response` payload small. Sources are ordered
/// most-relevant first, so anything past this cap is the least relevant and is
/// dropped.
pub const MAX_CITATIONS: usize = 8;

/// Result limit the engine uses for its auto-injected `[Relevant knowledge]`
/// context.
///
/// The runtime repeats that exact query
/// (`knowledge.query(user_message, AUTO_CONTEXT_LIMIT)`) so its citations
/// match what grounded the first LLM call. It must stay in lockstep with
/// smooth-operator-core's `Agent` auto-injection, which currently asks for the
/// top 3.
const AUTO_CONTEXT_LIMIT: usize = 3;
172
173/// The outcome of running one knowledge-grounded turn through the agent.
174///
175/// Carries the final assistant text plus every [`AgentEvent`] the engine
176/// emitted during the run — so callers (and tests) can inspect exactly what
177/// happened: which tools ran, what they returned, how many iterations.
178#[derive(Debug, Clone)]
179pub struct TurnOutcome {
180 /// The agent's final natural-language reply (the last assistant turn with
181 /// no pending tool calls). Empty string if the agent produced no text.
182 pub reply: String,
183 /// Every event the engine emitted, in order. Inspect for
184 /// [`AgentEvent::ToolCallStart`] / [`AgentEvent::ToolCallComplete`] to
185 /// prove a knowledge search happened.
186 pub events: Vec<AgentEvent>,
187 /// The sources that grounded this turn, deduplicated by id and capped at
188 /// [`MAX_CITATIONS`]. Collected from the documents the turn actually
189 /// retrieved — the engine's auto-injected `[Relevant knowledge]` context
190 /// (mirrored by the runtime) plus every `knowledge_search` tool result.
191 /// Empty when nothing was retrieved.
192 pub citations: Vec<Citation>,
193}
194
195/// Extract `(input_tokens, output_tokens)` from the engine's terminal
196/// [`AgentEvent::Completed`] event, if one is present and carries usage. The
197/// engine reports `prompt_tokens` / `completion_tokens` on `Completed`; those
198/// map directly onto the GenAI `gen_ai.usage.input_tokens` /
199/// `gen_ai.usage.output_tokens` attributes. Returns `None` when there is no
200/// `Completed` event (e.g. a mock turn that didn't surface usage), so the
201/// caller omits the attributes rather than recording zeros.
202fn usage_from_events(events: &[AgentEvent]) -> Option<(u64, u64)> {
203 events.iter().find_map(|e| match e {
204 AgentEvent::Completed {
205 prompt_tokens,
206 completion_tokens,
207 ..
208 } if *prompt_tokens > 0 || *completion_tokens > 0 => {
209 Some((*prompt_tokens, *completion_tokens))
210 }
211 _ => None,
212 })
213}
214
215/// Build the turn's [`Citation`]s from the knowledge sources that grounded it.
216///
217/// `auto` is the engine's auto-injected `[Relevant knowledge]` context (mirrored
218/// by the runtime), `tool` is everything the agent's `knowledge_search` calls
219/// surfaced. They're concatenated auto-first, deduplicated by document id
220/// (first occurrence wins — auto-context keeps its score when the same doc is
221/// also tool-searched), each mapped to a [`Citation`]
222/// ([`Citation::from_knowledge_result`]), and capped at [`MAX_CITATIONS`].
223///
224/// Returns an empty vec when nothing was retrieved.
225fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
226 let mut seen = std::collections::HashSet::new();
227 auto.iter()
228 .chain(tool.iter())
229 .filter(|r| seen.insert(r.document_id.clone()))
230 .take(MAX_CITATIONS)
231 .map(Citation::from_knowledge_result)
232 .collect()
233}
234
235impl TurnOutcome {
236 /// `true` if the agent invoked a tool named `tool_name` during the turn.
237 #[must_use]
238 pub fn invoked_tool(&self, tool_name: &str) -> bool {
239 self.events.iter().any(|e| {
240 matches!(
241 e,
242 AgentEvent::ToolCallStart { tool_name: name, .. } if name == tool_name
243 )
244 })
245 }
246
247 /// The completed result text of the first call to `tool_name`, if any.
248 /// Sourced from [`AgentEvent::ToolCallComplete`] (truncated to ~500 chars
249 /// by the engine), so a test can assert the tool returned the seeded doc.
250 #[must_use]
251 pub fn tool_result(&self, tool_name: &str) -> Option<&str> {
252 self.events.iter().find_map(|e| match e {
253 AgentEvent::ToolCallComplete {
254 tool_name: name,
255 result,
256 ..
257 } if name == tool_name => Some(result.as_str()),
258 _ => None,
259 })
260 }
261}
262
263/// A real, knowledge-grounded chat runtime over smooth-operator.
264///
265/// This is the first end-to-end "knowledge-chat turn" for smooth-operator: it
266/// wires a [`StorageAdapter`]'s [`KnowledgeBase`](smooth_operator_core::KnowledgeBase)
267/// into a smooth-operator [`Agent`] two ways —
268///
269/// 1. **Auto-injected context** via
270/// [`AgentConfig::with_knowledge`](smooth_operator_core::AgentConfig::with_knowledge):
271/// the engine queries the KB with the user's message and prepends the top
272/// matches as a `[Relevant knowledge]` system message before the first LLM
273/// call.
274/// 2. **Agent-driven search** via the [`KnowledgeSearchTool`]: the model can
275/// issue its own `knowledge_search` query mid-turn with its own phrasing.
276///
277/// Construct with [`KnowledgeChatRuntime::new`] for production (a real
278/// [`LlmClient`](smooth_operator_core::llm::LlmClient) is built from the
279/// [`LlmConfig`]), or inject a mock via
280/// [`KnowledgeChatRuntime::with_llm_provider`] for deterministic, key-free
281/// tests.
282pub struct KnowledgeChatRuntime {
283 storage: Arc<dyn StorageAdapter>,
284 llm: LlmConfig,
285 /// Optional test-injected LLM surface. When set, every `run_turn` builds
286 /// its `Agent` with this provider instead of a live client.
287 llm_provider: Option<Arc<dyn LlmProvider>>,
288 max_iterations: u32,
289 /// Document-level access control (feature gap G3). When set, the runtime wraps
290 /// the storage adapter's [`KnowledgeBase`](smooth_operator_core::KnowledgeBase) in
291 /// the given [`AclKnowledgeStore`] and reads through a per-turn
292 /// [`AccessContext`]-bound reader, so retrieval (both the auto-injected
293 /// context and the `knowledge_search` tool) only surfaces documents the
294 /// requester is entitled to. `None` ⇒ no document-level filtering (org
295 /// isolation upstream is unaffected); the raw `storage.knowledge()` is used.
296 access: Option<RuntimeAccessControl>,
297 /// Query-time curation: document-set + metadata scoping with boost re-ranking
298 /// (Phase 11). When set, the runtime reads knowledge through a
299 /// [`CuratedKnowledgeStore`] reader bound to the given [`RetrievalFilter`]
300 /// (and the requester's [`AccessContext`], so ACL ∧ curation both apply).
301 /// `None` ⇒ no curation filter (current behavior unchanged). Takes precedence
302 /// over [`access`](Self::access) when both are set, because the curated store
303 /// enforces ACL itself.
304 curation: Option<RuntimeCuration>,
305}
306
307/// The runtime's bound access-control state: the ACL-aware knowledge store
308/// (shared, owns the ACL side table populated at ingest) plus the requester
309/// identity to filter reads by.
310#[derive(Clone)]
311struct RuntimeAccessControl {
312 store: AclKnowledgeStore,
313 context: AccessContext,
314}
315
316/// The runtime's bound curation state: the curation-aware knowledge store
317/// (shared, owns the curation + ACL side tables populated at ingest), the
318/// requester identity (so ACL ∧ curation both apply), and the query-time filter
319/// to scope reads by.
320#[derive(Clone)]
321struct RuntimeCuration {
322 store: CuratedKnowledgeStore,
323 context: AccessContext,
324 filter: RetrievalFilter,
325}
326
327impl KnowledgeChatRuntime {
328 /// Build a production runtime over a storage adapter and LLM config.
329 #[must_use]
330 pub fn new(storage: Arc<dyn StorageAdapter>, llm: LlmConfig) -> Self {
331 Self {
332 storage,
333 llm,
334 llm_provider: None,
335 max_iterations: 8,
336 access: None,
337 curation: None,
338 }
339 }
340
341 /// Enable query-time curation for this runtime (Phase 11): scope retrieval to
342 /// named document sets / metadata equalities and re-rank by per-document
343 /// boost.
344 ///
345 /// `store` is a [`CuratedKnowledgeStore`] wrapping the same inner
346 /// [`KnowledgeBase`](smooth_operator_core::KnowledgeBase) the documents were
347 /// ingested through (so its curation + ACL side tables are populated),
348 /// `context` is the requester's identity (ACL ∧ curation both apply — the
349 /// curated store enforces document-level access control itself), and `filter`
350 /// scopes the reads. Both the auto-injected `[Relevant knowledge]` context and
351 /// the `knowledge_search` tool read through this filtered, boosted reader.
352 ///
353 /// Pass [`RetrievalFilter::none`] to apply boost re-ranking with no
354 /// set/metadata scoping. Without calling this, retrieval is unchanged.
355 #[must_use]
356 pub fn with_curation(
357 mut self,
358 store: CuratedKnowledgeStore,
359 context: AccessContext,
360 filter: RetrievalFilter,
361 ) -> Self {
362 self.curation = Some(RuntimeCuration {
363 store,
364 context,
365 filter,
366 });
367 self
368 }
369
370 /// Set (or replace) just the [`RetrievalFilter`] on an already-configured
371 /// curation store, so a per-turn scope can be applied without rebuilding the
372 /// store. No-op (logs nothing) when curation is not configured.
373 #[must_use]
374 pub fn with_retrieval_filter(mut self, filter: RetrievalFilter) -> Self {
375 if let Some(curation) = &mut self.curation {
376 curation.filter = filter;
377 }
378 self
379 }
380
381 /// Enable document-level access control for this runtime (feature gap G3).
382 ///
383 /// `store` is an [`AclKnowledgeStore`] that wraps the same inner
384 /// [`KnowledgeBase`](smooth_operator_core::KnowledgeBase) the documents were
385 /// ingested through (so its ACL side table is populated), and `context` is
386 /// the requester's identity. With this set, every turn reads knowledge
387 /// through an [`AccessContext`]-bound reader — both the auto-injected
388 /// `[Relevant knowledge]` context and the `knowledge_search` tool drop
389 /// documents the requester is not entitled to.
390 ///
391 /// Without it, the runtime reads the raw `storage.knowledge()` exactly as
392 /// before (backward-compatible — existing no-ACL knowledge stays
393 /// retrievable).
394 #[must_use]
395 pub fn with_access_control(mut self, store: AclKnowledgeStore, context: AccessContext) -> Self {
396 self.access = Some(RuntimeAccessControl { store, context });
397 self
398 }
399
400 /// The knowledge handle a turn reads through: an ACL-filtering reader bound
401 /// to the requester when access control is enabled, otherwise the raw
402 /// storage-adapter knowledge base (unfiltered, org-scoping only).
403 fn read_knowledge(&self) -> Arc<dyn smooth_operator_core::KnowledgeBase> {
404 // Curation (when set) takes precedence: its reader enforces ACL ∧
405 // set/metadata filter and applies boost re-ranking in one pass.
406 if let Some(cur) = &self.curation {
407 return cur.store.reader(cur.filter.clone(), cur.context.clone());
408 }
409 match &self.access {
410 Some(ac) => ac.store.reader(ac.context.clone()),
411 None => self.storage.knowledge(),
412 }
413 }
414
415 /// Inject a custom [`LlmProvider`] (e.g. a
416 /// [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient)) so the
417 /// agent loop runs deterministically with no network / API key. This is
418 /// the test seam.
419 #[must_use]
420 pub fn with_llm_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
421 self.llm_provider = Some(provider);
422 self
423 }
424
425 /// Cap on agent loop iterations (LLM call → tool calls → LLM call → …).
426 /// Defaults to 8.
427 #[must_use]
428 pub fn with_max_iterations(mut self, max: u32) -> Self {
429 self.max_iterations = max;
430 self
431 }
432
433 /// Build the `Agent` for a turn: knowledge-grounded config + the
434 /// `knowledge_search` tool + the conversation's prior turns replayed for
435 /// cross-turn memory, with the mock LLM provider attached when one was
436 /// injected for tests.
437 ///
438 /// `prior` is the conversation's persisted message log (oldest-first),
439 /// already converted to engine messages. Replaying it via
440 /// [`AgentConfig::with_prior_messages`] is what gives turn 2 memory of
441 /// turn 1: `Agent::new` randomizes the agent id every turn, so the
442 /// checkpoint-resume path can't be keyed stably — replaying the persisted
443 /// log is the robust, backend-agnostic way to carry memory.
444 fn build_agent(
445 &self,
446 events: Arc<Mutex<Vec<AgentEvent>>>,
447 prior: Vec<EngineMessage>,
448 citation_sink: KnowledgeResultSink,
449 ) -> Agent {
450 // The knowledge handle both retrieval paths read through. When access
451 // control is enabled this is an ACL-filtering reader bound to the
452 // requester's `AccessContext` (feature gap G3); otherwise it's the raw
453 // org-scoped knowledge base. Built once so both paths hit the SAME store
454 // and the SAME ACL filter.
455 let knowledge = self.read_knowledge();
456
457 // (1) Auto-injected knowledge context: the engine queries the KB with
458 // the user's message and prepends matches before the first call.
459 let config = AgentConfig::new(
460 "smooth-agent-chat",
461 KNOWLEDGE_CHAT_SYSTEM_PROMPT,
462 self.llm.clone(),
463 )
464 .with_max_iterations(self.max_iterations)
465 .with_knowledge(Arc::clone(&knowledge))
466 // (1b) Cross-turn memory: replay the conversation's prior turns so the
467 // model sees turn 1 when answering turn 2.
468 .with_prior_messages(prior);
469
470 // (2) Agent-driven search: register the knowledge_search tool over the
471 // SAME knowledge handle, so a tool call hits the same store and the
472 // same ACL filter. The result sink lets the runtime collect the
473 // sources the agent's searches surfaced, for citations.
474 let mut tools = ToolRegistry::new();
475 tools.register(KnowledgeSearchTool::new(knowledge).with_result_sink(citation_sink));
476
477 let agent = Agent::new(config, tools)
478 .with_checkpoint_store(self.storage.checkpoints())
479 .with_event_handler(move |event| {
480 events.lock().expect("event sink poisoned").push(event);
481 });
482
483 match &self.llm_provider {
484 Some(provider) => agent.with_llm_provider(Arc::clone(provider)),
485 None => agent,
486 }
487 }
488
489 /// Run one knowledge-grounded turn.
490 ///
491 /// Drives the smooth-operator agent loop to completion, then returns the
492 /// final assistant text plus every [`AgentEvent`] emitted. The inbound
493 /// user message and the outbound reply are also persisted to the storage
494 /// adapter's message log under `conversation_id` (best-effort: a persist
495 /// failure surfaces as an error so callers don't silently lose history).
496 ///
497 /// # Errors
498 /// Returns an error if the agent loop fails fatally or message persistence
499 /// fails.
500 pub async fn run_turn(&self, conversation_id: &str, user_message: &str) -> Result<TurnOutcome> {
501 // --- OpenTelemetry GenAI span for the whole turn ---
502 //
503 // A `gen_ai.chat` span (GenAI semantic conventions) carries the system,
504 // model, and conversation id up front, plus token usage recorded on
505 // completion. `tracing-opentelemetry` maps these fields onto an OTLP
506 // span when an exporter is installed (see `telemetry::init_telemetry`);
507 // with no collector configured they're simply captured locally.
508 //
509 // `input_tokens` / `output_tokens` are declared as empty fields here so
510 // they can be `record()`ed after the run if the engine reported usage.
511 let turn_span = tracing::info_span!(
512 SPAN_CHAT,
513 { GEN_AI_SYSTEM } = SYSTEM_NAME,
514 { GEN_AI_REQUEST_MODEL } = %self.llm.model,
515 { GEN_AI_CONVERSATION_ID } = %conversation_id,
516 { GEN_AI_USAGE_INPUT_TOKENS } = tracing::field::Empty,
517 { GEN_AI_USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
518 );
519
520 // Run the turn body inside the span so any engine-internal spans nest
521 // under it. `Instrument` keeps the span entered across awaits.
522 let outcome = self
523 .run_turn_inner(conversation_id, user_message)
524 .instrument(turn_span.clone())
525 .await?;
526
527 // Record token usage on the turn span if the engine reported it via the
528 // terminal `Completed` event (omitted otherwise, per the GenAI convs).
529 if let Some((input, output)) = usage_from_events(&outcome.events) {
530 turn_span.record(GEN_AI_USAGE_INPUT_TOKENS, input);
531 turn_span.record(GEN_AI_USAGE_OUTPUT_TOKENS, output);
532 }
533
534 // Emit a child `gen_ai.tool` span per tool call so each invocation is an
535 // independent, named, timed span in the trace. We materialize these from
536 // the collected events (rather than inside the event handler) so the
537 // spans hang off the turn span without restructuring the runtime.
538 for event in &outcome.events {
539 if let AgentEvent::ToolCallComplete {
540 tool_name,
541 duration_ms,
542 is_error,
543 ..
544 } = event
545 {
546 let _tool_span = tracing::info_span!(
547 parent: &turn_span,
548 SPAN_TOOL,
549 { GEN_AI_TOOL_NAME } = %tool_name,
550 duration_ms = *duration_ms,
551 is_error = *is_error,
552 )
553 .entered();
554 }
555 }
556
557 Ok(outcome)
558 }
559
560 /// The un-instrumented turn body. Split out from [`run_turn`] so the OTel
561 /// `gen_ai.chat` span wraps exactly the engine run + persistence without
562 /// cluttering the instrumentation logic.
563 async fn run_turn_inner(
564 &self,
565 conversation_id: &str,
566 user_message: &str,
567 ) -> Result<TurnOutcome> {
568 let events = Arc::new(Mutex::new(Vec::<AgentEvent>::new()));
569 // Sink the knowledge_search tool records its structured results into, so
570 // we can build citations from the sources the agent's searches surfaced.
571 let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
572
573 // Mirror the engine's auto-injected `[Relevant knowledge]` query so the
574 // citations include the sources the FIRST LLM call was grounded with.
575 // `smooth-operator-core`'s `Agent` queries `knowledge.query(msg, 3)` and
576 // prepends the matches as context (see `agent.rs`); we run the same
577 // query against the same knowledge handle here. Best-effort: a KB error
578 // just yields no auto-context citations (the turn still proceeds).
579 let auto_sources: Vec<KnowledgeResult> = self
580 .read_knowledge()
581 .query(user_message, AUTO_CONTEXT_LIMIT)
582 .unwrap_or_default();
583
584 // Load the conversation's prior turns for cross-turn memory BEFORE
585 // persisting the new inbound message, so `prior` is exactly the
586 // history-up-to-now (the new message is replayed by `Agent::run` as the
587 // current user turn, not as a duplicated prior message).
588 let prior = self.load_prior_messages(conversation_id).await?;
589 let agent = self.build_agent(Arc::clone(&events), prior, Arc::clone(&tool_sources));
590
591 // Persist the inbound user message.
592 self.persist_message(conversation_id, Direction::Inbound, user_message)
593 .await?;
594
595 // Run the engine loop — this is where retrieval + tool calls happen.
596 let conversation = agent.run(user_message).await?;
597
598 let reply = conversation
599 .last_assistant_content()
600 .unwrap_or_default()
601 .to_string();
602
603 // Persist the outbound reply.
604 if !reply.is_empty() {
605 self.persist_message(conversation_id, Direction::Outbound, &reply)
606 .await?;
607 }
608
609 // Drop the agent so its event-handler closure releases the `events`
610 // Arc clone — then we hold the sole reference and can move the vec out.
611 drop(agent);
612
613 // The agent dropped its handler clone when `agent` went out of scope,
614 // so we hold the only reference — but fall back to a clone if not.
615 let events = match Arc::try_unwrap(events) {
616 Ok(mutex) => mutex
617 .into_inner()
618 .unwrap_or_else(std::sync::PoisonError::into_inner),
619 Err(arc) => arc.lock().expect("event sink poisoned").clone(),
620 };
621
622 // Build citations from the sources that grounded this turn: the
623 // auto-injected `[Relevant knowledge]` context first (it grounded the
624 // first LLM call), then whatever the agent's `knowledge_search` calls
625 // surfaced. Dedup by document id (auto-context wins ties, so its score
626 // is kept) and cap.
627 let tool_sources = match Arc::try_unwrap(tool_sources) {
628 Ok(mutex) => mutex
629 .into_inner()
630 .unwrap_or_else(std::sync::PoisonError::into_inner),
631 Err(arc) => arc.lock().expect("citation sink poisoned").clone(),
632 };
633 let citations = collect_citations(&auto_sources, &tool_sources);
634
635 Ok(TurnOutcome {
636 reply,
637 events,
638 citations,
639 })
640 }
641
642 /// Append a single message to the conversation's log via the adapter.
643 async fn persist_message(
644 &self,
645 conversation_id: &str,
646 direction: Direction,
647 text: &str,
648 ) -> Result<()> {
649 let now = chrono::Utc::now();
650 let message = DomainMessage {
651 id: uuid::Uuid::new_v4().to_string(),
652 external_id: None,
653 organization_id: None,
654 conversation_id: Some(conversation_id.to_string()),
655 direction,
656 content: MessageContent::from_text(text),
657 from: None,
658 to: None,
659 metadata_json: None,
660 analytics_json: None,
661 created_at: now,
662 updated_at: None,
663 };
664 self.storage.append_message(message).await?;
665 Ok(())
666 }
667
668 /// Load the conversation's persisted messages (oldest-first, capped at
669 /// [`MAX_PRIOR_MESSAGES`]) and convert them to engine [`EngineMessage`]s for
670 /// replay: inbound → [`Role::User`], outbound → [`Role::Assistant`]. Empty
671 /// messages are skipped. This is the same approach the WS service runner
672 /// uses (`smooth-operator-server/src/runner.rs`).
673 async fn load_prior_messages(&self, conversation_id: &str) -> Result<Vec<EngineMessage>> {
674 let page = self
675 .storage
676 .list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
677 .await?;
678
679 let mut out = Vec::with_capacity(page.messages.len());
680 for m in page.messages {
681 let text = m
682 .content
683 .text
684 .clone()
685 .or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
686 .unwrap_or_default();
687 if text.is_empty() {
688 continue;
689 }
690 let role = match m.direction {
691 Direction::Inbound => Role::User,
692 Direction::Outbound => Role::Assistant,
693 };
694 out.push(EngineMessage {
695 id: m.id,
696 role,
697 content: text,
698 tool_call_id: None,
699 tool_name: None,
700 tool_calls: vec![],
701 reasoning_content: None,
702 timestamp: m.created_at,
703 });
704 }
705 Ok(out)
706 }
707}
708
#[cfg(test)]
mod tests {
    use super::*;

    /// OpenRouter config with a placeholder key. `AgentRuntime::run` makes no
    /// LLM call, so the key is never exercised.
    fn test_llm() -> LlmConfig {
        LlmConfig::openrouter("test-key").with_model("openai/gpt-4o")
    }

    #[tokio::test]
    async fn runtime_constructs_agent_and_runs_workflow() {
        let runtime = AgentRuntime::new("ref-agent", test_llm(), ToolRegistry::new())
            .expect("build runtime");

        // An engine-assigned id means a real `Agent` was built.
        let id = runtime.agent_id();
        assert!(!id.is_empty());

        // The single `FnNode` echoed the message back through the workflow.
        let echoed = runtime.run("hello world").await.expect("run");
        assert_eq!(echoed, "ack: hello world");
    }
}