lash_core/runtime/session_manager/
direct.rs1use super::*;
2
3#[derive(Clone)]
8struct RuntimeDirectSource<'run> {
9 manager: Arc<RuntimeSessionServices>,
10 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
11 turn_id: Option<String>,
12}
13
/// Shared, thread-safe callback used in test builds to answer direct
/// completion requests; it receives the request and the usage source.
#[cfg(any(test, feature = "testing"))]
type TestDirectFn = Arc<
    dyn Fn(crate::DirectRequest, String) -> Result<crate::DirectCompletion, crate::PluginError>
        + Send
        + Sync,
>;
20
21#[derive(Clone)]
27enum DirectCompletionSource<'run> {
28 Runtime(RuntimeDirectSource<'run>),
29 #[cfg(any(test, feature = "testing"))]
30 Unavailable(String),
31 #[cfg(any(test, feature = "testing"))]
32 TestFn(TestDirectFn),
33}
34
35#[derive(Clone)]
36pub struct DirectCompletionClient<'run> {
37 source: DirectCompletionSource<'run>,
38}
39
40impl<'run> DirectCompletionClient<'run> {
41 pub(super) fn runtime(
42 manager: Arc<RuntimeSessionServices>,
43 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
44 turn_id: Option<String>,
45 ) -> Self {
46 Self {
47 source: DirectCompletionSource::Runtime(RuntimeDirectSource {
48 manager,
49 effect_controller,
50 turn_id,
51 }),
52 }
53 }
54
55 pub async fn direct_completion(
56 &self,
57 request: crate::DirectRequest,
58 usage_source: &str,
59 ) -> Result<crate::DirectCompletion, crate::PluginError> {
60 match &self.source {
61 DirectCompletionSource::Runtime(source) => {
62 source
63 .manager
64 .direct
65 .invoke_direct_completion(source.invocation_context(), request, usage_source)
66 .await
67 }
68 #[cfg(any(test, feature = "testing"))]
69 DirectCompletionSource::Unavailable(message) => {
70 Err(crate::PluginError::Session(message.clone()))
71 }
72 #[cfg(any(test, feature = "testing"))]
73 DirectCompletionSource::TestFn(invoke) => invoke(request, usage_source.to_string()),
74 }
75 }
76
77 pub async fn direct_llm_completion(
78 &self,
79 request: crate::LlmRequest,
80 usage_source: &str,
81 ) -> Result<crate::DirectLlmCompletion, crate::PluginError> {
82 match &self.source {
83 DirectCompletionSource::Runtime(source) => {
84 source
85 .manager
86 .direct
87 .invoke_direct_llm_completion(
88 source.invocation_context(),
89 request,
90 usage_source,
91 )
92 .await
93 }
94 #[cfg(any(test, feature = "testing"))]
95 DirectCompletionSource::Unavailable(message) => {
96 Err(crate::PluginError::Session(message.clone()))
97 }
98 #[cfg(any(test, feature = "testing"))]
99 DirectCompletionSource::TestFn(_) => Err(crate::PluginError::Session(
100 "direct LLM completions are unavailable in this test context".to_string(),
101 )),
102 }
103 }
104
105 #[cfg(any(test, feature = "testing"))]
106 pub(crate) fn unavailable(message: impl Into<String>) -> Self {
107 Self {
108 source: DirectCompletionSource::Unavailable(message.into()),
109 }
110 }
111
112 #[cfg(any(test, feature = "testing"))]
113 pub fn from_fn<F>(invoke: F) -> Self
114 where
115 F: Fn(crate::DirectRequest, String) -> Result<crate::DirectCompletion, crate::PluginError>
116 + Send
117 + Sync
118 + 'static,
119 {
120 Self {
121 source: DirectCompletionSource::TestFn(Arc::new(invoke)),
122 }
123 }
124}
125
126impl<'run> RuntimeDirectSource<'run> {
127 fn invocation_context(&self) -> DirectInvocationContext<'_> {
128 DirectInvocationContext {
129 current: &self.manager.current,
130 usage_capability: &self.manager.usage,
131 effect_controller: self.effect_controller.controller(),
132 turn_id: self.turn_id.as_deref(),
133 }
134 }
135}
136
137pub(in crate::runtime::session_manager) struct DirectInvocationContext<'a> {
138 current: &'a CurrentSessionCapability,
139 usage_capability: &'a UsageCapability,
140 effect_controller: &'a dyn crate::RuntimeEffectController,
141 turn_id: Option<&'a str>,
142}
143
144struct DirectEffectPlan {
145 provider: crate::ProviderHandle,
146 envelope: crate::RuntimeEffectEnvelope,
147 request: Box<crate::LlmRequest>,
148 usage_source: String,
149}
150
151impl DirectCompletionCapability {
152 async fn plan_direct_effect(
158 &self,
159 context: &DirectInvocationContext<'_>,
160 provider: crate::ProviderHandle,
161 request: crate::LlmRequest,
162 usage_source: &str,
163 replay: Option<&crate::RuntimeReplay>,
164 caused_by: Option<&crate::CausalRef>,
165 ) -> Result<DirectEffectPlan, crate::PluginError> {
166 let current = context.current;
167 let usage_source = usage_source.to_string();
168 let request_spec = crate::LlmRequestSpec::from_request(
169 &request,
170 current.host.core.durability.attachment_store.as_ref(),
171 )
172 .await?;
173 let discriminator =
174 crate::runtime::causal::direct_request_discriminator(&request_spec, replay, caused_by)?;
175 let invocation = crate::runtime::causal::direct_effect_invocation(
176 ¤t.session_id,
177 &usage_source,
178 discriminator,
179 context.turn_id,
180 caused_by.cloned(),
181 );
182 let envelope = crate::RuntimeEffectEnvelope::new(
183 invocation,
184 crate::RuntimeEffectCommand::Direct {
185 request: Box::new(request_spec),
186 usage_source: usage_source.clone(),
187 },
188 );
189 Ok(DirectEffectPlan {
190 provider,
191 envelope,
192 request: Box::new(request),
193 usage_source,
194 })
195 }
196
197 async fn run_direct_effect(
200 &self,
201 context: DirectInvocationContext<'_>,
202 plan: DirectEffectPlan,
203 caused_by: Option<crate::CausalRef>,
204 ) -> Result<(crate::LlmResponse, crate::TokenUsage), crate::PluginError> {
205 let current = context.current;
206 let DirectEffectPlan {
207 provider,
208 envelope,
209 request,
210 usage_source,
211 } = plan;
212 let outcome = context
213 .effect_controller
214 .execute_effect(
215 envelope,
216 crate::RuntimeEffectLocalExecutor::direct(
217 provider,
218 Arc::clone(¤t.host.core.durability.attachment_store),
219 ),
220 )
221 .await?;
222 crate::runtime::effect::apply_direct_outcome(
223 current,
224 context.usage_capability,
225 &request,
226 &usage_source,
227 caused_by.as_ref(),
228 outcome,
229 )
230 .await
231 }
232
233 pub(in crate::runtime::session_manager) async fn invoke_direct_completion(
234 &self,
235 context: DirectInvocationContext<'_>,
236 request: crate::DirectRequest,
237 usage_source: &str,
238 ) -> Result<crate::DirectCompletion, crate::PluginError> {
239 let resolved = context.current.resolve_policy()?;
240 let provider = resolved.provider().clone();
241 let model = request.model.clone();
242 if let Some(variant) = request.model_variant.as_deref() {
243 provider
244 .validate_variant(&model, variant)
245 .map_err(crate::PluginError::Session)?;
246 }
247 let replay = request.replay.clone();
248 let caused_by = request.caused_by.clone();
249 let normalized = crate::direct::build_llm_request(&provider, request, model);
250 let plan = self
251 .plan_direct_effect(
252 &context,
253 provider,
254 normalized,
255 usage_source,
256 replay.as_ref(),
257 caused_by.as_ref(),
258 )
259 .await?;
260 let (response, usage) = self.run_direct_effect(context, plan, caused_by).await?;
261 Ok(crate::DirectCompletion {
262 text: response.full_text,
263 usage,
264 })
265 }
266
267 pub(in crate::runtime::session_manager) async fn invoke_direct_llm_completion(
268 &self,
269 context: DirectInvocationContext<'_>,
270 request: crate::LlmRequest,
271 usage_source: &str,
272 ) -> Result<crate::DirectLlmCompletion, crate::PluginError> {
273 let resolved = context.current.resolve_policy()?;
274 let plan = self
275 .plan_direct_effect(
276 &context,
277 resolved.binding.provider,
278 request,
279 usage_source,
280 None,
281 None,
282 )
283 .await?;
284 let (response, usage) = self.run_direct_effect(context, plan, None).await?;
285 Ok(crate::DirectLlmCompletion { response, usage })
286 }
287}