Skip to main content

smooth_operator/
runtime.rs

1//! Minimal `AgentRuntime` that proves smooth-operator consumes smooth-operator.
2//!
3//! This is the seam where the smooai monorepo's LangGraph pipeline gets
4//! re-expressed as a smooth-operator [`Workflow`] / [`Agent`] (see
5//! `docs/ARCHITECTURE.md` §2). It does not perform real inference — it
6//! constructs the engine's primitives so the wiring is compile-checked and
7//! exercised by tests. Real inference arrives in roadmap Phase 3.
8
9use std::sync::{Arc, Mutex};
10
11use anyhow::Result;
12use smooth_operator_core::llm_provider::LlmProvider;
13use smooth_operator_core::{
14    Agent, AgentConfig, AgentEvent, FnNode, LlmConfig, Message as EngineMessage, Role,
15    ToolRegistry, Workflow, WorkflowBuilder,
16};
17
18use smooth_operator_core::KnowledgeResult;
19
20use crate::access_control::{AccessContext, AclKnowledgeStore};
21use crate::adapter::{MessageQuery, StorageAdapter};
22use crate::curation::{CuratedKnowledgeStore, RetrievalFilter};
23use crate::domain::{Citation, Direction, Message as DomainMessage, MessageContent};
24use crate::telemetry::{
25    GEN_AI_CONVERSATION_ID, GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM, GEN_AI_TOOL_NAME,
26    GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS, SPAN_CHAT, SPAN_TOOL, SYSTEM_NAME,
27};
28use crate::tools::{KnowledgeResultSink, KnowledgeSearchTool};
29use tracing::Instrument;
30
/// State threaded through the reference workflow: the user's message in, the
/// agent's reply out. Mirrors (in miniature) the LangGraph `StateGraph` state.
#[derive(Debug, Clone, Default)]
pub struct TurnState {
    /// The inbound user text for this turn; the workflow's `respond` node
    /// reads it to build the reply.
    pub user_message: String,
    /// The reply produced by the workflow. Starts as `None` and is filled in
    /// by the `respond` node.
    pub reply: Option<String>,
}
38
/// A minimal runtime that owns a constructed smooth-operator [`Agent`] and a
/// trivial single-node [`Workflow`]. Both are real engine objects.
pub struct AgentRuntime {
    /// The engine agent built at construction time; borrowed out via
    /// [`AgentRuntime::agent`].
    agent: Agent,
    /// The compiled one-node workflow that [`AgentRuntime::run`] drives.
    workflow: Workflow<TurnState>,
}
45
46impl AgentRuntime {
47    /// Construct the runtime from an [`LlmConfig`] and a [`ToolRegistry`].
48    ///
49    /// This is the load-bearing proof of consumption: it builds an
50    /// `AgentConfig` + `Agent` from the engine, and compiles a one-`FnNode`
51    /// `Workflow` whose node echoes the user message back as the reply.
52    ///
53    /// # Errors
54    /// Returns an error if the workflow fails to build (misconfigured graph).
55    pub fn new(name: impl Into<String>, llm: LlmConfig, tools: ToolRegistry) -> Result<Self> {
56        let name = name.into();
57
58        // --- construct a real smooth-operator Agent ---
59        let config = AgentConfig::new(&name, "You are a smooth-agent reference runtime.", llm)
60            .with_max_iterations(8);
61        let agent = Agent::new(config, tools);
62
63        // --- construct a real smooth-operator Workflow with one FnNode ---
64        let respond = FnNode::new("respond", |mut state: TurnState| {
65            Box::pin(async move {
66                state.reply = Some(format!("ack: {}", state.user_message));
67                Ok(state)
68            })
69        });
70        let workflow = WorkflowBuilder::new()
71            .add_node(respond)
72            .set_entry("respond")
73            .set_end("respond")
74            .build()?;
75
76        Ok(Self { agent, workflow })
77    }
78
79    /// Construct a runtime and wire the storage adapter's checkpoint store +
80    /// knowledge base into the engine, demonstrating the `StorageAdapter`
81    /// accessors plug straight into smooth-operator.
82    ///
83    /// # Errors
84    /// Returns an error if the workflow fails to build.
85    pub fn with_storage(
86        name: impl Into<String>,
87        llm: LlmConfig,
88        tools: ToolRegistry,
89        storage: &dyn StorageAdapter,
90    ) -> Result<Self> {
91        let name = name.into();
92
93        let config = AgentConfig::new(&name, "You are a smooth-agent reference runtime.", llm)
94            .with_max_iterations(8)
95            // KnowledgeBase from the adapter plugs straight into AgentConfig.
96            .with_knowledge(storage.knowledge());
97
98        // CheckpointStore from the adapter plugs straight into the Agent.
99        let agent = Agent::new(config, tools).with_checkpoint_store(storage.checkpoints());
100
101        let respond = FnNode::new("respond", |mut state: TurnState| {
102            Box::pin(async move {
103                state.reply = Some(format!("ack: {}", state.user_message));
104                Ok(state)
105            })
106        });
107        let workflow = WorkflowBuilder::new()
108            .add_node(respond)
109            .set_entry("respond")
110            .set_end("respond")
111            .build()?;
112
113        Ok(Self { agent, workflow })
114    }
115
116    /// The engine-generated agent id (proves the `Agent` was constructed).
117    pub fn agent_id(&self) -> &str {
118        &self.agent.id
119    }
120
121    /// Run one turn through the smooth-operator workflow. Returns the reply
122    /// produced by the node. (No LLM call — the node is deterministic.)
123    ///
124    /// # Errors
125    /// Returns an error if the workflow run fails.
126    pub async fn run(&self, message: impl Into<String>) -> Result<String> {
127        let state = TurnState {
128            user_message: message.into(),
129            reply: None,
130        };
131        let out = self.workflow.run(state).await?;
132        Ok(out.reply.unwrap_or_default())
133    }
134
135    /// Borrow the underlying engine agent (e.g. to attach an event handler).
136    pub fn agent(&self) -> &Agent {
137        &self.agent
138    }
139}
140
/// Convenience: an `Arc`-wrapped [`AgentRuntime`] for shared ownership.
pub type SharedRuntime = Arc<AgentRuntime>;

/// The system prompt the knowledge-chat agent runs with. Keeps the agent
/// grounded: answer from the knowledge base, and search it before answering
/// anything organization-specific.
///
/// The trailing `\` on each source line is a string continuation, so the
/// prompt is a single line of text at runtime.
const KNOWLEDGE_CHAT_SYSTEM_PROMPT: &str =
    "You are a helpful customer-support agent for the organization. \
    Answer the user's question accurately and concisely. When a question depends on \
    organization-specific facts (policies, products, documentation), call the \
    `knowledge_search` tool to retrieve them before answering, and ground your answer \
    in what you retrieve. If the knowledge base has no relevant information, say so. \
    Remember facts the user tells you within the conversation and use them when asked.";

/// Max prior turns to replay into the conversation for cross-turn memory.
/// Bounds context growth on long sessions; the in-memory log is small, but a
/// real backend (Postgres/DynamoDB) could be large.
///
/// Passed as the limit of the `MessageQuery` issued when loading a
/// conversation's history.
const MAX_PRIOR_MESSAGES: usize = 50;

/// Max citations attached to a turn's [`TurnOutcome`]. Bounds the size of the
/// `eventual_response` payload; the grounding sources past this cap are dropped
/// (sources are kept in collection order: auto-injected context first, then
/// tool-search results).
pub const MAX_CITATIONS: usize = 8;

/// How many auto-injected knowledge results the engine prepends as
/// `[Relevant knowledge]` context. The runtime mirrors this exact query
/// (`knowledge.query(user_message, AUTO_CONTEXT_LIMIT)`) so the citations it
/// collects match the sources the engine actually grounded the first LLM call
/// with. Kept in lockstep with smooth-operator-core's `Agent` auto-injection
/// (currently a top-3 query).
// NOTE(review): the engine's limit is not visible from this file — confirm it
// is still 3 whenever smooth-operator-core is upgraded.
const AUTO_CONTEXT_LIMIT: usize = 3;
172
/// The outcome of running one knowledge-grounded turn through the agent.
///
/// Carries the final assistant text plus every [`AgentEvent`] the engine
/// emitted during the run — so callers (and tests) can inspect exactly what
/// happened: which tools ran, what they returned, how many iterations.
/// Returned by [`KnowledgeChatRuntime::run_turn`].
#[derive(Debug, Clone)]
pub struct TurnOutcome {
    /// The agent's final natural-language reply (the last assistant turn with
    /// no pending tool calls). Empty string if the agent produced no text.
    pub reply: String,
    /// Every event the engine emitted, in order. Inspect for
    /// [`AgentEvent::ToolCallStart`] / [`AgentEvent::ToolCallComplete`] to
    /// prove a knowledge search happened (see [`TurnOutcome::invoked_tool`]
    /// and [`TurnOutcome::tool_result`]).
    pub events: Vec<AgentEvent>,
    /// The sources that grounded this turn, deduplicated by id and capped at
    /// [`MAX_CITATIONS`]. Collected from the documents the turn actually
    /// retrieved — the engine's auto-injected `[Relevant knowledge]` context
    /// (mirrored by the runtime) plus every `knowledge_search` tool result.
    /// Empty when nothing was retrieved.
    pub citations: Vec<Citation>,
}
194
195/// Extract `(input_tokens, output_tokens)` from the engine's terminal
196/// [`AgentEvent::Completed`] event, if one is present and carries usage. The
197/// engine reports `prompt_tokens` / `completion_tokens` on `Completed`; those
198/// map directly onto the GenAI `gen_ai.usage.input_tokens` /
199/// `gen_ai.usage.output_tokens` attributes. Returns `None` when there is no
200/// `Completed` event (e.g. a mock turn that didn't surface usage), so the
201/// caller omits the attributes rather than recording zeros.
202fn usage_from_events(events: &[AgentEvent]) -> Option<(u64, u64)> {
203    events.iter().find_map(|e| match e {
204        AgentEvent::Completed {
205            prompt_tokens,
206            completion_tokens,
207            ..
208        } if *prompt_tokens > 0 || *completion_tokens > 0 => {
209            Some((*prompt_tokens, *completion_tokens))
210        }
211        _ => None,
212    })
213}
214
215/// Build the turn's [`Citation`]s from the knowledge sources that grounded it.
216///
217/// `auto` is the engine's auto-injected `[Relevant knowledge]` context (mirrored
218/// by the runtime), `tool` is everything the agent's `knowledge_search` calls
219/// surfaced. They're concatenated auto-first, deduplicated by document id
220/// (first occurrence wins — auto-context keeps its score when the same doc is
221/// also tool-searched), each mapped to a [`Citation`]
222/// ([`Citation::from_knowledge_result`]), and capped at [`MAX_CITATIONS`].
223///
224/// Returns an empty vec when nothing was retrieved.
225fn collect_citations(auto: &[KnowledgeResult], tool: &[KnowledgeResult]) -> Vec<Citation> {
226    let mut seen = std::collections::HashSet::new();
227    auto.iter()
228        .chain(tool.iter())
229        .filter(|r| seen.insert(r.document_id.clone()))
230        .take(MAX_CITATIONS)
231        .map(Citation::from_knowledge_result)
232        .collect()
233}
234
235impl TurnOutcome {
236    /// `true` if the agent invoked a tool named `tool_name` during the turn.
237    #[must_use]
238    pub fn invoked_tool(&self, tool_name: &str) -> bool {
239        self.events.iter().any(|e| {
240            matches!(
241                e,
242                AgentEvent::ToolCallStart { tool_name: name, .. } if name == tool_name
243            )
244        })
245    }
246
247    /// The completed result text of the first call to `tool_name`, if any.
248    /// Sourced from [`AgentEvent::ToolCallComplete`] (truncated to ~500 chars
249    /// by the engine), so a test can assert the tool returned the seeded doc.
250    #[must_use]
251    pub fn tool_result(&self, tool_name: &str) -> Option<&str> {
252        self.events.iter().find_map(|e| match e {
253            AgentEvent::ToolCallComplete {
254                tool_name: name,
255                result,
256                ..
257            } if name == tool_name => Some(result.as_str()),
258            _ => None,
259        })
260    }
261}
262
/// A real, knowledge-grounded chat runtime over smooth-operator.
///
/// This is the first end-to-end "knowledge-chat turn" for smooth-operator: it
/// wires a [`StorageAdapter`]'s [`KnowledgeBase`](smooth_operator_core::KnowledgeBase)
/// into a smooth-operator [`Agent`] two ways —
///
/// 1. **Auto-injected context** via
///    [`AgentConfig::with_knowledge`](smooth_operator_core::AgentConfig::with_knowledge):
///    the engine queries the KB with the user's message and prepends the top
///    matches as a `[Relevant knowledge]` system message before the first LLM
///    call.
/// 2. **Agent-driven search** via the [`KnowledgeSearchTool`]: the model can
///    issue its own `knowledge_search` query mid-turn with its own phrasing.
///
/// Construct with [`KnowledgeChatRuntime::new`] for production (a real
/// [`LlmClient`](smooth_operator_core::llm::LlmClient) is built from the
/// [`LlmConfig`]), or inject a mock via
/// [`KnowledgeChatRuntime::with_llm_provider`] for deterministic, key-free
/// tests.
pub struct KnowledgeChatRuntime {
    /// Storage backend: supplies the knowledge base, the checkpoint store, and
    /// the message log that turns are persisted to and replayed from.
    storage: Arc<dyn StorageAdapter>,
    /// LLM configuration cloned into each turn's `AgentConfig`; its `model` is
    /// also recorded on the turn's telemetry span.
    llm: LlmConfig,
    /// Optional test-injected LLM surface. When set, every `run_turn` builds
    /// its `Agent` with this provider instead of a live client.
    llm_provider: Option<Arc<dyn LlmProvider>>,
    /// Cap on agent-loop iterations per turn (defaults to 8 in `new`).
    max_iterations: u32,
    /// Document-level access control (feature gap G3). When set, the runtime wraps
    /// the storage adapter's [`KnowledgeBase`](smooth_operator_core::KnowledgeBase) in
    /// the given [`AclKnowledgeStore`] and reads through a per-turn
    /// [`AccessContext`]-bound reader, so retrieval (both the auto-injected
    /// context and the `knowledge_search` tool) only surfaces documents the
    /// requester is entitled to. `None` ⇒ no document-level filtering (org
    /// isolation upstream is unaffected); the raw `storage.knowledge()` is used.
    access: Option<RuntimeAccessControl>,
    /// Query-time curation: document-set + metadata scoping with boost re-ranking
    /// (Phase 11). When set, the runtime reads knowledge through a
    /// [`CuratedKnowledgeStore`] reader bound to the given [`RetrievalFilter`]
    /// (and the requester's [`AccessContext`], so ACL ∧ curation both apply).
    /// `None` ⇒ no curation filter (current behavior unchanged). Takes precedence
    /// over [`access`](Self::access) when both are set, because the curated store
    /// enforces ACL itself.
    curation: Option<RuntimeCuration>,
}
306
/// The runtime's bound access-control state: the ACL-aware knowledge store
/// (shared, owns the ACL side table populated at ingest) plus the requester
/// identity to filter reads by.
#[derive(Clone)]
struct RuntimeAccessControl {
    /// ACL-aware store from which each turn's reader is obtained.
    store: AclKnowledgeStore,
    /// Requester identity; cloned into every reader the runtime creates.
    context: AccessContext,
}
315
/// The runtime's bound curation state: the curation-aware knowledge store
/// (shared, owns the curation + ACL side tables populated at ingest), the
/// requester identity (so ACL ∧ curation both apply), and the query-time filter
/// to scope reads by.
#[derive(Clone)]
struct RuntimeCuration {
    /// Curation-aware store from which each turn's reader is obtained.
    store: CuratedKnowledgeStore,
    /// Requester identity; cloned into every reader the runtime creates.
    context: AccessContext,
    /// Query-time scope; replaceable via `with_retrieval_filter`.
    filter: RetrievalFilter,
}
326
327impl KnowledgeChatRuntime {
328    /// Build a production runtime over a storage adapter and LLM config.
329    #[must_use]
330    pub fn new(storage: Arc<dyn StorageAdapter>, llm: LlmConfig) -> Self {
331        Self {
332            storage,
333            llm,
334            llm_provider: None,
335            max_iterations: 8,
336            access: None,
337            curation: None,
338        }
339    }
340
341    /// Enable query-time curation for this runtime (Phase 11): scope retrieval to
342    /// named document sets / metadata equalities and re-rank by per-document
343    /// boost.
344    ///
345    /// `store` is a [`CuratedKnowledgeStore`] wrapping the same inner
346    /// [`KnowledgeBase`](smooth_operator_core::KnowledgeBase) the documents were
347    /// ingested through (so its curation + ACL side tables are populated),
348    /// `context` is the requester's identity (ACL ∧ curation both apply — the
349    /// curated store enforces document-level access control itself), and `filter`
350    /// scopes the reads. Both the auto-injected `[Relevant knowledge]` context and
351    /// the `knowledge_search` tool read through this filtered, boosted reader.
352    ///
353    /// Pass [`RetrievalFilter::none`] to apply boost re-ranking with no
354    /// set/metadata scoping. Without calling this, retrieval is unchanged.
355    #[must_use]
356    pub fn with_curation(
357        mut self,
358        store: CuratedKnowledgeStore,
359        context: AccessContext,
360        filter: RetrievalFilter,
361    ) -> Self {
362        self.curation = Some(RuntimeCuration {
363            store,
364            context,
365            filter,
366        });
367        self
368    }
369
370    /// Set (or replace) just the [`RetrievalFilter`] on an already-configured
371    /// curation store, so a per-turn scope can be applied without rebuilding the
372    /// store. No-op (logs nothing) when curation is not configured.
373    #[must_use]
374    pub fn with_retrieval_filter(mut self, filter: RetrievalFilter) -> Self {
375        if let Some(curation) = &mut self.curation {
376            curation.filter = filter;
377        }
378        self
379    }
380
381    /// Enable document-level access control for this runtime (feature gap G3).
382    ///
383    /// `store` is an [`AclKnowledgeStore`] that wraps the same inner
384    /// [`KnowledgeBase`](smooth_operator_core::KnowledgeBase) the documents were
385    /// ingested through (so its ACL side table is populated), and `context` is
386    /// the requester's identity. With this set, every turn reads knowledge
387    /// through an [`AccessContext`]-bound reader — both the auto-injected
388    /// `[Relevant knowledge]` context and the `knowledge_search` tool drop
389    /// documents the requester is not entitled to.
390    ///
391    /// Without it, the runtime reads the raw `storage.knowledge()` exactly as
392    /// before (backward-compatible — existing no-ACL knowledge stays
393    /// retrievable).
394    #[must_use]
395    pub fn with_access_control(mut self, store: AclKnowledgeStore, context: AccessContext) -> Self {
396        self.access = Some(RuntimeAccessControl { store, context });
397        self
398    }
399
400    /// The knowledge handle a turn reads through: an ACL-filtering reader bound
401    /// to the requester when access control is enabled, otherwise the raw
402    /// storage-adapter knowledge base (unfiltered, org-scoping only).
403    fn read_knowledge(&self) -> Arc<dyn smooth_operator_core::KnowledgeBase> {
404        // Curation (when set) takes precedence: its reader enforces ACL ∧
405        // set/metadata filter and applies boost re-ranking in one pass.
406        if let Some(cur) = &self.curation {
407            return cur.store.reader(cur.filter.clone(), cur.context.clone());
408        }
409        match &self.access {
410            Some(ac) => ac.store.reader(ac.context.clone()),
411            None => self.storage.knowledge(),
412        }
413    }
414
415    /// Inject a custom [`LlmProvider`] (e.g. a
416    /// [`MockLlmClient`](smooth_operator_core::llm_provider::MockLlmClient)) so the
417    /// agent loop runs deterministically with no network / API key. This is
418    /// the test seam.
419    #[must_use]
420    pub fn with_llm_provider(mut self, provider: Arc<dyn LlmProvider>) -> Self {
421        self.llm_provider = Some(provider);
422        self
423    }
424
425    /// Cap on agent loop iterations (LLM call → tool calls → LLM call → …).
426    /// Defaults to 8.
427    #[must_use]
428    pub fn with_max_iterations(mut self, max: u32) -> Self {
429        self.max_iterations = max;
430        self
431    }
432
433    /// Build the `Agent` for a turn: knowledge-grounded config + the
434    /// `knowledge_search` tool + the conversation's prior turns replayed for
435    /// cross-turn memory, with the mock LLM provider attached when one was
436    /// injected for tests.
437    ///
438    /// `prior` is the conversation's persisted message log (oldest-first),
439    /// already converted to engine messages. Replaying it via
440    /// [`AgentConfig::with_prior_messages`] is what gives turn 2 memory of
441    /// turn 1: `Agent::new` randomizes the agent id every turn, so the
442    /// checkpoint-resume path can't be keyed stably — replaying the persisted
443    /// log is the robust, backend-agnostic way to carry memory.
444    fn build_agent(
445        &self,
446        events: Arc<Mutex<Vec<AgentEvent>>>,
447        prior: Vec<EngineMessage>,
448        citation_sink: KnowledgeResultSink,
449    ) -> Agent {
450        // The knowledge handle both retrieval paths read through. When access
451        // control is enabled this is an ACL-filtering reader bound to the
452        // requester's `AccessContext` (feature gap G3); otherwise it's the raw
453        // org-scoped knowledge base. Built once so both paths hit the SAME store
454        // and the SAME ACL filter.
455        let knowledge = self.read_knowledge();
456
457        // (1) Auto-injected knowledge context: the engine queries the KB with
458        //     the user's message and prepends matches before the first call.
459        let config = AgentConfig::new(
460            "smooth-agent-chat",
461            KNOWLEDGE_CHAT_SYSTEM_PROMPT,
462            self.llm.clone(),
463        )
464        .with_max_iterations(self.max_iterations)
465        .with_knowledge(Arc::clone(&knowledge))
466        // (1b) Cross-turn memory: replay the conversation's prior turns so the
467        //      model sees turn 1 when answering turn 2.
468        .with_prior_messages(prior);
469
470        // (2) Agent-driven search: register the knowledge_search tool over the
471        //     SAME knowledge handle, so a tool call hits the same store and the
472        //     same ACL filter. The result sink lets the runtime collect the
473        //     sources the agent's searches surfaced, for citations.
474        let mut tools = ToolRegistry::new();
475        tools.register(KnowledgeSearchTool::new(knowledge).with_result_sink(citation_sink));
476
477        let agent = Agent::new(config, tools)
478            .with_checkpoint_store(self.storage.checkpoints())
479            .with_event_handler(move |event| {
480                events.lock().expect("event sink poisoned").push(event);
481            });
482
483        match &self.llm_provider {
484            Some(provider) => agent.with_llm_provider(Arc::clone(provider)),
485            None => agent,
486        }
487    }
488
489    /// Run one knowledge-grounded turn.
490    ///
491    /// Drives the smooth-operator agent loop to completion, then returns the
492    /// final assistant text plus every [`AgentEvent`] emitted. The inbound
493    /// user message and the outbound reply are also persisted to the storage
494    /// adapter's message log under `conversation_id` (best-effort: a persist
495    /// failure surfaces as an error so callers don't silently lose history).
496    ///
497    /// # Errors
498    /// Returns an error if the agent loop fails fatally or message persistence
499    /// fails.
500    pub async fn run_turn(&self, conversation_id: &str, user_message: &str) -> Result<TurnOutcome> {
501        // --- OpenTelemetry GenAI span for the whole turn ---
502        //
503        // A `gen_ai.chat` span (GenAI semantic conventions) carries the system,
504        // model, and conversation id up front, plus token usage recorded on
505        // completion. `tracing-opentelemetry` maps these fields onto an OTLP
506        // span when an exporter is installed (see `telemetry::init_telemetry`);
507        // with no collector configured they're simply captured locally.
508        //
509        // `input_tokens` / `output_tokens` are declared as empty fields here so
510        // they can be `record()`ed after the run if the engine reported usage.
511        let turn_span = tracing::info_span!(
512            SPAN_CHAT,
513            { GEN_AI_SYSTEM } = SYSTEM_NAME,
514            { GEN_AI_REQUEST_MODEL } = %self.llm.model,
515            { GEN_AI_CONVERSATION_ID } = %conversation_id,
516            { GEN_AI_USAGE_INPUT_TOKENS } = tracing::field::Empty,
517            { GEN_AI_USAGE_OUTPUT_TOKENS } = tracing::field::Empty,
518        );
519
520        // Run the turn body inside the span so any engine-internal spans nest
521        // under it. `Instrument` keeps the span entered across awaits.
522        let outcome = self
523            .run_turn_inner(conversation_id, user_message)
524            .instrument(turn_span.clone())
525            .await?;
526
527        // Record token usage on the turn span if the engine reported it via the
528        // terminal `Completed` event (omitted otherwise, per the GenAI convs).
529        if let Some((input, output)) = usage_from_events(&outcome.events) {
530            turn_span.record(GEN_AI_USAGE_INPUT_TOKENS, input);
531            turn_span.record(GEN_AI_USAGE_OUTPUT_TOKENS, output);
532        }
533
534        // Emit a child `gen_ai.tool` span per tool call so each invocation is an
535        // independent, named, timed span in the trace. We materialize these from
536        // the collected events (rather than inside the event handler) so the
537        // spans hang off the turn span without restructuring the runtime.
538        for event in &outcome.events {
539            if let AgentEvent::ToolCallComplete {
540                tool_name,
541                duration_ms,
542                is_error,
543                ..
544            } = event
545            {
546                let _tool_span = tracing::info_span!(
547                    parent: &turn_span,
548                    SPAN_TOOL,
549                    { GEN_AI_TOOL_NAME } = %tool_name,
550                    duration_ms = *duration_ms,
551                    is_error = *is_error,
552                )
553                .entered();
554            }
555        }
556
557        Ok(outcome)
558    }
559
560    /// The un-instrumented turn body. Split out from [`run_turn`] so the OTel
561    /// `gen_ai.chat` span wraps exactly the engine run + persistence without
562    /// cluttering the instrumentation logic.
563    async fn run_turn_inner(
564        &self,
565        conversation_id: &str,
566        user_message: &str,
567    ) -> Result<TurnOutcome> {
568        let events = Arc::new(Mutex::new(Vec::<AgentEvent>::new()));
569        // Sink the knowledge_search tool records its structured results into, so
570        // we can build citations from the sources the agent's searches surfaced.
571        let tool_sources: KnowledgeResultSink = Arc::new(Mutex::new(Vec::new()));
572
573        // Mirror the engine's auto-injected `[Relevant knowledge]` query so the
574        // citations include the sources the FIRST LLM call was grounded with.
575        // `smooth-operator-core`'s `Agent` queries `knowledge.query(msg, 3)` and
576        // prepends the matches as context (see `agent.rs`); we run the same
577        // query against the same knowledge handle here. Best-effort: a KB error
578        // just yields no auto-context citations (the turn still proceeds).
579        let auto_sources: Vec<KnowledgeResult> = self
580            .read_knowledge()
581            .query(user_message, AUTO_CONTEXT_LIMIT)
582            .unwrap_or_default();
583
584        // Load the conversation's prior turns for cross-turn memory BEFORE
585        // persisting the new inbound message, so `prior` is exactly the
586        // history-up-to-now (the new message is replayed by `Agent::run` as the
587        // current user turn, not as a duplicated prior message).
588        let prior = self.load_prior_messages(conversation_id).await?;
589        let agent = self.build_agent(Arc::clone(&events), prior, Arc::clone(&tool_sources));
590
591        // Persist the inbound user message.
592        self.persist_message(conversation_id, Direction::Inbound, user_message)
593            .await?;
594
595        // Run the engine loop — this is where retrieval + tool calls happen.
596        let conversation = agent.run(user_message).await?;
597
598        let reply = conversation
599            .last_assistant_content()
600            .unwrap_or_default()
601            .to_string();
602
603        // Persist the outbound reply.
604        if !reply.is_empty() {
605            self.persist_message(conversation_id, Direction::Outbound, &reply)
606                .await?;
607        }
608
609        // Drop the agent so its event-handler closure releases the `events`
610        // Arc clone — then we hold the sole reference and can move the vec out.
611        drop(agent);
612
613        // The agent dropped its handler clone when `agent` went out of scope,
614        // so we hold the only reference — but fall back to a clone if not.
615        let events = match Arc::try_unwrap(events) {
616            Ok(mutex) => mutex
617                .into_inner()
618                .unwrap_or_else(std::sync::PoisonError::into_inner),
619            Err(arc) => arc.lock().expect("event sink poisoned").clone(),
620        };
621
622        // Build citations from the sources that grounded this turn: the
623        // auto-injected `[Relevant knowledge]` context first (it grounded the
624        // first LLM call), then whatever the agent's `knowledge_search` calls
625        // surfaced. Dedup by document id (auto-context wins ties, so its score
626        // is kept) and cap.
627        let tool_sources = match Arc::try_unwrap(tool_sources) {
628            Ok(mutex) => mutex
629                .into_inner()
630                .unwrap_or_else(std::sync::PoisonError::into_inner),
631            Err(arc) => arc.lock().expect("citation sink poisoned").clone(),
632        };
633        let citations = collect_citations(&auto_sources, &tool_sources);
634
635        Ok(TurnOutcome {
636            reply,
637            events,
638            citations,
639        })
640    }
641
642    /// Append a single message to the conversation's log via the adapter.
643    async fn persist_message(
644        &self,
645        conversation_id: &str,
646        direction: Direction,
647        text: &str,
648    ) -> Result<()> {
649        let now = chrono::Utc::now();
650        let message = DomainMessage {
651            id: uuid::Uuid::new_v4().to_string(),
652            external_id: None,
653            organization_id: None,
654            conversation_id: Some(conversation_id.to_string()),
655            direction,
656            content: MessageContent::from_text(text),
657            from: None,
658            to: None,
659            metadata_json: None,
660            analytics_json: None,
661            created_at: now,
662            updated_at: None,
663        };
664        self.storage.append_message(message).await?;
665        Ok(())
666    }
667
668    /// Load the conversation's persisted messages (oldest-first, capped at
669    /// [`MAX_PRIOR_MESSAGES`]) and convert them to engine [`EngineMessage`]s for
670    /// replay: inbound → [`Role::User`], outbound → [`Role::Assistant`]. Empty
671    /// messages are skipped. This is the same approach the WS service runner
672    /// uses (`smooth-operator-server/src/runner.rs`).
673    async fn load_prior_messages(&self, conversation_id: &str) -> Result<Vec<EngineMessage>> {
674        let page = self
675            .storage
676            .list_messages_by_conversation(MessageQuery::new(conversation_id, MAX_PRIOR_MESSAGES))
677            .await?;
678
679        let mut out = Vec::with_capacity(page.messages.len());
680        for m in page.messages {
681            let text = m
682                .content
683                .text
684                .clone()
685                .or_else(|| m.content.items.iter().find_map(|it| it.text.clone()))
686                .unwrap_or_default();
687            if text.is_empty() {
688                continue;
689            }
690            let role = match m.direction {
691                Direction::Inbound => Role::User,
692                Direction::Outbound => Role::Assistant,
693            };
694            out.push(EngineMessage {
695                id: m.id,
696                role,
697                content: text,
698                tool_call_id: None,
699                tool_name: None,
700                tool_calls: vec![],
701                reasoning_content: None,
702                timestamp: m.created_at,
703            });
704        }
705        Ok(out)
706    }
707}
708
#[cfg(test)]
mod tests {
    use super::*;

    /// An OpenRouter-flavored config with a dummy key; the tests here never
    /// make a live LLM call.
    fn test_llm() -> LlmConfig {
        let base = LlmConfig::openrouter("test-key");
        base.with_model("openai/gpt-4o")
    }

    /// Building the reference runtime yields a real engine agent, and a turn
    /// flows through the workflow's single echo node.
    #[tokio::test]
    async fn runtime_constructs_agent_and_runs_workflow() {
        let runtime = AgentRuntime::new("ref-agent", test_llm(), ToolRegistry::new())
            .expect("build runtime");

        // A non-empty id shows the engine actually constructed the Agent.
        let agent_id = runtime.agent_id();
        assert!(!agent_id.is_empty());

        // The reply format shows the Workflow ran through its FnNode.
        let reply = runtime.run("hello world").await.expect("run");
        assert_eq!(reply, "ack: hello world");
    }
}
727}