Skip to main content

lash_core/runtime/session_manager/direct.rs

1use super::*;
2
3/// Runtime-backed direct completion source.
4///
5/// Carries everything needed to plan and journal a direct LLM effect against
6/// the owning session manager.
7#[derive(Clone)]
8struct RuntimeDirectSource<'run> {
9    manager: Arc<RuntimeSessionServices>,
10    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
11    turn_id: Option<String>,
12}
13
/// Shared callback that test harnesses install in place of the runtime.
///
/// It is called with the text-only request and the usage-source label, and
/// its result is returned to the caller unchanged.
#[cfg(any(test, feature = "testing"))]
type TestDirectFn = Arc<
    dyn Fn(crate::DirectRequest, String) -> Result<crate::DirectCompletion, crate::PluginError>
        + Send
        + Sync,
>;
20
21/// Source of direct (single-shot) LLM completions for plugins and tools.
22///
23/// In production this is always backed by the runtime session manager; the
24/// test/testing variants exist only so that out-of-runtime test harnesses can
25/// inject a canned completion without standing up a full runtime.
26#[derive(Clone)]
27enum DirectCompletionSource<'run> {
28    Runtime(RuntimeDirectSource<'run>),
29    #[cfg(any(test, feature = "testing"))]
30    Unavailable(String),
31    #[cfg(any(test, feature = "testing"))]
32    TestFn(TestDirectFn),
33}
34
35#[derive(Clone)]
36pub struct DirectCompletionClient<'run> {
37    source: DirectCompletionSource<'run>,
38}
39
40impl<'run> DirectCompletionClient<'run> {
41    pub(super) fn runtime(
42        manager: Arc<RuntimeSessionServices>,
43        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
44        turn_id: Option<String>,
45    ) -> Self {
46        Self {
47            source: DirectCompletionSource::Runtime(RuntimeDirectSource {
48                manager,
49                effect_controller,
50                turn_id,
51            }),
52        }
53    }
54
55    pub async fn direct_completion(
56        &self,
57        request: crate::DirectRequest,
58        usage_source: &str,
59    ) -> Result<crate::DirectCompletion, crate::PluginError> {
60        match &self.source {
61            DirectCompletionSource::Runtime(source) => {
62                source
63                    .manager
64                    .direct
65                    .invoke_direct_completion(source.invocation_context(), request, usage_source)
66                    .await
67            }
68            #[cfg(any(test, feature = "testing"))]
69            DirectCompletionSource::Unavailable(message) => {
70                Err(crate::PluginError::Session(message.clone()))
71            }
72            #[cfg(any(test, feature = "testing"))]
73            DirectCompletionSource::TestFn(invoke) => invoke(request, usage_source.to_string()),
74        }
75    }
76
77    pub async fn direct_llm_completion(
78        &self,
79        request: crate::LlmRequest,
80        usage_source: &str,
81    ) -> Result<crate::DirectLlmCompletion, crate::PluginError> {
82        match &self.source {
83            DirectCompletionSource::Runtime(source) => {
84                source
85                    .manager
86                    .direct
87                    .invoke_direct_llm_completion(
88                        source.invocation_context(),
89                        request,
90                        usage_source,
91                    )
92                    .await
93            }
94            #[cfg(any(test, feature = "testing"))]
95            DirectCompletionSource::Unavailable(message) => {
96                Err(crate::PluginError::Session(message.clone()))
97            }
98            #[cfg(any(test, feature = "testing"))]
99            DirectCompletionSource::TestFn(_) => Err(crate::PluginError::Session(
100                "direct LLM completions are unavailable in this test context".to_string(),
101            )),
102        }
103    }
104
105    #[cfg(any(test, feature = "testing"))]
106    pub(crate) fn unavailable(message: impl Into<String>) -> Self {
107        Self {
108            source: DirectCompletionSource::Unavailable(message.into()),
109        }
110    }
111
112    #[cfg(any(test, feature = "testing"))]
113    pub fn from_fn<F>(invoke: F) -> Self
114    where
115        F: Fn(crate::DirectRequest, String) -> Result<crate::DirectCompletion, crate::PluginError>
116            + Send
117            + Sync
118            + 'static,
119    {
120        Self {
121            source: DirectCompletionSource::TestFn(Arc::new(invoke)),
122        }
123    }
124}
125
126impl<'run> RuntimeDirectSource<'run> {
127    fn invocation_context(&self) -> DirectInvocationContext<'_> {
128        DirectInvocationContext {
129            current: &self.manager.current,
130            usage_capability: &self.manager.usage,
131            effect_controller: self.effect_controller.controller(),
132            turn_id: self.turn_id.as_deref(),
133        }
134    }
135}
136
137pub(in crate::runtime::session_manager) struct DirectInvocationContext<'a> {
138    current: &'a CurrentSessionCapability,
139    usage_capability: &'a UsageCapability,
140    effect_controller: &'a dyn crate::RuntimeEffectController,
141    turn_id: Option<&'a str>,
142}
143
144struct DirectEffectPlan {
145    provider: crate::ProviderHandle,
146    envelope: crate::RuntimeEffectEnvelope,
147    request: Box<crate::LlmRequest>,
148    usage_source: String,
149}
150
151impl DirectCompletionCapability {
152    /// Plans a single direct LLM effect from a normalized [`crate::LlmRequest`].
153    ///
154    /// Both the text-only (`DirectRequest`) and full-output entry points feed
155    /// the same effect lane; they differ only in how the caller projects the
156    /// resulting [`crate::LlmResponse`].
157    fn plan_direct_effect(
158        &self,
159        context: &DirectInvocationContext<'_>,
160        provider: crate::ProviderHandle,
161        request: crate::LlmRequest,
162        usage_source: &str,
163        replay: Option<&crate::RuntimeReplay>,
164        caused_by: Option<&crate::CausalRef>,
165    ) -> Result<DirectEffectPlan, crate::PluginError> {
166        let current = context.current;
167        let usage_source = usage_source.to_string();
168        let request_spec = crate::LlmRequestSpec::from_request(
169            &request,
170            current.host.core.durability.attachment_store.as_ref(),
171        )?;
172        let discriminator =
173            crate::runtime::causal::direct_request_discriminator(&request_spec, replay, caused_by)?;
174        let invocation = crate::runtime::causal::direct_effect_invocation(
175            &current.session_id,
176            &usage_source,
177            discriminator,
178            context.turn_id,
179            caused_by.cloned(),
180        );
181        let envelope = crate::RuntimeEffectEnvelope::new(
182            invocation,
183            crate::RuntimeEffectCommand::Direct {
184                request: Box::new(request_spec),
185                usage_source: usage_source.clone(),
186            },
187        );
188        Ok(DirectEffectPlan {
189            provider,
190            envelope,
191            request: Box::new(request),
192            usage_source,
193        })
194    }
195
196    /// Runs a planned direct effect across the journal/controller boundary and
197    /// applies usage/trace bookkeeping, yielding the raw provider response.
198    async fn run_direct_effect(
199        &self,
200        context: DirectInvocationContext<'_>,
201        plan: DirectEffectPlan,
202        caused_by: Option<crate::CausalRef>,
203    ) -> Result<(crate::LlmResponse, crate::TokenUsage), crate::PluginError> {
204        let current = context.current;
205        let DirectEffectPlan {
206            provider,
207            envelope,
208            request,
209            usage_source,
210        } = plan;
211        let outcome = context
212            .effect_controller
213            .execute_effect(
214                envelope,
215                crate::RuntimeEffectLocalExecutor::direct(
216                    provider,
217                    Arc::clone(&current.host.core.durability.attachment_store),
218                ),
219            )
220            .await?;
221        crate::runtime::effect::apply_direct_outcome(
222            current,
223            context.usage_capability,
224            &request,
225            &usage_source,
226            caused_by.as_ref(),
227            outcome,
228        )
229        .await
230    }
231
232    pub(in crate::runtime::session_manager) async fn invoke_direct_completion(
233        &self,
234        context: DirectInvocationContext<'_>,
235        request: crate::DirectRequest,
236        usage_source: &str,
237    ) -> Result<crate::DirectCompletion, crate::PluginError> {
238        let resolved = context.current.resolve_policy()?;
239        let provider = resolved.provider().clone();
240        let model = request.model.clone();
241        if let Some(variant) = request.model_variant.as_deref() {
242            provider
243                .validate_variant(&model, variant)
244                .map_err(crate::PluginError::Session)?;
245        }
246        let replay = request.replay.clone();
247        let caused_by = request.caused_by.clone();
248        let normalized = crate::direct::build_llm_request(&provider, request, model);
249        let plan = self.plan_direct_effect(
250            &context,
251            provider,
252            normalized,
253            usage_source,
254            replay.as_ref(),
255            caused_by.as_ref(),
256        )?;
257        let (response, usage) = self.run_direct_effect(context, plan, caused_by).await?;
258        Ok(crate::DirectCompletion {
259            text: response.full_text,
260            usage,
261        })
262    }
263
264    pub(in crate::runtime::session_manager) async fn invoke_direct_llm_completion(
265        &self,
266        context: DirectInvocationContext<'_>,
267        request: crate::LlmRequest,
268        usage_source: &str,
269    ) -> Result<crate::DirectLlmCompletion, crate::PluginError> {
270        let resolved = context.current.resolve_policy()?;
271        let plan = self.plan_direct_effect(
272            &context,
273            resolved.binding.provider,
274            request,
275            usage_source,
276            None,
277            None,
278        )?;
279        let (response, usage) = self.run_direct_effect(context, plan, None).await?;
280        Ok(crate::DirectLlmCompletion { response, usage })
281    }
282}