Skip to main content

nemo_flow/api/
llm.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6
7use bitflags::bitflags;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use typed_builder::TypedBuilder;
12use uuid::Uuid;
13
14use crate::api::runtime::NemoFlowContextState;
15use crate::api::runtime::current_scope_stack;
16use crate::api::runtime::global_context;
17use crate::api::runtime::{
18    LlmCollectorFn, LlmExecutionNextFn, LlmFinalizerFn, LlmJsonStream, LlmStreamExecutionNextFn,
19};
20use crate::api::scope::event;
21use crate::api::scope::{EmitMarkEventParams, ScopeHandle};
22use crate::api::shared::{
23    ensure_runtime_owner, resolve_parent_uuid, run_request_intercepts_with_codec,
24    snapshot_event_subscribers,
25};
26use crate::codec::request::AnnotatedLlmRequest;
27use crate::codec::response::AnnotatedLlmResponse;
28use crate::codec::traits::{LlmCodec, LlmResponseCodec};
29use crate::error::{FlowError, Result};
30use crate::json::Json;
31use crate::stream::LlmStreamWrapper;
32
bitflags! {
    /// Bitflags that modify LLM-call behavior and observability.
    ///
    /// Flags are combined with the bitwise operators generated by `bitflags!`
    /// (for example `LlmAttributes::STATEFUL | LlmAttributes::STREAMING`). The
    /// builder-parameter structs in this module default to
    /// `LlmAttributes::empty()`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
    pub struct LlmAttributes: u32 {
        /// Marks the request as stateful from the runtime's perspective.
        const STATEFUL = 0b01;
        /// Marks the request as streaming.
        ///
        /// This module does not set it automatically:
        /// `llm_stream_call_execute` forwards the caller's attributes unchanged.
        const STREAMING = 0b10;
    }
}
43
/// Runtime-owned handle identifying an active or completed LLM call.
///
/// `llm_call` returns one, and the managed execution paths create one
/// internally. Pass it back to `llm_call_end` to close the span.
///
/// When built with the generated `LlmHandle::builder()`:
/// - only `name` is required;
/// - `uuid` defaults to a fresh `Uuid::now_v7()`;
/// - `started_at` defaults to `Utc::now()`.
///
/// Each `Option` field has two setters: a plain one taking the inner value,
/// and an `_opt` one taking the `Option` itself.
#[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct LlmHandle {
    /// Unique LLM-call identifier.
    #[builder(default = Uuid::now_v7())]
    pub uuid: Uuid,
    /// Start time of the call. When not supplied, this is the time the handle
    /// was built.
    #[builder(default = Utc::now())]
    pub started_at: DateTime<Utc>,
    /// Provider or logical call name recorded on lifecycle events.
    #[builder(setter(into))]
    pub name: String,
    /// Optional application payload stored on the handle. The managed
    /// execution paths pass a clone of it to `end_llm_handle` when a call fails
    /// without producing output.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional metadata attached to the LLM span.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// LLM behavior flags.
    #[builder(default = LlmAttributes::empty())]
    pub attributes: LlmAttributes,
    /// UUID of the parent scope, if any.
    #[builder(default)]
    pub parent_uuid: Option<Uuid>,
    /// Optional normalized model name for observability.
    #[builder(default, setter(into))]
    pub model_name: Option<String>,
}
73
/// JSON-shaped LLM request payload passed through the runtime.
///
/// Conditional-execution guardrails, request intercepts and execution
/// intercepts in this module all receive this type. For LLM-start events, a
/// clone is passed through the sanitize-request guardrails, and the serialized
/// JSON of the sanitized copy is recorded as the event input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmRequest {
    /// Provider-specific request headers.
    pub headers: serde_json::Map<String, Json>,
    /// Provider-specific request body.
    pub content: Json,
}
82
/// Builder parameters for [`NemoFlowContextState::create_llm_handle`].
///
/// `llm_call`, `llm_call_execute` and `llm_stream_call_execute` each build one
/// of these from their own parameters. Only `name` is required. Every `Option`
/// field also has an `_opt` setter that accepts the `Option` directly.
#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct CreateLlmHandleParams<'a> {
    /// Logical provider or model family name.
    pub name: &'a str,
    /// Optional parent scope UUID.
    #[builder(default)]
    pub parent_uuid: Option<uuid::Uuid>,
    /// LLM attribute bitflags.
    #[builder(default = LlmAttributes::empty())]
    pub attributes: LlmAttributes,
    /// Optional application payload stored on the handle.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional metadata stored on the handle.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional normalized model name stored on the handle.
    #[builder(default, setter(into))]
    pub model_name: Option<String>,
    /// Optional timestamp used as the handle start time and reused by the
    /// emitted start event. When omitted, the current UTC time is used.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}
109
/// Builder parameters for [`NemoFlowContextState::build_llm_end_event`].
///
/// `llm_call_end` fills `data` with the sanitized provider response. If that
/// response sanitizes to JSON null, it uses the caller's fallback payload
/// instead.
#[derive(Clone, TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct EndLlmHandleParams<'a> {
    /// LLM handle to serialize into the emitted end event.
    pub handle: &'a LlmHandle,
    /// Optional data payload merged over the handle data.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional metadata payload merged over the handle metadata.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional normalized response annotation produced by a response codec.
    #[builder(default)]
    pub annotated_response: Option<Arc<AnnotatedLlmResponse>>,
    /// Optional timestamp recorded on the emitted end event. When omitted, the
    /// runtime records the current UTC time. If the current time is not later
    /// than the handle start time, it records one microsecond after the start
    /// time instead.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}
131
/// Builder parameters for [`llm_call`].
///
/// `name` and `request` are required; every other field is optional.
/// `request` is borrowed, and sanitize-request guardrails only see a clone of
/// it.
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct LlmCallParams<'a> {
    /// Logical provider or model family name recorded on the span.
    pub name: &'a str,
    /// Raw request associated with the span.
    pub request: &'a LlmRequest,
    /// Optional explicit parent scope.
    #[builder(default)]
    pub parent: Option<&'a ScopeHandle>,
    /// LLM attribute bitflags applied to the span.
    #[builder(default = LlmAttributes::empty())]
    pub attributes: LlmAttributes,
    /// Optional application payload stored on the handle but not emitted as ATOF data.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional JSON metadata recorded on the start event.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional normalized model name recorded separately from the request payload.
    #[builder(default, setter(into))]
    pub model_name: Option<String>,
    /// Optional normalized request annotation produced by a codec.
    #[builder(default)]
    pub annotated_request: Option<Arc<AnnotatedLlmRequest>>,
    /// Optional timestamp used as the handle start time and reused by the
    /// emitted start event. When omitted, the current UTC time is used.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}
163
/// Builder parameters for [`llm_call_execute`].
///
/// `name`, `request` and `func` are required. There is no timestamp field:
/// the managed path always uses runtime-generated timestamps.
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct LlmCallExecuteParams {
    /// Logical provider or model family name recorded on emitted events.
    #[builder(setter(into))]
    pub name: String,
    /// Raw request passed into the managed pipeline.
    pub request: LlmRequest,
    /// Provider callback or execution continuation.
    pub func: LlmExecutionNextFn,
    /// Optional explicit parent scope for the emitted LLM span.
    #[builder(default)]
    pub parent: Option<ScopeHandle>,
    /// LLM attribute bitflags applied to the managed span.
    #[builder(default = LlmAttributes::empty())]
    pub attributes: LlmAttributes,
    /// Optional application payload stored on the handle but not emitted as
    /// ATOF data. It serves as fallback end-event data when there is no
    /// non-null output.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional JSON metadata recorded on emitted events.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional normalized model name for observability output.
    #[builder(default, setter(into))]
    pub model_name: Option<String>,
    /// Optional request codec used to produce annotated request data.
    #[builder(default)]
    pub codec: Option<Arc<dyn LlmCodec>>,
    /// Optional response codec used to attach annotated response data.
    #[builder(default)]
    pub response_codec: Option<Arc<dyn LlmResponseCodec>>,
}
197
/// Builder parameters for [`llm_stream_call_execute`].
///
/// `name`, `request`, `func`, `collector` and `finalizer` are required.
/// `attributes` is forwarded unchanged, so callers that want
/// `LlmAttributes::STREAMING` on the span must set it themselves.
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct LlmStreamCallExecuteParams {
    /// Logical provider or model family name recorded on emitted events.
    #[builder(setter(into))]
    pub name: String,
    /// Raw request passed into the managed pipeline.
    pub request: LlmRequest,
    /// Streaming provider callback or execution continuation.
    pub func: LlmStreamExecutionNextFn,
    /// Per-chunk collector callback used to accumulate stream state.
    pub collector: LlmCollectorFn,
    /// Finalizer callback used to construct the completed response.
    pub finalizer: LlmFinalizerFn,
    /// Optional explicit parent scope for the emitted LLM span.
    #[builder(default)]
    pub parent: Option<ScopeHandle>,
    /// LLM attribute bitflags applied to the managed span.
    #[builder(default = LlmAttributes::empty())]
    pub attributes: LlmAttributes,
    /// Optional application payload stored on the handle but not emitted as ATOF data.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional JSON metadata recorded on emitted events.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional normalized model name for observability output.
    #[builder(default, setter(into))]
    pub model_name: Option<String>,
    /// Optional request codec used to produce annotated request data.
    #[builder(default)]
    pub codec: Option<Arc<dyn LlmCodec>>,
    /// Optional response codec used to attach annotated response data.
    #[builder(default)]
    pub response_codec: Option<Arc<dyn LlmResponseCodec>>,
}
235
/// Builder parameters for [`llm_call_end`].
///
/// `handle` and `response` are required. The response passes through the
/// sanitize-response guardrails before being recorded.
#[derive(TypedBuilder)]
#[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))]
pub struct LlmCallEndParams<'a> {
    /// LLM handle to close.
    pub handle: &'a LlmHandle,
    /// Raw provider response associated with the end event.
    pub response: Json,
    /// Optional application payload retained for compatibility. ATOF data is
    /// the sanitized response; this payload is used only when the response
    /// sanitizes to JSON null.
    #[builder(default)]
    pub data: Option<Json>,
    /// Optional JSON metadata recorded on the end event.
    #[builder(default)]
    pub metadata: Option<Json>,
    /// Optional normalized response annotation produced by a response codec.
    #[builder(default)]
    pub annotated_response: Option<Arc<AnnotatedLlmResponse>>,
    /// Optional timestamp recorded on the emitted end event. When omitted, the
    /// runtime records the current UTC time. If the current time is not later
    /// than the handle start time, it records one microsecond after the start
    /// time instead.
    #[builder(default)]
    pub timestamp: Option<DateTime<Utc>>,
}
259
260fn create_llm_handle(params: CreateLlmHandleParams<'_>) -> Result<LlmHandle> {
261    ensure_runtime_owner()?;
262    let context = global_context();
263    let state = context
264        .read()
265        .map_err(|error| FlowError::Internal(error.to_string()))?;
266    Ok(state.create_llm_handle(params))
267}
268
269fn emit_llm_start(
270    handle: &LlmHandle,
271    request: &LlmRequest,
272    annotated_request: Option<Arc<AnnotatedLlmRequest>>,
273) -> Result<()> {
274    ensure_runtime_owner()?;
275    let (event, subscribers) = {
276        let scope_stack = current_scope_stack();
277        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
278        let scope_locals = scope_guard.collect_scope_local_registries(|registries| {
279            &registries.llm_sanitize_request_guardrails
280        });
281        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
282        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
283        let context = global_context();
284        let state = context
285            .read()
286            .map_err(|error| FlowError::Internal(error.to_string()))?;
287
288        let sanitized_request = state.llm_sanitize_request_chain(request.clone(), &scope_locals);
289        let input = serde_json::to_value(&sanitized_request).unwrap_or(Json::Null);
290        let event = state.build_llm_start_event(handle, Some(input), annotated_request);
291        (event, subscribers)
292    };
293    NemoFlowContextState::emit_event(&event, &subscribers);
294    Ok(())
295}
296
297fn emit_llm_start_once(
298    start_emitted: &Arc<AtomicBool>,
299    handle: &LlmHandle,
300    request: &LlmRequest,
301    annotated_request: Option<Arc<AnnotatedLlmRequest>>,
302) -> Result<()> {
303    if start_emitted.swap(true, Ordering::SeqCst) {
304        return Ok(());
305    }
306    emit_llm_start(handle, request, annotated_request)
307}
308
309/// Start a manual LLM lifecycle span.
310///
311/// This emits an LLM-start event after applying sanitize-request guardrails to
312/// the payload recorded for observability.
313///
314/// # Parameters
315/// - `name`: Logical provider or model family name recorded on the span.
316/// - `request`: Raw [`LlmRequest`] associated with the span.
317/// - `parent`: Optional explicit parent scope.
318/// - `attributes`: LLM attribute bitflags applied to the span.
319/// - `data`: Optional application payload stored on the returned handle. The
320///   emitted start event data is the sanitized `request` payload.
321/// - `metadata`: Optional JSON metadata recorded on the start event.
322/// - `model_name`: Optional normalized model name recorded separately from the
323///   request payload.
324/// - `annotated_request`: Optional normalized request annotation produced by a
325///   codec.
326/// - `timestamp`: Optional timestamp recorded as the handle start time and on
327///   the emitted start event. When `None`, the current UTC time is used.
328///
329/// # Returns
330/// A [`Result`] containing the created [`LlmHandle`].
331///
332/// # Errors
333/// Returns an error when the runtime owner check fails or when internal state
334/// cannot be read safely.
335///
336/// # Notes
337/// Sanitize-request guardrails affect only the emitted start-event payload, not
338/// the caller-owned [`LlmRequest`].
339pub fn llm_call(params: LlmCallParams<'_>) -> Result<LlmHandle> {
340    let handle_params = CreateLlmHandleParams::builder()
341        .name(params.name)
342        .parent_uuid_opt(resolve_parent_uuid(params.parent))
343        .attributes(params.attributes)
344        .data_opt(params.data)
345        .metadata_opt(params.metadata)
346        .model_name_opt(params.model_name)
347        .timestamp_opt(params.timestamp)
348        .build();
349    let handle = create_llm_handle(handle_params)?;
350    emit_llm_start(&handle, params.request, params.annotated_request)?;
351    Ok(handle)
352}
353
354/// Finish a manual LLM lifecycle span.
355///
356/// This emits an LLM-end event for a handle previously returned by
357/// [`llm_call`].
358///
359/// # Parameters
360/// - `handle`: LLM handle to close.
361/// - `response`: Raw provider response associated with the end event.
362/// - `data`: Optional application payload retained for compatibility. The
363///   emitted end event data is the sanitized `response` unless it sanitizes to
364///   JSON null, in which case this payload is used.
365/// - `metadata`: Optional JSON metadata recorded on the end event.
366/// - `annotated_response`: Optional normalized response annotation produced by
367///   a response codec.
368/// - `timestamp`: Optional timestamp recorded on the emitted end event. When
369///   `None`, the runtime uses the current UTC time, or one microsecond after
370///   the handle start time if the current time is not later.
371///
372/// # Returns
373/// A [`Result`] that is `Ok(())` when the end event has been emitted.
374///
375/// # Errors
376/// Returns an error when the runtime owner check fails or when internal state
377/// cannot be read safely.
378///
379/// # Notes
380/// Sanitize-response guardrails affect only the emitted end-event payload, not
381/// the caller-owned `response` value.
382pub fn llm_call_end(params: LlmCallEndParams<'_>) -> Result<()> {
383    ensure_runtime_owner()?;
384    let (event, subscribers) = {
385        let scope_stack = current_scope_stack();
386        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
387        let scope_locals = scope_guard.collect_scope_local_registries(|registries| {
388            &registries.llm_sanitize_response_guardrails
389        });
390        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
391        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
392        let context = global_context();
393        let state = context
394            .read()
395            .map_err(|error| FlowError::Internal(error.to_string()))?;
396
397        let sanitized_response = state.llm_sanitize_response_chain(params.response, &scope_locals);
398        let data = if sanitized_response.is_null() {
399            params.data
400        } else {
401            Some(sanitized_response)
402        };
403        let event = state.build_llm_end_event(
404            EndLlmHandleParams::builder()
405                .handle(params.handle)
406                .data_opt(data)
407                .metadata_opt(params.metadata)
408                .annotated_response_opt(params.annotated_response)
409                .timestamp_opt(params.timestamp)
410                .build(),
411        );
412        (event, subscribers)
413    };
414    NemoFlowContextState::emit_event(&event, &subscribers);
415    Ok(())
416}
417
418fn emit_llm_end_without_output(handle: &LlmHandle, metadata: Option<Json>) -> Result<()> {
419    ensure_runtime_owner()?;
420    let (event, subscribers) = {
421        let scope_stack = current_scope_stack();
422        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
423        let scope_subscribers = scope_guard.collect_scope_local_subscribers();
424        let subscribers = snapshot_event_subscribers(scope_subscribers)?;
425        let context = global_context();
426        let state = context
427            .read()
428            .map_err(|error| FlowError::Internal(error.to_string()))?;
429        let event = state.end_llm_handle(handle, handle.data.clone(), metadata, None);
430        (event, subscribers)
431    };
432    NemoFlowContextState::emit_event(&event, &subscribers);
433    Ok(())
434}
435
436/// Execute an LLM call through the managed middleware pipeline.
437///
438/// This runs conditional-execution guardrails, request intercepts,
439/// sanitize-request guardrails, execution intercepts, the provider callback,
440/// and sanitize-response guardrails in the runtime-defined order.
441///
442/// # Parameters
443/// - `name`: Logical provider or model family name recorded on emitted events.
444/// - `request`: Raw [`LlmRequest`] passed into the managed pipeline.
445/// - `func`: Provider callback or execution continuation.
446/// - `parent`: Optional explicit parent scope for the emitted LLM span.
447/// - `attributes`: LLM attribute bitflags applied to the managed span.
448/// - `data`: Optional application payload stored on the managed LLM handle. It
449///   may be used on failure end events that have no output payload.
450/// - `metadata`: Optional JSON metadata recorded on emitted events.
451/// - `model_name`: Optional normalized model name for observability output.
452/// - `codec`: Optional request codec used to produce annotated request data for
453///   intercepts and events.
454/// - `response_codec`: Optional response codec used to attach annotated
455///   response data to the end event.
456///
457/// # Returns
458/// A [`Result`] containing the raw JSON response returned by the callback or
459/// an execution intercept.
460///
461/// # Errors
462/// Returns [`FlowError::GuardrailRejected`] when conditional-execution
463/// guardrails block the call, or any error raised by request intercepts,
464/// execution intercepts, codecs, or the callback itself.
465///
466/// # Notes
467/// Response codecs enrich observability output only and do not change the
468/// value returned to the caller.
469pub async fn llm_call_execute(params: LlmCallExecuteParams) -> Result<Json> {
470    let LlmCallExecuteParams {
471        name,
472        request,
473        func,
474        parent,
475        attributes,
476        data,
477        metadata,
478        model_name,
479        codec,
480        response_codec,
481    } = params;
482    ensure_runtime_owner()?;
483    {
484        let scope_stack = current_scope_stack();
485        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
486        let scope_locals = scope_guard.collect_scope_local_registries(|registries| {
487            &registries.llm_conditional_execution_guardrails
488        });
489        let context = global_context();
490        let state = context
491            .read()
492            .map_err(|error| FlowError::Internal(error.to_string()))?;
493        if let Some(error) = state.llm_conditional_execution_chain(&request, &scope_locals)? {
494            drop(state);
495            drop(scope_guard);
496            let mut rejection_data = json!({});
497            if let Some(object) = rejection_data.as_object_mut() {
498                object.insert("rejected".into(), json!(true));
499                object.insert("rejection_reason".into(), json!(&error));
500            }
501            let _ = event(
502                EmitMarkEventParams::builder()
503                    .name(&name)
504                    .parent_opt(parent.as_ref())
505                    .data(rejection_data)
506                    .metadata_opt(metadata.clone())
507                    .build(),
508            );
509            return Err(FlowError::GuardrailRejected(error));
510        }
511    }
512
513    let (intercepted_request, annotated_request) =
514        run_request_intercepts_with_codec(&name, request, codec)?;
515
516    let handle = create_llm_handle(
517        CreateLlmHandleParams::builder()
518            .name(name.as_str())
519            .parent_uuid_opt(resolve_parent_uuid(parent.as_ref()))
520            .attributes(attributes)
521            .data_opt(data.clone())
522            .metadata_opt(metadata.clone())
523            .model_name_opt(model_name)
524            .build(),
525    )?;
526    let start_emitted = Arc::new(AtomicBool::new(false));
527    let fallback_request = intercepted_request.clone();
528    let execution_handle = handle.clone();
529    let execution_annotated_request = annotated_request.clone();
530    let execution_start_emitted = start_emitted.clone();
531    let instrumented_func: LlmExecutionNextFn = Arc::new(move |request| {
532        let next = func.clone();
533        let handle = execution_handle.clone();
534        let annotated_request = execution_annotated_request.clone();
535        let start_emitted = execution_start_emitted.clone();
536        Box::pin(async move {
537            emit_llm_start_once(&start_emitted, &handle, &request, annotated_request)?;
538            next(request).await
539        })
540    });
541
542    let execution = {
543        let scope_stack = current_scope_stack();
544        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
545        let scope_locals = scope_guard
546            .collect_scope_local_registries(|registries| &registries.llm_execution_intercepts);
547        let context = global_context();
548        let state = context
549            .read()
550            .map_err(|error| FlowError::Internal(error.to_string()))?;
551        state.llm_build_execution_chain(&name, instrumented_func, &scope_locals)
552    };
553
554    match execution(intercepted_request).await {
555        Ok(response) => {
556            emit_llm_start_once(
557                &start_emitted,
558                &handle,
559                &fallback_request,
560                annotated_request.clone(),
561            )?;
562            let annotated_response = response_codec
563                .as_ref()
564                .and_then(|codec| codec.decode_response(&response).ok())
565                .map(Arc::new);
566            llm_call_end(
567                LlmCallEndParams::builder()
568                    .handle(&handle)
569                    .response(response.clone())
570                    .data_opt(data)
571                    .metadata_opt(metadata)
572                    .annotated_response_opt(annotated_response)
573                    .build(),
574            )?;
575            Ok(response)
576        }
577        Err(error) => {
578            let _ = emit_llm_start_once(
579                &start_emitted,
580                &handle,
581                &fallback_request,
582                annotated_request,
583            );
584            let _ = emit_llm_end_without_output(&handle, metadata);
585            Err(error)
586        }
587    }
588}
589
590/// Execute a streaming LLM call through the managed middleware pipeline.
591///
592/// This runs the same pre-execution middleware as [`llm_call_execute`] and
593/// then wraps the provider stream so chunk callbacks and finalization can emit
594/// a single LLM-end event when streaming completes.
595///
596/// # Parameters
597/// - `name`: Logical provider or model family name recorded on emitted events.
598/// - `request`: Raw [`LlmRequest`] passed into the managed pipeline.
599/// - `func`: Streaming provider callback or execution continuation.
600/// - `collector`: Per-chunk collector callback used to accumulate stream state.
601/// - `finalizer`: Finalizer callback used to construct the completed response.
602/// - `parent`: Optional explicit parent scope for the emitted LLM span.
603/// - `attributes`: LLM attribute bitflags applied to the managed span.
604/// - `data`: Optional application payload stored on the managed LLM handle. It
605///   may be used on failure end events that have no output payload.
606/// - `metadata`: Optional JSON metadata recorded on emitted events.
607/// - `model_name`: Optional normalized model name for observability output.
608/// - `codec`: Optional request codec used to produce annotated request data for
609///   intercepts and events.
610/// - `response_codec`: Optional response codec used to attach annotated
611///   response data to the end event.
612///
613/// # Returns
614/// A [`Result`] containing a boxed stream of JSON chunks.
615///
616/// # Errors
617/// Returns [`FlowError::GuardrailRejected`] when conditional-execution
618/// guardrails block the call, or any error raised by request intercepts,
619/// execution intercepts, stream callbacks, codecs, or the provider callback.
620///
621/// # Notes
622/// The returned stream emits chunk-level results while the runtime defers the
623/// LLM-end event until the collector and finalizer complete.
624pub async fn llm_stream_call_execute(params: LlmStreamCallExecuteParams) -> Result<LlmJsonStream> {
625    let LlmStreamCallExecuteParams {
626        name,
627        request,
628        func,
629        collector,
630        finalizer,
631        parent,
632        attributes,
633        data,
634        metadata,
635        model_name,
636        codec,
637        response_codec,
638    } = params;
639    ensure_runtime_owner()?;
640    {
641        let scope_stack = current_scope_stack();
642        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
643        let scope_locals = scope_guard.collect_scope_local_registries(|registries| {
644            &registries.llm_conditional_execution_guardrails
645        });
646        let context = global_context();
647        let state = context
648            .read()
649            .map_err(|error| FlowError::Internal(error.to_string()))?;
650        if let Some(error) = state.llm_conditional_execution_chain(&request, &scope_locals)? {
651            drop(state);
652            drop(scope_guard);
653            let mut rejection_data = json!({});
654            if let Some(object) = rejection_data.as_object_mut() {
655                object.insert("rejected".into(), json!(true));
656                object.insert("rejection_reason".into(), json!(&error));
657            }
658            let _ = event(
659                EmitMarkEventParams::builder()
660                    .name(&name)
661                    .parent_opt(parent.as_ref())
662                    .data(rejection_data)
663                    .metadata_opt(metadata.clone())
664                    .build(),
665            );
666            return Err(FlowError::GuardrailRejected(error));
667        }
668    }
669
670    let (intercepted_request, annotated_request) =
671        run_request_intercepts_with_codec(&name, request, codec)?;
672
673    let handle = create_llm_handle(
674        CreateLlmHandleParams::builder()
675            .name(name.as_str())
676            .parent_uuid_opt(resolve_parent_uuid(parent.as_ref()))
677            .attributes(attributes)
678            .data_opt(data.clone())
679            .metadata_opt(metadata.clone())
680            .model_name_opt(model_name)
681            .build(),
682    )?;
683    let start_emitted = Arc::new(AtomicBool::new(false));
684    let fallback_request = intercepted_request.clone();
685    let execution_handle = handle.clone();
686    let execution_annotated_request = annotated_request.clone();
687    let execution_start_emitted = start_emitted.clone();
688    let instrumented_func: LlmStreamExecutionNextFn = Arc::new(move |request| {
689        let next = func.clone();
690        let handle = execution_handle.clone();
691        let annotated_request = execution_annotated_request.clone();
692        let start_emitted = execution_start_emitted.clone();
693        Box::pin(async move {
694            emit_llm_start_once(&start_emitted, &handle, &request, annotated_request)?;
695            next(request).await
696        })
697    });
698
699    let execution = {
700        let scope_stack = current_scope_stack();
701        let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
702        let scope_locals = scope_guard.collect_scope_local_registries(|registries| {
703            &registries.llm_stream_execution_intercepts
704        });
705        let context = global_context();
706        let state = context
707            .read()
708            .map_err(|error| FlowError::Internal(error.to_string()))?;
709        state.llm_stream_build_execution_chain(&name, instrumented_func, &scope_locals)
710    };
711
712    match execution(intercepted_request).await {
713        Ok(raw_stream) => {
714            emit_llm_start_once(
715                &start_emitted,
716                &handle,
717                &fallback_request,
718                annotated_request.clone(),
719            )?;
720            let wrapper = LlmStreamWrapper::new(
721                raw_stream,
722                handle,
723                collector,
724                finalizer,
725                data,
726                metadata,
727                response_codec,
728            );
729            Ok(Box::pin(wrapper) as LlmJsonStream)
730        }
731        Err(error) => {
732            let _ = emit_llm_start_once(
733                &start_emitted,
734                &handle,
735                &fallback_request,
736                annotated_request,
737            );
738            let _ = emit_llm_end_without_output(&handle, metadata);
739            Err(error)
740        }
741    }
742}
743
744/// Run only the LLM request-intercept chain.
745///
746/// This applies the currently active global and scope-local request intercepts
747/// without emitting lifecycle events or invoking provider execution.
748///
749/// # Parameters
750/// - `name`: Logical provider or model family name used when resolving the
751///   intercept chain.
752/// - `request`: Raw [`LlmRequest`] to transform.
753///
754/// # Returns
755/// A [`Result`] containing the transformed [`LlmRequest`].
756///
757/// # Errors
758/// Returns any error raised by the request-intercept chain.
759///
760/// # Notes
761/// Conditional guardrails, codecs, and execution intercepts are not run by
762/// this helper.
763pub fn llm_request_intercepts(name: &str, request: LlmRequest) -> Result<LlmRequest> {
764    ensure_runtime_owner()?;
765    let scope_stack = current_scope_stack();
766    let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
767    let scope_locals =
768        scope_guard.collect_scope_local_registries(|registries| &registries.llm_request_intercepts);
769    let context = global_context();
770    let state = context
771        .read()
772        .map_err(|error| FlowError::Internal(error.to_string()))?;
773    let (request, _) = state.llm_request_intercepts_chain(name, request, None, &scope_locals)?;
774    Ok(request)
775}
776
777/// Run only the LLM conditional-execution guardrail chain.
778///
779/// This evaluates whether an LLM call should be allowed to proceed without
780/// emitting lifecycle events or invoking request intercepts or execution.
781///
782/// # Parameters
783/// - `request`: Raw [`LlmRequest`] to validate.
784///
785/// # Returns
786/// A [`Result`] that is `Ok(())` when all guardrails allow execution.
787///
788/// # Errors
789/// Returns [`FlowError::GuardrailRejected`] when a guardrail blocks execution,
790/// or any error raised by the guardrail chain itself.
791///
792/// # Notes
793/// This helper is useful for preflight checks when the caller needs the
794/// rejection result without starting an LLM span.
795pub fn llm_conditional_execution(request: &LlmRequest) -> Result<()> {
796    ensure_runtime_owner()?;
797    let scope_stack = current_scope_stack();
798    let scope_guard = scope_stack.read().expect("scope stack lock poisoned");
799    let scope_locals = scope_guard.collect_scope_local_registries(|registries| {
800        &registries.llm_conditional_execution_guardrails
801    });
802    let context = global_context();
803    let state = context
804        .read()
805        .map_err(|error| FlowError::Internal(error.to_string()))?;
806    if let Some(error) = state.llm_conditional_execution_chain(request, &scope_locals)? {
807        return Err(FlowError::GuardrailRejected(error));
808    }
809    Ok(())
810}