lash_core/runtime/session_manager/
direct.rs1use super::*;
2
3#[derive(Clone)]
8struct RuntimeDirectSource<'run> {
9 manager: Arc<RuntimeSessionServices>,
10 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
11 turn_id: Option<String>,
12}
13
/// Shared closure used by the test-only `TestFn` source: it receives the
/// direct request and an owned copy of the usage-source label, and produces
/// the completion result synchronously.
#[cfg(any(test, feature = "testing"))]
type TestDirectFn = Arc<
    dyn Fn(crate::DirectRequest, String) -> Result<crate::DirectCompletion, crate::PluginError>
        + Sync
        + Send,
>;
20
21#[derive(Clone)]
27enum DirectCompletionSource<'run> {
28 Runtime(RuntimeDirectSource<'run>),
29 #[cfg(any(test, feature = "testing"))]
30 Unavailable(String),
31 #[cfg(any(test, feature = "testing"))]
32 TestFn(TestDirectFn),
33}
34
35#[derive(Clone)]
36pub struct DirectCompletionClient<'run> {
37 source: DirectCompletionSource<'run>,
38}
39
40impl<'run> DirectCompletionClient<'run> {
41 pub(super) fn runtime(
42 manager: Arc<RuntimeSessionServices>,
43 effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
44 turn_id: Option<String>,
45 ) -> Self {
46 Self {
47 source: DirectCompletionSource::Runtime(RuntimeDirectSource {
48 manager,
49 effect_controller,
50 turn_id,
51 }),
52 }
53 }
54
55 pub async fn direct_completion(
56 &self,
57 request: crate::DirectRequest,
58 usage_source: &str,
59 ) -> Result<crate::DirectCompletion, crate::PluginError> {
60 match &self.source {
61 DirectCompletionSource::Runtime(source) => {
62 source
63 .manager
64 .direct
65 .invoke_direct_completion(source.invocation_context(), request, usage_source)
66 .await
67 }
68 #[cfg(any(test, feature = "testing"))]
69 DirectCompletionSource::Unavailable(message) => {
70 Err(crate::PluginError::Session(message.clone()))
71 }
72 #[cfg(any(test, feature = "testing"))]
73 DirectCompletionSource::TestFn(invoke) => invoke(request, usage_source.to_string()),
74 }
75 }
76
77 pub async fn direct_llm_completion(
78 &self,
79 request: crate::LlmRequest,
80 usage_source: &str,
81 ) -> Result<crate::DirectLlmCompletion, crate::PluginError> {
82 match &self.source {
83 DirectCompletionSource::Runtime(source) => {
84 source
85 .manager
86 .direct
87 .invoke_direct_llm_completion(
88 source.invocation_context(),
89 request,
90 usage_source,
91 )
92 .await
93 }
94 #[cfg(any(test, feature = "testing"))]
95 DirectCompletionSource::Unavailable(message) => {
96 Err(crate::PluginError::Session(message.clone()))
97 }
98 #[cfg(any(test, feature = "testing"))]
99 DirectCompletionSource::TestFn(_) => Err(crate::PluginError::Session(
100 "direct LLM completions are unavailable in this test context".to_string(),
101 )),
102 }
103 }
104
105 #[cfg(any(test, feature = "testing"))]
106 pub(crate) fn unavailable(message: impl Into<String>) -> Self {
107 Self {
108 source: DirectCompletionSource::Unavailable(message.into()),
109 }
110 }
111
112 #[cfg(any(test, feature = "testing"))]
113 pub fn from_fn<F>(invoke: F) -> Self
114 where
115 F: Fn(crate::DirectRequest, String) -> Result<crate::DirectCompletion, crate::PluginError>
116 + Send
117 + Sync
118 + 'static,
119 {
120 Self {
121 source: DirectCompletionSource::TestFn(Arc::new(invoke)),
122 }
123 }
124}
125
126impl<'run> RuntimeDirectSource<'run> {
127 fn invocation_context(&self) -> DirectInvocationContext<'_> {
128 DirectInvocationContext {
129 current: &self.manager.current,
130 usage_capability: &self.manager.usage,
131 effect_controller: self.effect_controller.controller(),
132 turn_id: self.turn_id.as_deref(),
133 }
134 }
135}
136
137pub(in crate::runtime::session_manager) struct DirectInvocationContext<'a> {
138 current: &'a CurrentSessionCapability,
139 usage_capability: &'a UsageCapability,
140 effect_controller: &'a dyn crate::RuntimeEffectController,
141 turn_id: Option<&'a str>,
142}
143
144struct DirectEffectPlan {
145 provider: crate::ProviderHandle,
146 envelope: crate::RuntimeEffectEnvelope,
147 request: Box<crate::LlmRequest>,
148 usage_source: String,
149}
150
151impl DirectCompletionCapability {
152 fn plan_direct_effect(
158 &self,
159 context: &DirectInvocationContext<'_>,
160 provider: crate::ProviderHandle,
161 request: crate::LlmRequest,
162 usage_source: &str,
163 replay: Option<&crate::RuntimeReplay>,
164 caused_by: Option<&crate::CausalRef>,
165 ) -> Result<DirectEffectPlan, crate::PluginError> {
166 let current = context.current;
167 let usage_source = usage_source.to_string();
168 let request_spec = crate::LlmRequestSpec::from_request(
169 &request,
170 current.host.core.durability.attachment_store.as_ref(),
171 )?;
172 let discriminator =
173 crate::runtime::causal::direct_request_discriminator(&request_spec, replay, caused_by)?;
174 let invocation = crate::runtime::causal::direct_effect_invocation(
175 ¤t.session_id,
176 &usage_source,
177 discriminator,
178 context.turn_id,
179 caused_by.cloned(),
180 );
181 let envelope = crate::RuntimeEffectEnvelope::new(
182 invocation,
183 crate::RuntimeEffectCommand::Direct {
184 request: Box::new(request_spec),
185 usage_source: usage_source.clone(),
186 },
187 );
188 Ok(DirectEffectPlan {
189 provider,
190 envelope,
191 request: Box::new(request),
192 usage_source,
193 })
194 }
195
196 async fn run_direct_effect(
199 &self,
200 context: DirectInvocationContext<'_>,
201 plan: DirectEffectPlan,
202 caused_by: Option<crate::CausalRef>,
203 ) -> Result<(crate::LlmResponse, crate::TokenUsage), crate::PluginError> {
204 let current = context.current;
205 let DirectEffectPlan {
206 provider,
207 envelope,
208 request,
209 usage_source,
210 } = plan;
211 let outcome = context
212 .effect_controller
213 .execute_effect(
214 envelope,
215 crate::RuntimeEffectLocalExecutor::direct(
216 provider,
217 Arc::clone(¤t.host.core.durability.attachment_store),
218 ),
219 )
220 .await?;
221 crate::runtime::effect::apply_direct_outcome(
222 current,
223 context.usage_capability,
224 &request,
225 &usage_source,
226 caused_by.as_ref(),
227 outcome,
228 )
229 .await
230 }
231
232 pub(in crate::runtime::session_manager) async fn invoke_direct_completion(
233 &self,
234 context: DirectInvocationContext<'_>,
235 request: crate::DirectRequest,
236 usage_source: &str,
237 ) -> Result<crate::DirectCompletion, crate::PluginError> {
238 let resolved = context.current.resolve_policy()?;
239 let provider = resolved.provider().clone();
240 let model = request.model.clone();
241 if let Some(variant) = request.model_variant.as_deref() {
242 provider
243 .validate_variant(&model, variant)
244 .map_err(crate::PluginError::Session)?;
245 }
246 let replay = request.replay.clone();
247 let caused_by = request.caused_by.clone();
248 let normalized = crate::direct::build_llm_request(&provider, request, model);
249 let plan = self.plan_direct_effect(
250 &context,
251 provider,
252 normalized,
253 usage_source,
254 replay.as_ref(),
255 caused_by.as_ref(),
256 )?;
257 let (response, usage) = self.run_direct_effect(context, plan, caused_by).await?;
258 Ok(crate::DirectCompletion {
259 text: response.full_text,
260 usage,
261 })
262 }
263
264 pub(in crate::runtime::session_manager) async fn invoke_direct_llm_completion(
265 &self,
266 context: DirectInvocationContext<'_>,
267 request: crate::LlmRequest,
268 usage_source: &str,
269 ) -> Result<crate::DirectLlmCompletion, crate::PluginError> {
270 let resolved = context.current.resolve_policy()?;
271 let plan = self.plan_direct_effect(
272 &context,
273 resolved.binding.provider,
274 request,
275 usage_source,
276 None,
277 None,
278 )?;
279 let (response, usage) = self.run_direct_effect(context, plan, None).await?;
280 Ok(crate::DirectLlmCompletion { response, usage })
281 }
282}