use super::{
Arc, FfiCodecHandle, FfiLLMHandle, FfiScopeHandle, FlowResult, LlmAttributes,
LlmExecutionNextFn, LlmRequest, LlmStreamExecutionNextFn, NemoFlowCodecDecodeFn,
NemoFlowCodecEncodeFn, NemoFlowCollectorCb, NemoFlowFinalizerCb, NemoFlowFreeFn,
NemoFlowLlmExecCb, NemoFlowStatus, TASK_SCOPE_STACK, c_char, c_str_to_json, c_str_to_opt_json,
c_str_to_string, clear_last_error, core_llm_api, current_scope_stack, json_to_c_string,
set_last_error, status_from_error, tokio_runtime, unix_micros_to_opt_timestamp, wrap_codec_fn,
wrap_collector_fn, wrap_finalizer_fn, wrap_llm_exec_fn, wrap_llm_stream_exec_fn,
};
use tokio_stream::StreamExt;
#[unsafe(no_mangle)]
pub unsafe extern "C" fn nemo_flow_llm_call(
name: *const c_char,
native_json: *const c_char,
parent: *const FfiScopeHandle,
attributes: u32,
data_json: *const c_char,
metadata_json: *const c_char,
model_name: *const c_char,
timestamp_unix_micros: *const i64,
out: *mut *mut FfiLLMHandle,
) -> NemoFlowStatus {
clear_last_error();
if out.is_null() {
set_last_error("null pointer argument");
return NemoFlowStatus::NullPointer;
}
let name = match c_str_to_string(name) {
Ok(s) => s,
Err(status) => return status,
};
let native = match c_str_to_json(native_json) {
Some(n) => n,
None => return NemoFlowStatus::InvalidJson,
};
let request: LlmRequest = match serde_json::from_value(native) {
Ok(r) => r,
Err(_) => {
set_last_error("failed to parse native_json as LlmRequest");
return NemoFlowStatus::InvalidJson;
}
};
let parent_ref = if parent.is_null() {
None
} else {
Some(&unsafe { &*parent }.0)
};
let attrs = LlmAttributes::from_bits_truncate(attributes);
let data = match c_str_to_opt_json(data_json) {
Some(d) => d,
None => return NemoFlowStatus::InvalidJson,
};
let metadata = match c_str_to_opt_json(metadata_json) {
Some(m) => m,
None => return NemoFlowStatus::InvalidJson,
};
let model_name_opt = if model_name.is_null() {
None
} else {
match c_str_to_string(model_name) {
Ok(s) => Some(s),
Err(status) => return status,
}
};
let timestamp = match unix_micros_to_opt_timestamp(timestamp_unix_micros) {
Some(v) => v,
None => return NemoFlowStatus::InvalidArg,
};
match core_llm_api::llm_call(
core_llm_api::LlmCallParams::builder()
.name(&name)
.request(&request)
.parent_opt(parent_ref)
.attributes(attrs)
.data_opt(data)
.metadata_opt(metadata)
.model_name_opt(model_name_opt)
.timestamp_opt(timestamp)
.build(),
) {
Ok(h) => {
unsafe { *out = Box::into_raw(Box::new(FfiLLMHandle(h))) };
NemoFlowStatus::Ok
}
Err(e) => status_from_error(&e),
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn nemo_flow_llm_call_end(
handle: *const FfiLLMHandle,
response_json: *const c_char,
data_json: *const c_char,
metadata_json: *const c_char,
timestamp_unix_micros: *const i64,
) -> NemoFlowStatus {
clear_last_error();
if handle.is_null() {
set_last_error("handle is null");
return NemoFlowStatus::NullPointer;
}
let response = match c_str_to_json(response_json) {
Some(r) => r,
None => return NemoFlowStatus::InvalidJson,
};
let data = match c_str_to_opt_json(data_json) {
Some(d) => d,
None => return NemoFlowStatus::InvalidJson,
};
let metadata = match c_str_to_opt_json(metadata_json) {
Some(m) => m,
None => return NemoFlowStatus::InvalidJson,
};
let timestamp = match unix_micros_to_opt_timestamp(timestamp_unix_micros) {
Some(v) => v,
None => return NemoFlowStatus::InvalidArg,
};
match core_llm_api::llm_call_end(
core_llm_api::LlmCallEndParams::builder()
.handle(&unsafe { &*handle }.0)
.response(response)
.data_opt(data)
.metadata_opt(metadata)
.timestamp_opt(timestamp)
.build(),
) {
Ok(()) => NemoFlowStatus::Ok,
Err(e) => status_from_error(&e),
}
}
#[unsafe(no_mangle)]
pub extern "C" fn nemo_flow_openai_chat_codec_new() -> *mut FfiCodecHandle {
Box::into_raw(Box::new(FfiCodecHandle {
codec: Arc::new(nemo_flow::codec::openai_chat::OpenAIChatCodec),
response_codec: Arc::new(nemo_flow::codec::openai_chat::OpenAIChatCodec),
}))
}
#[unsafe(no_mangle)]
pub extern "C" fn nemo_flow_openai_responses_codec_new() -> *mut FfiCodecHandle {
Box::into_raw(Box::new(FfiCodecHandle {
codec: Arc::new(nemo_flow::codec::openai_responses::OpenAIResponsesCodec),
response_codec: Arc::new(nemo_flow::codec::openai_responses::OpenAIResponsesCodec),
}))
}
#[unsafe(no_mangle)]
pub extern "C" fn nemo_flow_anthropic_messages_codec_new() -> *mut FfiCodecHandle {
Box::into_raw(Box::new(FfiCodecHandle {
codec: Arc::new(nemo_flow::codec::anthropic::AnthropicMessagesCodec),
response_codec: Arc::new(nemo_flow::codec::anthropic::AnthropicMessagesCodec),
}))
}
struct ParsedExecuteInputs {
name: String,
request: LlmRequest,
parent_handle: Option<nemo_flow::api::scope::ScopeHandle>,
attrs: LlmAttributes,
data: Option<serde_json::Value>,
metadata: Option<serde_json::Value>,
model_name: Option<String>,
codec: Option<Arc<dyn nemo_flow::codec::traits::LlmCodec>>,
response_codec: Option<Arc<dyn nemo_flow::codec::traits::LlmResponseCodec>>,
}
struct RawExecuteInputs {
name: *const c_char,
native_json: *const c_char,
parent: *const FfiScopeHandle,
attributes: u32,
data_json: *const c_char,
metadata_json: *const c_char,
model_name: *const c_char,
codec_decode: NemoFlowCodecDecodeFn,
codec_encode: NemoFlowCodecEncodeFn,
codec_user_data: *mut libc::c_void,
codec_free_fn: NemoFlowFreeFn,
response_codec: *const FfiCodecHandle,
}
fn parse_llm_request(native_json: *const c_char) -> Result<LlmRequest, NemoFlowStatus> {
let native = c_str_to_json(native_json).ok_or(NemoFlowStatus::InvalidJson)?;
serde_json::from_value(native).map_err(|_| {
set_last_error("failed to parse native_json as LlmRequest");
NemoFlowStatus::InvalidJson
})
}
fn parse_optional_model_name(model_name: *const c_char) -> Result<Option<String>, NemoFlowStatus> {
if model_name.is_null() {
Ok(None)
} else {
c_str_to_string(model_name).map(Some)
}
}
fn parse_execute_inputs(raw: RawExecuteInputs) -> Result<ParsedExecuteInputs, NemoFlowStatus> {
let name = c_str_to_string(raw.name)?;
let request = parse_llm_request(raw.native_json)?;
let parent_handle = if raw.parent.is_null() {
None
} else {
Some(unsafe { &*raw.parent }.0.clone())
};
let attrs = LlmAttributes::from_bits_truncate(raw.attributes);
let data = c_str_to_opt_json(raw.data_json).ok_or(NemoFlowStatus::InvalidJson)?;
let metadata = c_str_to_opt_json(raw.metadata_json).ok_or(NemoFlowStatus::InvalidJson)?;
let model_name = parse_optional_model_name(raw.model_name)?;
let codec = match (raw.codec_decode, raw.codec_encode) {
(Some(decode_cb), Some(encode_cb)) => Some(wrap_codec_fn(
decode_cb,
encode_cb,
raw.codec_user_data,
raw.codec_free_fn,
)),
(None, None) => None,
_ => {
set_last_error(
"codec_decode and codec_encode must either both be provided or both be null",
);
return Err(NemoFlowStatus::InvalidArg);
}
};
let response_codec = if raw.response_codec.is_null() {
None
} else {
Some(unsafe { &*raw.response_codec }.response_codec.clone())
};
Ok(ParsedExecuteInputs {
name,
request,
parent_handle,
attrs,
data,
metadata,
model_name,
codec,
response_codec,
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn nemo_flow_llm_call_execute(
name: *const c_char,
native_json: *const c_char,
func: NemoFlowLlmExecCb,
func_user_data: *mut libc::c_void,
func_free: NemoFlowFreeFn,
parent: *const FfiScopeHandle,
attributes: u32,
data_json: *const c_char,
metadata_json: *const c_char,
model_name: *const c_char,
codec_decode: NemoFlowCodecDecodeFn,
codec_encode: NemoFlowCodecEncodeFn,
codec_user_data: *mut libc::c_void,
codec_free_fn: NemoFlowFreeFn,
response_codec: *const FfiCodecHandle,
out: *mut *mut c_char,
) -> NemoFlowStatus {
clear_last_error();
if out.is_null() {
set_last_error("null pointer argument");
return NemoFlowStatus::NullPointer;
}
let parsed = match parse_execute_inputs(RawExecuteInputs {
name,
native_json,
parent,
attributes,
data_json,
metadata_json,
model_name,
codec_decode,
codec_encode,
codec_user_data,
codec_free_fn,
response_codec,
}) {
Ok(parsed) => parsed,
Err(status) => return status,
};
let exec_fn = wrap_llm_exec_fn(func, func_user_data, func_free);
let default_fn: LlmExecutionNextFn = Arc::new(move |request| exec_fn(request));
let scope_stack = current_scope_stack();
let result = tokio_runtime().block_on(TASK_SCOPE_STACK.scope(scope_stack, async {
core_llm_api::llm_call_execute(
core_llm_api::LlmCallExecuteParams::builder()
.name(parsed.name)
.request(parsed.request)
.func(default_fn)
.parent_opt(parsed.parent_handle)
.attributes(parsed.attrs)
.data_opt(parsed.data)
.metadata_opt(parsed.metadata)
.model_name_opt(parsed.model_name)
.codec_opt(parsed.codec)
.response_codec_opt(parsed.response_codec)
.build(),
)
.await
}));
match result {
Ok(json) => {
unsafe { *out = json_to_c_string(&json) };
NemoFlowStatus::Ok
}
Err(e) => status_from_error(&e),
}
}
pub struct FfiStream {
pub(crate) receiver:
tokio::sync::Mutex<tokio::sync::mpsc::Receiver<FlowResult<serde_json::Value>>>,
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn nemo_flow_llm_stream_call_execute(
name: *const c_char,
native_json: *const c_char,
func: NemoFlowLlmExecCb,
func_user_data: *mut libc::c_void,
func_free: NemoFlowFreeFn,
collector: Option<NemoFlowCollectorCb>,
finalizer: Option<NemoFlowFinalizerCb>,
parent: *const FfiScopeHandle,
attributes: u32,
data_json: *const c_char,
metadata_json: *const c_char,
model_name: *const c_char,
codec_decode: NemoFlowCodecDecodeFn,
codec_encode: NemoFlowCodecEncodeFn,
codec_user_data: *mut libc::c_void,
codec_free_fn: NemoFlowFreeFn,
response_codec: *const FfiCodecHandle,
out: *mut *mut FfiStream,
) -> NemoFlowStatus {
clear_last_error();
if out.is_null() {
set_last_error("null pointer argument");
return NemoFlowStatus::NullPointer;
}
let parsed = match parse_execute_inputs(RawExecuteInputs {
name,
native_json,
parent,
attributes,
data_json,
metadata_json,
model_name,
codec_decode,
codec_encode,
codec_user_data,
codec_free_fn,
response_codec,
}) {
Ok(parsed) => parsed,
Err(status) => return status,
};
let exec_fn = wrap_llm_stream_exec_fn(func, func_user_data, func_free);
let default_fn: LlmStreamExecutionNextFn = Arc::new(move |request| exec_fn(request));
let wrapped_collector: Box<dyn FnMut(serde_json::Value) -> FlowResult<()> + Send> =
match collector {
Some(cb) => wrap_collector_fn(cb),
None => Box::new(|_: serde_json::Value| Ok(())),
};
let wrapped_finalizer: Box<dyn FnOnce() -> serde_json::Value + Send> = match finalizer {
Some(cb) => wrap_finalizer_fn(cb),
None => Box::new(|| serde_json::Value::Null),
};
let scope_stack = current_scope_stack();
let result = tokio_runtime().block_on(TASK_SCOPE_STACK.scope(scope_stack, async {
core_llm_api::llm_stream_call_execute(
core_llm_api::LlmStreamCallExecuteParams::builder()
.name(parsed.name)
.request(parsed.request)
.func(default_fn)
.collector(wrapped_collector)
.finalizer(wrapped_finalizer)
.parent_opt(parsed.parent_handle)
.attributes(parsed.attrs)
.data_opt(parsed.data)
.metadata_opt(parsed.metadata)
.model_name_opt(parsed.model_name)
.codec_opt(parsed.codec)
.response_codec_opt(parsed.response_codec)
.build(),
)
.await
}));
match result {
Ok(rust_stream) => {
let (tx, rx) = tokio::sync::mpsc::channel(32);
tokio_runtime().spawn(async move {
let mut stream = rust_stream;
while let Some(item) = stream.next().await {
if tx.send(item).await.is_err() {
break;
}
}
});
let ffi_stream = Box::new(FfiStream {
receiver: tokio::sync::Mutex::new(rx),
});
unsafe { *out = Box::into_raw(ffi_stream) };
NemoFlowStatus::Ok
}
Err(e) => status_from_error(&e),
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn nemo_flow_stream_next(
stream: *mut FfiStream,
out_chunk: *mut *mut c_char,
) -> i32 {
clear_last_error();
if stream.is_null() || out_chunk.is_null() {
return -1;
}
let stream = unsafe { &*stream };
let result = tokio_runtime().block_on(async {
let mut guard = stream.receiver.lock().await;
guard.recv().await
});
match result {
None => 0, Some(Ok(chunk)) => {
unsafe { *out_chunk = json_to_c_string(&chunk) };
1
}
Some(Err(e)) => {
set_last_error(&e.to_string());
-1
}
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn nemo_flow_stream_free(stream: *mut FfiStream) {
if !stream.is_null() {
drop(unsafe { Box::from_raw(stream) });
}
}