smos_application/use_cases/handle_chat_completion.rs
1//! `HandleChatCompletion` — top-level chat-completion use case (§3 + §4).
2//!
3//! Orchestrates the full request-side pipeline:
4//! 1. Resolve the requested person via [`route_request`] and rewrite
5//! `request.model` to the upstream model id. The person name becomes the
6//! [`MemoryKey`] used by enrichment + extraction.
7//! 2. Inject the persona `.md` (if any) as the system message so the
8//! upstream model reads it first.
9//! 3. Detect the session id from the trailing 20 messages' markers (or mint
10//! a fresh one).
11//! 4. Run [`EnrichRequest`] (memory retrieval + injection). Fail-open for
12//! every port EXCEPT the reranker: an embedder / vector-search / dedup
13//! failure forwards the original messages unchanged, but a reranker
14//! failure (provider error or empty result) propagates as
15//! `Err(UseCaseError::Provider(_))` so the HTTP handler returns 503.
16//! See [`EnrichRequest`] for the rationale.
17//! 5. Forward the (possibly enriched) request to the LLM upstream, naming
18//! the provider that the person routes to.
19//!
20//! Slice-5 extraction is wired in the **adapter** layer (`http/`), not here.
21//! The application layer stays runtime-agnostic: `tokio::spawn` requires an
22//! active Tokio runtime context, and the SMOS codebase keeps every runtime operation
23//! (spawn, serve, signal handling) inside `smos`. The adapter wraps
24//! the response stream with a `StreamingBuffer`, and after `[DONE]` spawns the
25//! `ExtractFactsFromResponse` use case. This use case hands the adapter the
26//! `MemoryKey` it needs for that wiring.
27//!
28//! Returns `(ChatResponse, SessionId, MemoryKey)` so the HTTP handler injects
29//! the session marker AND the adapter wires extraction with the right project.
30
31use std::collections::HashMap;
32use std::sync::Arc;
33
34use serde_json::Value;
35use smos_domain::chat::{ToolArguments, ToolCall};
36use smos_domain::config::{HeatConfig, RetrievalConfig};
37use smos_domain::{MemoryKey, SessionId};
38
39use crate::errors::UseCaseError;
40use crate::helpers::person_router::{
41 PersonEntry, ProviderEntry, inject_persona_into_messages, load_persona_at, route_request,
42};
43use crate::helpers::session_marker;
44use crate::ports::{
45 Clock, EmbeddingProvider, FactRepository, IdGenerator, LlmUpstream, RerankProvider,
46 SessionRepository,
47};
48use crate::types::{ChatRequest, ChatResponse, enrichment_messages_from_json};
49use crate::use_cases::enrich_request::EnrichRequest;
50
51/// Top-level chat-completion orchestration.
52///
53/// Owns the ports the REQUEST-side pipeline needs (enrichment + upstream
54/// forwarding) plus the routing maps (`persons`, `providers`) needed to
55/// resolve a requested person into a concrete upstream route. Extraction
56/// ports live in `AppState` and are wired by the adapter — see the module
57/// docs for the layering rationale.
58pub struct HandleChatCompletion<FR, SR, EP, RP, LU, C, IG> {
59 pub facts: FR,
60 pub sessions: SR,
61 pub embedder: EP,
62 pub reranker: RP,
63 pub upstream: LU,
64 pub clock: C,
65 pub id_generator: IG,
66 pub retrieval_cfg: Arc<RetrievalConfig>,
67 pub heat_cfg: Arc<HeatConfig>,
68 pub persons: Arc<HashMap<String, PersonEntry>>,
69 pub providers: Arc<Vec<ProviderEntry>>,
70}
71
72impl<FR, SR, EP, RP, LU, C, IG> HandleChatCompletion<FR, SR, EP, RP, LU, C, IG>
73where
74 FR: FactRepository,
75 SR: SessionRepository,
76 EP: EmbeddingProvider,
77 RP: RerankProvider,
78 LU: LlmUpstream,
79 C: Clock,
80 IG: IdGenerator,
81{
82 /// Run the chat-completion pipeline.
83 ///
84 /// Returns the upstream response, the session id (so the handler injects
85 /// the marker), and the memory namespace (so the adapter spawns the
86 /// extraction task against the correct project).
87 pub async fn execute(
88 &self,
89 mut request: ChatRequest,
90 ) -> Result<(ChatResponse, SessionId, MemoryKey), UseCaseError> {
91 // Step 1 — route the requested person to a (memory_key, provider,
92 // upstream_model, persona_path) tuple.
93 let route = route_request(&request.model, &self.persons, &self.providers)?;
94 let memory_key = route.memory_key;
95 request.model = route.upstream_model;
96 let provider_name = route.provider_name;
97
98 // Step 2 — inject the persona as the leading system message when the
99 // person declares one. The persona file is loaded synchronously
100 // here because: (a) persona `.md` files are tiny (<1 KB typical),
101 // (b) after the first access the page cache serves subsequent
102 // reads in microseconds, (c) an LLM proxy's request latency is
103 // dominated by the upstream round-trip (seconds), so the
104 // one-time cold-cache read on the first request is noise. The
105 // fail-soft contract (`None` on missing/unreadable file) preserves
106 // the proxy's overall availability story.
107 if let Some(persona_path) = route.persona_path
108 && let Some(persona_content) = load_persona_at(&persona_path)
109 {
110 inject_persona_into_messages(&mut request.messages, &persona_content);
111 }
112
113 // H-5: build a *read-only* typed projection of the messages for
114 // the helpers that need to introspect message content (session
115 // marker detection here, topic extraction inside EnrichRequest).
116 // The projection is NEVER re-serialised back into
117 // `request.messages` — the raw `Vec<Value>` stays the source of
118 // truth so per-message fields the typed DTO does not model
119 // (`name`, `tool_call_id`, `refusal`, `image_url` parts, audio
120 // parts, future OpenAI extensions) survive the enrichment
121 // pipeline verbatim. The previous round-trip design lost those
122 // fields and broke the fail-open contract for tool-calling and
123 // vision workflows.
124 let typed_projection = enrichment_messages_from_json(&request.messages);
125
126 // Step 3 — detect session from the typed projection. Falls back
127 // to a freshly-minted id from the injected generator when no
128 // marker survived in the trailing window. Generation goes
129 // through the `IdGenerator` port so the domain's
130 // `SessionId::new()` constructor stays `pub(crate)` and id
131 // generation is an explicit, mockable capability.
132 let session_id = session_marker::detect_from_typed_messages(&typed_projection)
133 .unwrap_or_else(|| self.id_generator.new_session_id());
134
135 // Step 4 — enrichment. Fail-open for embedder / vector-search / dedup
136 // (those return the original messages on `Ok`); fail-closed for the
137 // reranker (propagates as `Err(UseCaseError::Provider(_))` → HTTP
138 // 503). `std::mem::take` is safe because every `Ok` path returns at
139 // least the original messages — no `Vec::new()` replacement risk.
140 // The `?` propagates ONLY the reranker error to the handler; every
141 // other failure already fail-opened inside `execute`.
142 let enriched_messages = self
143 .enrich(
144 std::mem::take(&mut request.messages),
145 &memory_key,
146 &session_id,
147 )
148 .await?;
149 request.messages = enriched_messages;
150
151 // Step 5 — forward.
152 let response = self.upstream.complete(&provider_name, request).await?;
153 Ok((response, session_id, memory_key))
154 }
155
156 /// Run `EnrichRequest` and propagate its `Result`. The only `Err` path
157 /// is the reranker ([`UseCaseError::Provider`]); every other port-level
158 /// failure already fail-opened inside `execute` and returned the
159 /// original messages. The wrapper exists only to keep `execute`
160 /// readable.
161 async fn enrich(
162 &self,
163 messages: Vec<Value>,
164 memory_key: &MemoryKey,
165 session_id: &SessionId,
166 ) -> Result<Vec<Value>, UseCaseError> {
167 let enrich = EnrichRequest {
168 facts: &self.facts,
169 sessions: &self.sessions,
170 embedder: &self.embedder,
171 reranker: &self.reranker,
172 clock: &self.clock,
173 retrieval_cfg: &self.retrieval_cfg,
174 heat_cfg: &self.heat_cfg,
175 };
176 enrich.execute(messages, memory_key, session_id).await
177 }
178}
179
180/// Extract the assistant content + structured tool calls from an OpenAI-shaped
181/// non-streaming response so the extraction pipeline can reason over both.
182///
183/// `arguments` arrives as a JSON **string** on the wire (OpenAI quirk); the
184/// domain stores it verbatim as an opaque [`ToolArguments`] — parsing is
185/// deferred to the adapter layer that actually needs to interpret the
186/// payload. Exported so the adapter can run the same parsing on the buffered
187/// non-streaming body before spawning the background extraction task.
188pub fn extract_response_payload(value: &Value) -> (String, Vec<ToolCall>) {
189 let content = value
190 .pointer("/choices/0/message/content")
191 .and_then(Value::as_str)
192 .unwrap_or("")
193 .to_string();
194 let tool_calls = value
195 .pointer("/choices/0/message/tool_calls")
196 .and_then(Value::as_array)
197 .map(|arr| arr.iter().filter_map(parse_openai_tool_call).collect())
198 .unwrap_or_default();
199 (content, tool_calls)
200}
201
202/// Convert one OpenAI tool-call object (`{id, type, function:{name, arguments}}`)
203/// into the domain [`ToolCall`] shape.
204///
205/// `arguments` is normalised to a JSON-shaped string so the opaque
206/// [`ToolArguments`] is always text — the OpenAI string form is forwarded
207/// verbatim; an actual JSON object (some servers send that) is re-serialised;
208/// a missing field degrades to `"null"`.
209fn parse_openai_tool_call(v: &Value) -> Option<ToolCall> {
210 let function = v.get("function")?;
211 let name = function.get("name")?.as_str()?.to_string();
212 let arguments = match function.get("arguments") {
213 Some(Value::String(raw)) => raw.clone(),
214 Some(other) => serde_json::to_string(other).unwrap_or_else(|_| "null".to_string()),
215 None => "null".to_string(),
216 };
217 Some(ToolCall {
218 name,
219 arguments: ToolArguments::from_json(arguments),
220 })
221}