Skip to main content

nemo_flow_ffi/api/
llm.rs

1// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Arc, FfiCodecHandle, FfiLLMHandle, FfiScopeHandle, FlowResult, LlmAttributes,
6    LlmExecutionNextFn, LlmRequest, LlmStreamExecutionNextFn, NemoFlowCodecDecodeFn,
7    NemoFlowCodecEncodeFn, NemoFlowCollectorCb, NemoFlowFinalizerCb, NemoFlowFreeFn,
8    NemoFlowLlmExecCb, NemoFlowStatus, TASK_SCOPE_STACK, c_char, c_str_to_json, c_str_to_opt_json,
9    c_str_to_string, clear_last_error, core_llm_api, current_scope_stack, json_to_c_string,
10    set_last_error, status_from_error, tokio_runtime, unix_micros_to_opt_timestamp, wrap_codec_fn,
11    wrap_collector_fn, wrap_finalizer_fn, wrap_llm_exec_fn, wrap_llm_stream_exec_fn,
12};
13use tokio_stream::StreamExt;
14
15// ---------------------------------------------------------------------------
16// LLM lifecycle
17// ---------------------------------------------------------------------------
18
19/// Begin a manual LLM call lifecycle span.
20///
21/// This emits an LLM Start event after applying sanitize-request guardrails to
22/// the observability payload. Request and execution intercepts only run through
23/// `nemo_flow_llm_call_execute`.
24///
25/// # Parameters
26/// - `name`: Null-terminated LLM provider name.
27/// - `native_json`: The request payload as a JSON C string representing an
28///   `LlmRequest` (`{"headers": {...}, "content": {...}}`). The request
29///   becomes the start-event data after sanitize-request guardrails.
30/// - `parent`: Optional parent scope handle, or null to use the current top of
31///   stack.
32/// - `attributes`: Bitfield of LLM attributes.
33/// - `data_json`: Optional null-terminated JSON string stored on the LLM
34///   handle, or null.
35/// - `metadata_json`: Optional null-terminated JSON metadata string recorded
36///   on the start event, or null.
37/// - `model_name`: Optional null-terminated LLM model identifier recorded in
38///   the LLM event category profile, or null.
39/// - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the
40///   handle start time and start event, or null to use the current UTC time.
41/// - `out`: On success, receives a heap-allocated `FfiLLMHandle` that must be
42///   freed with `nemo_flow_llm_handle_free`.
43///
44/// # Errors
45/// Returns `InvalidJson` for invalid JSON inputs and `InvalidArg` when
46/// `timestamp_unix_micros` is outside the supported timestamp range.
47///
48/// # Safety
49/// `name`, `native_json`, and `out` must be valid, non-null pointers. Optional
50/// pointer arguments may be null; when non-null, they must be valid for reads
51/// for the duration of the call.
52#[unsafe(no_mangle)]
53pub unsafe extern "C" fn nemo_flow_llm_call(
54    name: *const c_char,
55    native_json: *const c_char,
56    parent: *const FfiScopeHandle,
57    attributes: u32,
58    data_json: *const c_char,
59    metadata_json: *const c_char,
60    model_name: *const c_char,
61    timestamp_unix_micros: *const i64,
62    out: *mut *mut FfiLLMHandle,
63) -> NemoFlowStatus {
64    clear_last_error();
65    if out.is_null() {
66        set_last_error("null pointer argument");
67        return NemoFlowStatus::NullPointer;
68    }
69    let name = match c_str_to_string(name) {
70        Ok(s) => s,
71        Err(status) => return status,
72    };
73    let native = match c_str_to_json(native_json) {
74        Some(n) => n,
75        None => return NemoFlowStatus::InvalidJson,
76    };
77    let request: LlmRequest = match serde_json::from_value(native) {
78        Ok(r) => r,
79        Err(_) => {
80            set_last_error("failed to parse native_json as LlmRequest");
81            return NemoFlowStatus::InvalidJson;
82        }
83    };
84    let parent_ref = if parent.is_null() {
85        None
86    } else {
87        Some(&unsafe { &*parent }.0)
88    };
89    let attrs = LlmAttributes::from_bits_truncate(attributes);
90    let data = match c_str_to_opt_json(data_json) {
91        Some(d) => d,
92        None => return NemoFlowStatus::InvalidJson,
93    };
94    let metadata = match c_str_to_opt_json(metadata_json) {
95        Some(m) => m,
96        None => return NemoFlowStatus::InvalidJson,
97    };
98    let model_name_opt = if model_name.is_null() {
99        None
100    } else {
101        match c_str_to_string(model_name) {
102            Ok(s) => Some(s),
103            Err(status) => return status,
104        }
105    };
106    let timestamp = match unix_micros_to_opt_timestamp(timestamp_unix_micros) {
107        Some(v) => v,
108        None => return NemoFlowStatus::InvalidArg,
109    };
110
111    match core_llm_api::llm_call(
112        core_llm_api::LlmCallParams::builder()
113            .name(&name)
114            .request(&request)
115            .parent_opt(parent_ref)
116            .attributes(attrs)
117            .data_opt(data)
118            .metadata_opt(metadata)
119            .model_name_opt(model_name_opt)
120            .timestamp_opt(timestamp)
121            .build(),
122    ) {
123        Ok(h) => {
124            unsafe { *out = Box::into_raw(Box::new(FfiLLMHandle(h))) };
125            NemoFlowStatus::Ok
126        }
127        Err(e) => status_from_error(&e),
128    }
129}
130
131/// End a manual LLM call lifecycle span.
132///
133/// This emits an LLM End event after applying sanitize-response guardrails to
134/// the observability payload. Response intercepts only run through
135/// `nemo_flow_llm_call_execute`.
136///
137/// # Parameters
138/// - `handle`: The LLM handle from `nemo_flow_llm_call`.
139/// - `response_json`: LLM response as a null-terminated JSON C string. This
140///   response becomes the end-event data after sanitize-response guardrails
141///   unless it sanitizes to JSON null.
142/// - `data_json`: Optional null-terminated JSON data used when the sanitized
143///   response is JSON null, or null.
144/// - `metadata_json`: Optional null-terminated JSON metadata recorded on the
145///   end event, or null.
146/// - `timestamp_unix_micros`: Optional Unix microseconds timestamp for the end
147///   event, or null to use the runtime default end timestamp.
148///
149/// # Errors
150/// Returns `InvalidJson` for invalid JSON inputs and `InvalidArg` when
151/// `timestamp_unix_micros` is outside the supported timestamp range.
152///
153/// # Safety
154/// `handle` and `response_json` must be valid, non-null pointers. Optional
155/// pointer arguments may be null; when non-null, they must be valid for reads
156/// for the duration of the call.
157#[unsafe(no_mangle)]
158pub unsafe extern "C" fn nemo_flow_llm_call_end(
159    handle: *const FfiLLMHandle,
160    response_json: *const c_char,
161    data_json: *const c_char,
162    metadata_json: *const c_char,
163    timestamp_unix_micros: *const i64,
164) -> NemoFlowStatus {
165    clear_last_error();
166    if handle.is_null() {
167        set_last_error("handle is null");
168        return NemoFlowStatus::NullPointer;
169    }
170    let response = match c_str_to_json(response_json) {
171        Some(r) => r,
172        None => return NemoFlowStatus::InvalidJson,
173    };
174    let data = match c_str_to_opt_json(data_json) {
175        Some(d) => d,
176        None => return NemoFlowStatus::InvalidJson,
177    };
178    let metadata = match c_str_to_opt_json(metadata_json) {
179        Some(m) => m,
180        None => return NemoFlowStatus::InvalidJson,
181    };
182    let timestamp = match unix_micros_to_opt_timestamp(timestamp_unix_micros) {
183        Some(v) => v,
184        None => return NemoFlowStatus::InvalidArg,
185    };
186
187    match core_llm_api::llm_call_end(
188        core_llm_api::LlmCallEndParams::builder()
189            .handle(&unsafe { &*handle }.0)
190            .response(response)
191            .data_opt(data)
192            .metadata_opt(metadata)
193            .timestamp_opt(timestamp)
194            .build(),
195    ) {
196        Ok(()) => NemoFlowStatus::Ok,
197        Err(e) => status_from_error(&e),
198    }
199}
200
201// ---------------------------------------------------------------------------
202// Built-in codec constructors
203// ---------------------------------------------------------------------------
204
205/// Create a new OpenAI Chat Completions codec handle.
206///
207/// The returned handle implements both request codec (decode/encode) and
208/// response codec (decode_response). Free with `nemo_flow_codec_free`.
209///
210/// # Safety
211/// Caller must free the returned handle via `nemo_flow_codec_free`.
212#[unsafe(no_mangle)]
213pub extern "C" fn nemo_flow_openai_chat_codec_new() -> *mut FfiCodecHandle {
214    Box::into_raw(Box::new(FfiCodecHandle {
215        codec: Arc::new(nemo_flow::codec::openai_chat::OpenAIChatCodec),
216        response_codec: Arc::new(nemo_flow::codec::openai_chat::OpenAIChatCodec),
217    }))
218}
219
220/// Create a new OpenAI Responses API codec handle.
221///
222/// The returned handle implements both request codec (decode/encode) and
223/// response codec (decode_response). Free with `nemo_flow_codec_free`.
224///
225/// # Safety
226/// Caller must free the returned handle via `nemo_flow_codec_free`.
227#[unsafe(no_mangle)]
228pub extern "C" fn nemo_flow_openai_responses_codec_new() -> *mut FfiCodecHandle {
229    Box::into_raw(Box::new(FfiCodecHandle {
230        codec: Arc::new(nemo_flow::codec::openai_responses::OpenAIResponsesCodec),
231        response_codec: Arc::new(nemo_flow::codec::openai_responses::OpenAIResponsesCodec),
232    }))
233}
234
235/// Create a new Anthropic Messages API codec handle.
236///
237/// The returned handle implements both request codec (decode/encode) and
238/// response codec (decode_response). Free with `nemo_flow_codec_free`.
239///
240/// # Safety
241/// Caller must free the returned handle via `nemo_flow_codec_free`.
242#[unsafe(no_mangle)]
243pub extern "C" fn nemo_flow_anthropic_messages_codec_new() -> *mut FfiCodecHandle {
244    Box::into_raw(Box::new(FfiCodecHandle {
245        codec: Arc::new(nemo_flow::codec::anthropic::AnthropicMessagesCodec),
246        response_codec: Arc::new(nemo_flow::codec::anthropic::AnthropicMessagesCodec),
247    }))
248}
249
250struct ParsedExecuteInputs {
251    name: String,
252    request: LlmRequest,
253    parent_handle: Option<nemo_flow::api::scope::ScopeHandle>,
254    attrs: LlmAttributes,
255    data: Option<serde_json::Value>,
256    metadata: Option<serde_json::Value>,
257    model_name: Option<String>,
258    codec: Option<Arc<dyn nemo_flow::codec::traits::LlmCodec>>,
259    response_codec: Option<Arc<dyn nemo_flow::codec::traits::LlmResponseCodec>>,
260}
261
262struct RawExecuteInputs {
263    name: *const c_char,
264    native_json: *const c_char,
265    parent: *const FfiScopeHandle,
266    attributes: u32,
267    data_json: *const c_char,
268    metadata_json: *const c_char,
269    model_name: *const c_char,
270    codec_decode: NemoFlowCodecDecodeFn,
271    codec_encode: NemoFlowCodecEncodeFn,
272    codec_user_data: *mut libc::c_void,
273    codec_free_fn: NemoFlowFreeFn,
274    response_codec: *const FfiCodecHandle,
275}
276
277fn parse_llm_request(native_json: *const c_char) -> Result<LlmRequest, NemoFlowStatus> {
278    let native = c_str_to_json(native_json).ok_or(NemoFlowStatus::InvalidJson)?;
279    serde_json::from_value(native).map_err(|_| {
280        set_last_error("failed to parse native_json as LlmRequest");
281        NemoFlowStatus::InvalidJson
282    })
283}
284
285fn parse_optional_model_name(model_name: *const c_char) -> Result<Option<String>, NemoFlowStatus> {
286    if model_name.is_null() {
287        Ok(None)
288    } else {
289        c_str_to_string(model_name).map(Some)
290    }
291}
292
293fn parse_execute_inputs(raw: RawExecuteInputs) -> Result<ParsedExecuteInputs, NemoFlowStatus> {
294    let name = c_str_to_string(raw.name)?;
295    let request = parse_llm_request(raw.native_json)?;
296    let parent_handle = if raw.parent.is_null() {
297        None
298    } else {
299        Some(unsafe { &*raw.parent }.0.clone())
300    };
301    let attrs = LlmAttributes::from_bits_truncate(raw.attributes);
302    let data = c_str_to_opt_json(raw.data_json).ok_or(NemoFlowStatus::InvalidJson)?;
303    let metadata = c_str_to_opt_json(raw.metadata_json).ok_or(NemoFlowStatus::InvalidJson)?;
304    let model_name = parse_optional_model_name(raw.model_name)?;
305    let codec = match (raw.codec_decode, raw.codec_encode) {
306        (Some(decode_cb), Some(encode_cb)) => Some(wrap_codec_fn(
307            decode_cb,
308            encode_cb,
309            raw.codec_user_data,
310            raw.codec_free_fn,
311        )),
312        (None, None) => None,
313        _ => {
314            set_last_error(
315                "codec_decode and codec_encode must either both be provided or both be null",
316            );
317            return Err(NemoFlowStatus::InvalidArg);
318        }
319    };
320    let response_codec = if raw.response_codec.is_null() {
321        None
322    } else {
323        Some(unsafe { &*raw.response_codec }.response_codec.clone())
324    };
325
326    Ok(ParsedExecuteInputs {
327        name,
328        request,
329        parent_handle,
330        attrs,
331        data,
332        metadata,
333        model_name,
334        codec,
335        response_codec,
336    })
337}
338
339/// Execute an LLM call end-to-end: run conditional-execution guardrails (on raw
340/// request), then request intercepts, sanitize-request guardrails, execution
341/// intercepts, the callback, and sanitize-response
342/// guardrails. On rejection, only a standalone Mark event is emitted (no
343/// Start/End pair) and `GuardrailRejected` is returned. Blocks the calling
344/// thread until completion.
345///
346/// # Parameters
347/// - `name`: Null-terminated LLM provider name.
348/// - `native_json`: The request payload as a JSON C string representing an
349///   `LlmRequest` (`{"headers": {...}, "content": {...}}`).
350/// - `func`: C callback that performs the actual LLM call.
351/// - `func_user_data`: Opaque pointer passed to `func`.
352/// - `func_free`: Optional destructor for `func_user_data`.
353/// - `parent`: Optional parent scope handle, or null.
354/// - `attributes`: Bitfield of LLM attributes.
355/// - `data_json`: Optional JSON data, or null.
356/// - `metadata_json`: Optional JSON metadata, or null.
357/// - `model_name`: Optional LLM model identifier, or null.
358/// - `out`: On success, receives the response as a JSON C string. Caller must
359///   free with `nemo_flow_string_free`.
360///
361/// # Safety
362/// `name`, `native_json`, and `out` must be valid, non-null pointers.
363#[unsafe(no_mangle)]
364pub unsafe extern "C" fn nemo_flow_llm_call_execute(
365    name: *const c_char,
366    native_json: *const c_char,
367    func: NemoFlowLlmExecCb,
368    func_user_data: *mut libc::c_void,
369    func_free: NemoFlowFreeFn,
370    parent: *const FfiScopeHandle,
371    attributes: u32,
372    data_json: *const c_char,
373    metadata_json: *const c_char,
374    model_name: *const c_char,
375    codec_decode: NemoFlowCodecDecodeFn,
376    codec_encode: NemoFlowCodecEncodeFn,
377    codec_user_data: *mut libc::c_void,
378    codec_free_fn: NemoFlowFreeFn,
379    response_codec: *const FfiCodecHandle,
380    out: *mut *mut c_char,
381) -> NemoFlowStatus {
382    clear_last_error();
383    if out.is_null() {
384        set_last_error("null pointer argument");
385        return NemoFlowStatus::NullPointer;
386    }
387    let parsed = match parse_execute_inputs(RawExecuteInputs {
388        name,
389        native_json,
390        parent,
391        attributes,
392        data_json,
393        metadata_json,
394        model_name,
395        codec_decode,
396        codec_encode,
397        codec_user_data,
398        codec_free_fn,
399        response_codec,
400    }) {
401        Ok(parsed) => parsed,
402        Err(status) => return status,
403    };
404
405    let exec_fn = wrap_llm_exec_fn(func, func_user_data, func_free);
406    let default_fn: LlmExecutionNextFn = Arc::new(move |request| exec_fn(request));
407
408    let scope_stack = current_scope_stack();
409    let result = tokio_runtime().block_on(TASK_SCOPE_STACK.scope(scope_stack, async {
410        core_llm_api::llm_call_execute(
411            core_llm_api::LlmCallExecuteParams::builder()
412                .name(parsed.name)
413                .request(parsed.request)
414                .func(default_fn)
415                .parent_opt(parsed.parent_handle)
416                .attributes(parsed.attrs)
417                .data_opt(parsed.data)
418                .metadata_opt(parsed.metadata)
419                .model_name_opt(parsed.model_name)
420                .codec_opt(parsed.codec)
421                .response_codec_opt(parsed.response_codec)
422                .build(),
423        )
424        .await
425    }));
426
427    match result {
428        Ok(json) => {
429            unsafe { *out = json_to_c_string(&json) };
430            NemoFlowStatus::Ok
431        }
432        Err(e) => status_from_error(&e),
433    }
434}
435
436// ---------------------------------------------------------------------------
437// Stream
438// ---------------------------------------------------------------------------
439
440/// Opaque stream handle for consuming LLM streaming responses chunk by chunk.
441/// Use `nemo_flow_stream_next` to poll and `nemo_flow_stream_free` to release.
442pub struct FfiStream {
443    pub(crate) receiver:
444        tokio::sync::Mutex<tokio::sync::mpsc::Receiver<FlowResult<serde_json::Value>>>,
445}
446
447/// Execute a streaming LLM call end-to-end. Conditional-execution guardrails
448/// run first on the raw request. Returns a stream handle that can be polled
449/// with `nemo_flow_stream_next`. Blocks until the stream is set up.
450///
451/// # Parameters
452/// - `name`: Null-terminated LLM provider name.
453/// - `native_json`: The request payload as a JSON C string representing an
454///   `LlmRequest` (`{"headers": {...}, "content": {...}}`).
455/// - `func`: C callback that performs the actual LLM call.
456/// - `func_user_data`: Opaque pointer passed to `func`.
457/// - `func_free`: Optional destructor for `func_user_data`.
458/// - `collector`: Callback invoked with each intercepted chunk as a JSON string.
459///   May be null, in which case chunks are not collected.
460/// - `finalizer`: Callback invoked once when the stream is exhausted to produce
461///   the aggregated response as a JSON C string. May be null, in which case the
462///   finalizer returns `Json::Null`.
463/// - `parent`: Optional parent scope handle, or null.
464/// - `attributes`: Bitfield of LLM attributes.
465/// - `data_json`: Optional JSON data, or null.
466/// - `metadata_json`: Optional JSON metadata, or null.
467/// - `model_name`: Optional LLM model identifier, or null.
468/// - `out`: On success, receives a heap-allocated `FfiStream`.
469///
470/// # Safety
471/// `name`, `native_json`, and `out` must be valid, non-null pointers. `collector`
472/// and `finalizer` may be null.
473#[unsafe(no_mangle)]
474pub unsafe extern "C" fn nemo_flow_llm_stream_call_execute(
475    name: *const c_char,
476    native_json: *const c_char,
477    func: NemoFlowLlmExecCb,
478    func_user_data: *mut libc::c_void,
479    func_free: NemoFlowFreeFn,
480    collector: Option<NemoFlowCollectorCb>,
481    finalizer: Option<NemoFlowFinalizerCb>,
482    parent: *const FfiScopeHandle,
483    attributes: u32,
484    data_json: *const c_char,
485    metadata_json: *const c_char,
486    model_name: *const c_char,
487    codec_decode: NemoFlowCodecDecodeFn,
488    codec_encode: NemoFlowCodecEncodeFn,
489    codec_user_data: *mut libc::c_void,
490    codec_free_fn: NemoFlowFreeFn,
491    response_codec: *const FfiCodecHandle,
492    out: *mut *mut FfiStream,
493) -> NemoFlowStatus {
494    clear_last_error();
495    if out.is_null() {
496        set_last_error("null pointer argument");
497        return NemoFlowStatus::NullPointer;
498    }
499    let parsed = match parse_execute_inputs(RawExecuteInputs {
500        name,
501        native_json,
502        parent,
503        attributes,
504        data_json,
505        metadata_json,
506        model_name,
507        codec_decode,
508        codec_encode,
509        codec_user_data,
510        codec_free_fn,
511        response_codec,
512    }) {
513        Ok(parsed) => parsed,
514        Err(status) => return status,
515    };
516
517    let exec_fn = wrap_llm_stream_exec_fn(func, func_user_data, func_free);
518    let default_fn: LlmStreamExecutionNextFn = Arc::new(move |request| exec_fn(request));
519
520    let wrapped_collector: Box<dyn FnMut(serde_json::Value) -> FlowResult<()> + Send> =
521        match collector {
522            Some(cb) => wrap_collector_fn(cb),
523            None => Box::new(|_: serde_json::Value| Ok(())),
524        };
525
526    let wrapped_finalizer: Box<dyn FnOnce() -> serde_json::Value + Send> = match finalizer {
527        Some(cb) => wrap_finalizer_fn(cb),
528        None => Box::new(|| serde_json::Value::Null),
529    };
530
531    let scope_stack = current_scope_stack();
532    let result = tokio_runtime().block_on(TASK_SCOPE_STACK.scope(scope_stack, async {
533        core_llm_api::llm_stream_call_execute(
534            core_llm_api::LlmStreamCallExecuteParams::builder()
535                .name(parsed.name)
536                .request(parsed.request)
537                .func(default_fn)
538                .collector(wrapped_collector)
539                .finalizer(wrapped_finalizer)
540                .parent_opt(parsed.parent_handle)
541                .attributes(parsed.attrs)
542                .data_opt(parsed.data)
543                .metadata_opt(parsed.metadata)
544                .model_name_opt(parsed.model_name)
545                .codec_opt(parsed.codec)
546                .response_codec_opt(parsed.response_codec)
547                .build(),
548        )
549        .await
550    }));
551
552    match result {
553        Ok(rust_stream) => {
554            let (tx, rx) = tokio::sync::mpsc::channel(32);
555            tokio_runtime().spawn(async move {
556                let mut stream = rust_stream;
557                while let Some(item) = stream.next().await {
558                    if tx.send(item).await.is_err() {
559                        break;
560                    }
561                }
562            });
563            let ffi_stream = Box::new(FfiStream {
564                receiver: tokio::sync::Mutex::new(rx),
565            });
566            unsafe { *out = Box::into_raw(ffi_stream) };
567            NemoFlowStatus::Ok
568        }
569        Err(e) => status_from_error(&e),
570    }
571}
572
573/// Poll the next chunk from a streaming LLM response. Blocks until a chunk is
574/// available.
575///
576/// # Returns
577/// - `1`: A chunk was written to `*out_chunk`. Caller must free with
578///   `nemo_flow_string_free`.
579/// - `0`: The stream is complete (no more chunks).
580/// - `-1`: An error occurred. Call `nemo_flow_last_error` for details.
581///
582/// # Safety
583/// `stream` and `out_chunk` must be valid, non-null pointers.
584#[unsafe(no_mangle)]
585pub unsafe extern "C" fn nemo_flow_stream_next(
586    stream: *mut FfiStream,
587    out_chunk: *mut *mut c_char,
588) -> i32 {
589    clear_last_error();
590    if stream.is_null() || out_chunk.is_null() {
591        return -1;
592    }
593    let stream = unsafe { &*stream };
594    let result = tokio_runtime().block_on(async {
595        let mut guard = stream.receiver.lock().await;
596        guard.recv().await
597    });
598    match result {
599        None => 0, // stream done
600        Some(Ok(chunk)) => {
601            unsafe { *out_chunk = json_to_c_string(&chunk) };
602            1
603        }
604        Some(Err(e)) => {
605            set_last_error(&e.to_string());
606            -1
607        }
608    }
609}
610
611/// Free a stream handle and release its resources.
612///
613/// # Safety
614/// `stream` must be a valid `FfiStream` pointer returned by
615/// `nemo_flow_llm_stream_call_execute`, or null.
616#[unsafe(no_mangle)]
617pub unsafe extern "C" fn nemo_flow_stream_free(stream: *mut FfiStream) {
618    if !stream.is_null() {
619        drop(unsafe { Box::from_raw(stream) });
620    }
621}