Skip to main content

lash_core/runtime/session_manager/
direct.rs

1use super::*;
2
3/// Runtime-backed direct completion source.
4///
5/// Carries everything needed to plan and journal a direct LLM effect against
6/// the owning session manager.
7#[derive(Clone)]
8struct RuntimeDirectSource<'run> {
9    manager: Arc<RuntimeSessionServices>,
10    effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
11    turn_id: Option<String>,
12}
13
/// Shared test hook for direct completions.
///
/// Receives the direct request together with the usage source (as an owned
/// `String`) and returns the completion result the test wants to observe.
#[cfg(any(test, feature = "testing"))]
type TestDirectFn = Arc<
    dyn Fn(crate::DirectRequest, String) -> Result<crate::DirectCompletion, crate::PluginError>
        + Send
        + Sync,
>;
20
21/// Source of direct (single-shot) LLM completions for plugins and tools.
22///
23/// In production this is always backed by the runtime session manager; the
24/// test/testing variants exist only so that out-of-runtime test harnesses can
25/// inject a canned completion without standing up a full runtime.
26#[derive(Clone)]
27enum DirectCompletionSource<'run> {
28    Runtime(RuntimeDirectSource<'run>),
29    #[cfg(any(test, feature = "testing"))]
30    Unavailable(String),
31    #[cfg(any(test, feature = "testing"))]
32    TestFn(TestDirectFn),
33}
34
35#[derive(Clone)]
36pub struct DirectCompletionClient<'run> {
37    source: DirectCompletionSource<'run>,
38}
39
40impl<'run> DirectCompletionClient<'run> {
41    pub(super) fn runtime(
42        manager: Arc<RuntimeSessionServices>,
43        effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
44        turn_id: Option<String>,
45    ) -> Self {
46        Self {
47            source: DirectCompletionSource::Runtime(RuntimeDirectSource {
48                manager,
49                effect_controller,
50                turn_id,
51            }),
52        }
53    }
54
55    pub async fn direct_completion(
56        &self,
57        request: crate::DirectRequest,
58        usage_source: &str,
59    ) -> Result<crate::DirectCompletion, crate::PluginError> {
60        match &self.source {
61            DirectCompletionSource::Runtime(source) => {
62                source
63                    .manager
64                    .direct
65                    .invoke_direct_completion(source.invocation_context(), request, usage_source)
66                    .await
67            }
68            #[cfg(any(test, feature = "testing"))]
69            DirectCompletionSource::Unavailable(message) => {
70                Err(crate::PluginError::Session(message.clone()))
71            }
72            #[cfg(any(test, feature = "testing"))]
73            DirectCompletionSource::TestFn(invoke) => invoke(request, usage_source.to_string()),
74        }
75    }
76
77    pub async fn direct_llm_completion(
78        &self,
79        request: crate::LlmRequest,
80        usage_source: &str,
81    ) -> Result<crate::DirectLlmCompletion, crate::PluginError> {
82        match &self.source {
83            DirectCompletionSource::Runtime(source) => {
84                source
85                    .manager
86                    .direct
87                    .invoke_direct_llm_completion(
88                        source.invocation_context(),
89                        request,
90                        usage_source,
91                    )
92                    .await
93            }
94            #[cfg(any(test, feature = "testing"))]
95            DirectCompletionSource::Unavailable(message) => {
96                Err(crate::PluginError::Session(message.clone()))
97            }
98            #[cfg(any(test, feature = "testing"))]
99            DirectCompletionSource::TestFn(_) => Err(crate::PluginError::Session(
100                "direct LLM completions are unavailable in this test context".to_string(),
101            )),
102        }
103    }
104
105    #[cfg(any(test, feature = "testing"))]
106    pub(crate) fn unavailable(message: impl Into<String>) -> Self {
107        Self {
108            source: DirectCompletionSource::Unavailable(message.into()),
109        }
110    }
111
112    #[cfg(any(test, feature = "testing"))]
113    pub fn from_fn<F>(invoke: F) -> Self
114    where
115        F: Fn(crate::DirectRequest, String) -> Result<crate::DirectCompletion, crate::PluginError>
116            + Send
117            + Sync
118            + 'static,
119    {
120        Self {
121            source: DirectCompletionSource::TestFn(Arc::new(invoke)),
122        }
123    }
124}
125
126impl<'run> RuntimeDirectSource<'run> {
127    fn invocation_context(&self) -> DirectInvocationContext<'_> {
128        DirectInvocationContext {
129            current: &self.manager.current,
130            usage_capability: &self.manager.usage,
131            effect_controller: self.effect_controller.controller(),
132            turn_id: self.turn_id.as_deref(),
133        }
134    }
135}
136
137pub(in crate::runtime::session_manager) struct DirectInvocationContext<'a> {
138    current: &'a CurrentSessionCapability,
139    usage_capability: &'a UsageCapability,
140    effect_controller: &'a dyn crate::RuntimeEffectController,
141    turn_id: Option<&'a str>,
142}
143
144struct DirectEffectPlan {
145    provider: crate::ProviderHandle,
146    envelope: crate::RuntimeEffectEnvelope,
147    request: Box<crate::LlmRequest>,
148    usage_source: String,
149}
150
151impl DirectCompletionCapability {
152    /// Plans a single direct LLM effect from a normalized [`crate::LlmRequest`].
153    ///
154    /// Both the text-only (`DirectRequest`) and full-output entry points feed
155    /// the same effect lane; they differ only in how the caller projects the
156    /// resulting [`crate::LlmResponse`].
157    async fn plan_direct_effect(
158        &self,
159        context: &DirectInvocationContext<'_>,
160        provider: crate::ProviderHandle,
161        request: crate::LlmRequest,
162        usage_source: &str,
163        replay: Option<&crate::RuntimeReplay>,
164        caused_by: Option<&crate::CausalRef>,
165    ) -> Result<DirectEffectPlan, crate::PluginError> {
166        let current = context.current;
167        let usage_source = usage_source.to_string();
168        let request_spec = crate::LlmRequestSpec::from_request(
169            &request,
170            current.host.core.durability.attachment_store.as_ref(),
171        )
172        .await?;
173        let discriminator =
174            crate::runtime::causal::direct_request_discriminator(&request_spec, replay, caused_by)?;
175        let invocation = crate::runtime::causal::direct_effect_invocation(
176            &current.session_id,
177            &usage_source,
178            discriminator,
179            context.turn_id,
180            caused_by.cloned(),
181        );
182        let envelope = crate::RuntimeEffectEnvelope::new(
183            invocation,
184            crate::RuntimeEffectCommand::Direct {
185                request: Box::new(request_spec),
186                usage_source: usage_source.clone(),
187            },
188        );
189        Ok(DirectEffectPlan {
190            provider,
191            envelope,
192            request: Box::new(request),
193            usage_source,
194        })
195    }
196
197    /// Runs a planned direct effect across the journal/controller boundary and
198    /// applies usage/trace bookkeeping, yielding the raw provider response.
199    async fn run_direct_effect(
200        &self,
201        context: DirectInvocationContext<'_>,
202        plan: DirectEffectPlan,
203        caused_by: Option<crate::CausalRef>,
204    ) -> Result<(crate::LlmResponse, crate::TokenUsage), crate::PluginError> {
205        let current = context.current;
206        let DirectEffectPlan {
207            provider,
208            envelope,
209            request,
210            usage_source,
211        } = plan;
212        let outcome = context
213            .effect_controller
214            .execute_effect(
215                envelope,
216                crate::RuntimeEffectLocalExecutor::direct(
217                    provider,
218                    Arc::clone(&current.host.core.durability.attachment_store),
219                ),
220            )
221            .await?;
222        crate::runtime::effect::apply_direct_outcome(
223            current,
224            context.usage_capability,
225            &request,
226            &usage_source,
227            caused_by.as_ref(),
228            outcome,
229        )
230        .await
231    }
232
233    pub(in crate::runtime::session_manager) async fn invoke_direct_completion(
234        &self,
235        context: DirectInvocationContext<'_>,
236        request: crate::DirectRequest,
237        usage_source: &str,
238    ) -> Result<crate::DirectCompletion, crate::PluginError> {
239        let resolved = context.current.resolve_policy()?;
240        let provider = resolved.provider().clone();
241        let model = request.model.clone();
242        if let Some(variant) = request.model_variant.as_deref() {
243            provider
244                .validate_variant(&model, variant)
245                .map_err(crate::PluginError::Session)?;
246        }
247        let replay = request.replay.clone();
248        let caused_by = request.caused_by.clone();
249        let normalized = crate::direct::build_llm_request(&provider, request, model);
250        let plan = self
251            .plan_direct_effect(
252                &context,
253                provider,
254                normalized,
255                usage_source,
256                replay.as_ref(),
257                caused_by.as_ref(),
258            )
259            .await?;
260        let (response, usage) = self.run_direct_effect(context, plan, caused_by).await?;
261        Ok(crate::DirectCompletion {
262            text: response.full_text,
263            usage,
264        })
265    }
266
267    pub(in crate::runtime::session_manager) async fn invoke_direct_llm_completion(
268        &self,
269        context: DirectInvocationContext<'_>,
270        request: crate::LlmRequest,
271        usage_source: &str,
272    ) -> Result<crate::DirectLlmCompletion, crate::PluginError> {
273        let resolved = context.current.resolve_policy()?;
274        let plan = self
275            .plan_direct_effect(
276                &context,
277                resolved.binding.provider,
278                request,
279                usage_source,
280                None,
281                None,
282            )
283            .await?;
284        let (response, usage) = self.run_direct_effect(context, plan, None).await?;
285        Ok(crate::DirectLlmCompletion { response, usage })
286    }
287}