1use super::{
5 Arc, FfiCodecHandle, FfiLLMHandle, FfiScopeHandle, FlowResult, LlmAttributes,
6 LlmExecutionNextFn, LlmRequest, LlmStreamExecutionNextFn, NemoFlowCodecDecodeFn,
7 NemoFlowCodecEncodeFn, NemoFlowCollectorCb, NemoFlowFinalizerCb, NemoFlowFreeFn,
8 NemoFlowLlmExecCb, NemoFlowStatus, TASK_SCOPE_STACK, c_char, c_str_to_json, c_str_to_opt_json,
9 c_str_to_string, clear_last_error, core_llm_api, current_scope_stack, json_to_c_string,
10 set_last_error, status_from_error, tokio_runtime, unix_micros_to_opt_timestamp, wrap_codec_fn,
11 wrap_collector_fn, wrap_finalizer_fn, wrap_llm_exec_fn, wrap_llm_stream_exec_fn,
12};
13use tokio_stream::StreamExt;
14
15#[unsafe(no_mangle)]
53pub unsafe extern "C" fn nemo_flow_llm_call(
54 name: *const c_char,
55 native_json: *const c_char,
56 parent: *const FfiScopeHandle,
57 attributes: u32,
58 data_json: *const c_char,
59 metadata_json: *const c_char,
60 model_name: *const c_char,
61 timestamp_unix_micros: *const i64,
62 out: *mut *mut FfiLLMHandle,
63) -> NemoFlowStatus {
64 clear_last_error();
65 if out.is_null() {
66 set_last_error("null pointer argument");
67 return NemoFlowStatus::NullPointer;
68 }
69 let name = match c_str_to_string(name) {
70 Ok(s) => s,
71 Err(status) => return status,
72 };
73 let native = match c_str_to_json(native_json) {
74 Some(n) => n,
75 None => return NemoFlowStatus::InvalidJson,
76 };
77 let request: LlmRequest = match serde_json::from_value(native) {
78 Ok(r) => r,
79 Err(_) => {
80 set_last_error("failed to parse native_json as LlmRequest");
81 return NemoFlowStatus::InvalidJson;
82 }
83 };
84 let parent_ref = if parent.is_null() {
85 None
86 } else {
87 Some(&unsafe { &*parent }.0)
88 };
89 let attrs = LlmAttributes::from_bits_truncate(attributes);
90 let data = match c_str_to_opt_json(data_json) {
91 Some(d) => d,
92 None => return NemoFlowStatus::InvalidJson,
93 };
94 let metadata = match c_str_to_opt_json(metadata_json) {
95 Some(m) => m,
96 None => return NemoFlowStatus::InvalidJson,
97 };
98 let model_name_opt = if model_name.is_null() {
99 None
100 } else {
101 match c_str_to_string(model_name) {
102 Ok(s) => Some(s),
103 Err(status) => return status,
104 }
105 };
106 let timestamp = match unix_micros_to_opt_timestamp(timestamp_unix_micros) {
107 Some(v) => v,
108 None => return NemoFlowStatus::InvalidArg,
109 };
110
111 match core_llm_api::llm_call(
112 core_llm_api::LlmCallParams::builder()
113 .name(&name)
114 .request(&request)
115 .parent_opt(parent_ref)
116 .attributes(attrs)
117 .data_opt(data)
118 .metadata_opt(metadata)
119 .model_name_opt(model_name_opt)
120 .timestamp_opt(timestamp)
121 .build(),
122 ) {
123 Ok(h) => {
124 unsafe { *out = Box::into_raw(Box::new(FfiLLMHandle(h))) };
125 NemoFlowStatus::Ok
126 }
127 Err(e) => status_from_error(&e),
128 }
129}
130
131#[unsafe(no_mangle)]
158pub unsafe extern "C" fn nemo_flow_llm_call_end(
159 handle: *const FfiLLMHandle,
160 response_json: *const c_char,
161 data_json: *const c_char,
162 metadata_json: *const c_char,
163 timestamp_unix_micros: *const i64,
164) -> NemoFlowStatus {
165 clear_last_error();
166 if handle.is_null() {
167 set_last_error("handle is null");
168 return NemoFlowStatus::NullPointer;
169 }
170 let response = match c_str_to_json(response_json) {
171 Some(r) => r,
172 None => return NemoFlowStatus::InvalidJson,
173 };
174 let data = match c_str_to_opt_json(data_json) {
175 Some(d) => d,
176 None => return NemoFlowStatus::InvalidJson,
177 };
178 let metadata = match c_str_to_opt_json(metadata_json) {
179 Some(m) => m,
180 None => return NemoFlowStatus::InvalidJson,
181 };
182 let timestamp = match unix_micros_to_opt_timestamp(timestamp_unix_micros) {
183 Some(v) => v,
184 None => return NemoFlowStatus::InvalidArg,
185 };
186
187 match core_llm_api::llm_call_end(
188 core_llm_api::LlmCallEndParams::builder()
189 .handle(&unsafe { &*handle }.0)
190 .response(response)
191 .data_opt(data)
192 .metadata_opt(metadata)
193 .timestamp_opt(timestamp)
194 .build(),
195 ) {
196 Ok(()) => NemoFlowStatus::Ok,
197 Err(e) => status_from_error(&e),
198 }
199}
200
201#[unsafe(no_mangle)]
213pub extern "C" fn nemo_flow_openai_chat_codec_new() -> *mut FfiCodecHandle {
214 Box::into_raw(Box::new(FfiCodecHandle {
215 codec: Arc::new(nemo_flow::codec::openai_chat::OpenAIChatCodec),
216 response_codec: Arc::new(nemo_flow::codec::openai_chat::OpenAIChatCodec),
217 }))
218}
219
220#[unsafe(no_mangle)]
228pub extern "C" fn nemo_flow_openai_responses_codec_new() -> *mut FfiCodecHandle {
229 Box::into_raw(Box::new(FfiCodecHandle {
230 codec: Arc::new(nemo_flow::codec::openai_responses::OpenAIResponsesCodec),
231 response_codec: Arc::new(nemo_flow::codec::openai_responses::OpenAIResponsesCodec),
232 }))
233}
234
235#[unsafe(no_mangle)]
243pub extern "C" fn nemo_flow_anthropic_messages_codec_new() -> *mut FfiCodecHandle {
244 Box::into_raw(Box::new(FfiCodecHandle {
245 codec: Arc::new(nemo_flow::codec::anthropic::AnthropicMessagesCodec),
246 response_codec: Arc::new(nemo_flow::codec::anthropic::AnthropicMessagesCodec),
247 }))
248}
249
250struct ParsedExecuteInputs {
251 name: String,
252 request: LlmRequest,
253 parent_handle: Option<nemo_flow::api::scope::ScopeHandle>,
254 attrs: LlmAttributes,
255 data: Option<serde_json::Value>,
256 metadata: Option<serde_json::Value>,
257 model_name: Option<String>,
258 codec: Option<Arc<dyn nemo_flow::codec::traits::LlmCodec>>,
259 response_codec: Option<Arc<dyn nemo_flow::codec::traits::LlmResponseCodec>>,
260}
261
262struct RawExecuteInputs {
263 name: *const c_char,
264 native_json: *const c_char,
265 parent: *const FfiScopeHandle,
266 attributes: u32,
267 data_json: *const c_char,
268 metadata_json: *const c_char,
269 model_name: *const c_char,
270 codec_decode: NemoFlowCodecDecodeFn,
271 codec_encode: NemoFlowCodecEncodeFn,
272 codec_user_data: *mut libc::c_void,
273 codec_free_fn: NemoFlowFreeFn,
274 response_codec: *const FfiCodecHandle,
275}
276
277fn parse_llm_request(native_json: *const c_char) -> Result<LlmRequest, NemoFlowStatus> {
278 let native = c_str_to_json(native_json).ok_or(NemoFlowStatus::InvalidJson)?;
279 serde_json::from_value(native).map_err(|_| {
280 set_last_error("failed to parse native_json as LlmRequest");
281 NemoFlowStatus::InvalidJson
282 })
283}
284
285fn parse_optional_model_name(model_name: *const c_char) -> Result<Option<String>, NemoFlowStatus> {
286 if model_name.is_null() {
287 Ok(None)
288 } else {
289 c_str_to_string(model_name).map(Some)
290 }
291}
292
293fn parse_execute_inputs(raw: RawExecuteInputs) -> Result<ParsedExecuteInputs, NemoFlowStatus> {
294 let name = c_str_to_string(raw.name)?;
295 let request = parse_llm_request(raw.native_json)?;
296 let parent_handle = if raw.parent.is_null() {
297 None
298 } else {
299 Some(unsafe { &*raw.parent }.0.clone())
300 };
301 let attrs = LlmAttributes::from_bits_truncate(raw.attributes);
302 let data = c_str_to_opt_json(raw.data_json).ok_or(NemoFlowStatus::InvalidJson)?;
303 let metadata = c_str_to_opt_json(raw.metadata_json).ok_or(NemoFlowStatus::InvalidJson)?;
304 let model_name = parse_optional_model_name(raw.model_name)?;
305 let codec = match (raw.codec_decode, raw.codec_encode) {
306 (Some(decode_cb), Some(encode_cb)) => Some(wrap_codec_fn(
307 decode_cb,
308 encode_cb,
309 raw.codec_user_data,
310 raw.codec_free_fn,
311 )),
312 (None, None) => None,
313 _ => {
314 set_last_error(
315 "codec_decode and codec_encode must either both be provided or both be null",
316 );
317 return Err(NemoFlowStatus::InvalidArg);
318 }
319 };
320 let response_codec = if raw.response_codec.is_null() {
321 None
322 } else {
323 Some(unsafe { &*raw.response_codec }.response_codec.clone())
324 };
325
326 Ok(ParsedExecuteInputs {
327 name,
328 request,
329 parent_handle,
330 attrs,
331 data,
332 metadata,
333 model_name,
334 codec,
335 response_codec,
336 })
337}
338
339#[unsafe(no_mangle)]
364pub unsafe extern "C" fn nemo_flow_llm_call_execute(
365 name: *const c_char,
366 native_json: *const c_char,
367 func: NemoFlowLlmExecCb,
368 func_user_data: *mut libc::c_void,
369 func_free: NemoFlowFreeFn,
370 parent: *const FfiScopeHandle,
371 attributes: u32,
372 data_json: *const c_char,
373 metadata_json: *const c_char,
374 model_name: *const c_char,
375 codec_decode: NemoFlowCodecDecodeFn,
376 codec_encode: NemoFlowCodecEncodeFn,
377 codec_user_data: *mut libc::c_void,
378 codec_free_fn: NemoFlowFreeFn,
379 response_codec: *const FfiCodecHandle,
380 out: *mut *mut c_char,
381) -> NemoFlowStatus {
382 clear_last_error();
383 if out.is_null() {
384 set_last_error("null pointer argument");
385 return NemoFlowStatus::NullPointer;
386 }
387 let parsed = match parse_execute_inputs(RawExecuteInputs {
388 name,
389 native_json,
390 parent,
391 attributes,
392 data_json,
393 metadata_json,
394 model_name,
395 codec_decode,
396 codec_encode,
397 codec_user_data,
398 codec_free_fn,
399 response_codec,
400 }) {
401 Ok(parsed) => parsed,
402 Err(status) => return status,
403 };
404
405 let exec_fn = wrap_llm_exec_fn(func, func_user_data, func_free);
406 let default_fn: LlmExecutionNextFn = Arc::new(move |request| exec_fn(request));
407
408 let scope_stack = current_scope_stack();
409 let result = tokio_runtime().block_on(TASK_SCOPE_STACK.scope(scope_stack, async {
410 core_llm_api::llm_call_execute(
411 core_llm_api::LlmCallExecuteParams::builder()
412 .name(parsed.name)
413 .request(parsed.request)
414 .func(default_fn)
415 .parent_opt(parsed.parent_handle)
416 .attributes(parsed.attrs)
417 .data_opt(parsed.data)
418 .metadata_opt(parsed.metadata)
419 .model_name_opt(parsed.model_name)
420 .codec_opt(parsed.codec)
421 .response_codec_opt(parsed.response_codec)
422 .build(),
423 )
424 .await
425 }));
426
427 match result {
428 Ok(json) => {
429 unsafe { *out = json_to_c_string(&json) };
430 NemoFlowStatus::Ok
431 }
432 Err(e) => status_from_error(&e),
433 }
434}
435
436pub struct FfiStream {
443 pub(crate) receiver:
444 tokio::sync::Mutex<tokio::sync::mpsc::Receiver<FlowResult<serde_json::Value>>>,
445}
446
447#[unsafe(no_mangle)]
474pub unsafe extern "C" fn nemo_flow_llm_stream_call_execute(
475 name: *const c_char,
476 native_json: *const c_char,
477 func: NemoFlowLlmExecCb,
478 func_user_data: *mut libc::c_void,
479 func_free: NemoFlowFreeFn,
480 collector: Option<NemoFlowCollectorCb>,
481 finalizer: Option<NemoFlowFinalizerCb>,
482 parent: *const FfiScopeHandle,
483 attributes: u32,
484 data_json: *const c_char,
485 metadata_json: *const c_char,
486 model_name: *const c_char,
487 codec_decode: NemoFlowCodecDecodeFn,
488 codec_encode: NemoFlowCodecEncodeFn,
489 codec_user_data: *mut libc::c_void,
490 codec_free_fn: NemoFlowFreeFn,
491 response_codec: *const FfiCodecHandle,
492 out: *mut *mut FfiStream,
493) -> NemoFlowStatus {
494 clear_last_error();
495 if out.is_null() {
496 set_last_error("null pointer argument");
497 return NemoFlowStatus::NullPointer;
498 }
499 let parsed = match parse_execute_inputs(RawExecuteInputs {
500 name,
501 native_json,
502 parent,
503 attributes,
504 data_json,
505 metadata_json,
506 model_name,
507 codec_decode,
508 codec_encode,
509 codec_user_data,
510 codec_free_fn,
511 response_codec,
512 }) {
513 Ok(parsed) => parsed,
514 Err(status) => return status,
515 };
516
517 let exec_fn = wrap_llm_stream_exec_fn(func, func_user_data, func_free);
518 let default_fn: LlmStreamExecutionNextFn = Arc::new(move |request| exec_fn(request));
519
520 let wrapped_collector: Box<dyn FnMut(serde_json::Value) -> FlowResult<()> + Send> =
521 match collector {
522 Some(cb) => wrap_collector_fn(cb),
523 None => Box::new(|_: serde_json::Value| Ok(())),
524 };
525
526 let wrapped_finalizer: Box<dyn FnOnce() -> serde_json::Value + Send> = match finalizer {
527 Some(cb) => wrap_finalizer_fn(cb),
528 None => Box::new(|| serde_json::Value::Null),
529 };
530
531 let scope_stack = current_scope_stack();
532 let result = tokio_runtime().block_on(TASK_SCOPE_STACK.scope(scope_stack, async {
533 core_llm_api::llm_stream_call_execute(
534 core_llm_api::LlmStreamCallExecuteParams::builder()
535 .name(parsed.name)
536 .request(parsed.request)
537 .func(default_fn)
538 .collector(wrapped_collector)
539 .finalizer(wrapped_finalizer)
540 .parent_opt(parsed.parent_handle)
541 .attributes(parsed.attrs)
542 .data_opt(parsed.data)
543 .metadata_opt(parsed.metadata)
544 .model_name_opt(parsed.model_name)
545 .codec_opt(parsed.codec)
546 .response_codec_opt(parsed.response_codec)
547 .build(),
548 )
549 .await
550 }));
551
552 match result {
553 Ok(rust_stream) => {
554 let (tx, rx) = tokio::sync::mpsc::channel(32);
555 tokio_runtime().spawn(async move {
556 let mut stream = rust_stream;
557 while let Some(item) = stream.next().await {
558 if tx.send(item).await.is_err() {
559 break;
560 }
561 }
562 });
563 let ffi_stream = Box::new(FfiStream {
564 receiver: tokio::sync::Mutex::new(rx),
565 });
566 unsafe { *out = Box::into_raw(ffi_stream) };
567 NemoFlowStatus::Ok
568 }
569 Err(e) => status_from_error(&e),
570 }
571}
572
573#[unsafe(no_mangle)]
585pub unsafe extern "C" fn nemo_flow_stream_next(
586 stream: *mut FfiStream,
587 out_chunk: *mut *mut c_char,
588) -> i32 {
589 clear_last_error();
590 if stream.is_null() || out_chunk.is_null() {
591 return -1;
592 }
593 let stream = unsafe { &*stream };
594 let result = tokio_runtime().block_on(async {
595 let mut guard = stream.receiver.lock().await;
596 guard.recv().await
597 });
598 match result {
599 None => 0, Some(Ok(chunk)) => {
601 unsafe { *out_chunk = json_to_c_string(&chunk) };
602 1
603 }
604 Some(Err(e)) => {
605 set_last_error(&e.to_string());
606 -1
607 }
608 }
609}
610
611#[unsafe(no_mangle)]
617pub unsafe extern "C" fn nemo_flow_stream_free(stream: *mut FfiStream) {
618 if !stream.is_null() {
619 drop(unsafe { Box::from_raw(stream) });
620 }
621}