#![allow(clippy::type_complexity)]
use std::ffi::{CStr, CString};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use libc::c_char;
use nemo_flow::api::runtime::{
EventSubscriberFn, LlmConditionalFn, LlmExecutionNextFn, LlmRequestInterceptFn,
LlmStreamExecutionNextFn, ToolConditionalFn, ToolExecutionNextFn, ToolInterceptFn,
};
use serde_json::Value as Json;
use tokio_stream::{Stream, StreamExt};
use nemo_flow::api::event::Event;
use nemo_flow::api::llm::LlmRequest;
use nemo_flow::codec::request::AnnotatedLlmRequest as AnnotatedLLMRequest;
use nemo_flow::codec::traits::LlmCodec;
use nemo_flow::error::{FlowError, Result};
use crate::convert::json_to_c_string;
use crate::error::{NemoFlowStatus, clear_last_error, last_error_message, set_last_error};
use crate::types::{FfiEvent, FfiLLMRequest, FfiPluginContext};
/// Optional destructor for a caller-supplied `user_data` pointer.
///
/// When it is `Some`, the function is called with the stored pointer when the
/// internal `UserData` wrapper that holds it is dropped.
pub type NemoFlowFreeFn = Option<unsafe extern "C" fn(user_data: *mut libc::c_void)>;
/// C callback that receives a tool name and the tool arguments as a JSON C string
/// and returns a JSON C string.
///
/// Consumed by `wrap_tool_sanitize_fn` and `wrap_tool_request_intercept_fn`. Both
/// wrappers free `args_json` as soon as the callback returns, and release the
/// returned pointer with `nemo_flow_string_free_internal` after parsing it.
pub type NemoFlowToolSanitizeCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
name: *const c_char,
args_json: *const c_char,
) -> *mut c_char;
/// C callback that inspects a tool call and optionally returns a string.
///
/// Consumed by `wrap_tool_conditional_fn`: a null return maps to `Ok(None)` unless
/// a last-error message is recorded (then it becomes an error); a non-null return
/// is copied into a `String` and then freed by the wrapper.
pub type NemoFlowToolConditionalCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
name: *const c_char,
args_json: *const c_char,
) -> *mut c_char;
/// C callback that executes a tool from its JSON arguments and returns a JSON C string.
///
/// Consumed by `wrap_tool_exec_fn`, which treats a null return as a failure.
pub type NemoFlowToolExecCb =
unsafe extern "C" fn(user_data: *mut libc::c_void, args_json: *const c_char) -> *mut c_char;
/// Signature of the "next" continuation handed to a tool execution intercept callback.
///
/// The implementation in this file returns a JSON C string on success, or null
/// after recording the failure with `set_last_error`.
pub type NemoFlowToolExecNextFn =
unsafe extern "C" fn(args_json: *const c_char, next_ctx: *mut libc::c_void) -> *mut c_char;
/// C callback that intercepts a tool execution and may delegate to `next_fn`.
///
/// `next_ctx` is the opaque value to pass back to `next_fn`. The wrapper releases it
/// immediately after the callback returns, so it must not be used afterwards.
pub type NemoFlowToolExecInterceptCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
args_json: *const c_char,
next_fn: NemoFlowToolExecNextFn,
next_ctx: *mut libc::c_void,
) -> *mut c_char;
/// C callback that maps one JSON C string to another.
///
/// Consumed by `wrap_json_fn` and `wrap_llm_response_fn`; a null or unparsable
/// return value is turned into JSON `null` by those wrappers.
pub type NemoFlowJsonCb =
unsafe extern "C" fn(user_data: *mut libc::c_void, json: *const c_char) -> *mut c_char;
/// C callback that receives an LLM request handle and returns a new request handle.
///
/// Consumed by `wrap_llm_sanitize_request_fn`: the input handle is destroyed right
/// after the call, a non-null result is reclaimed with `Box::from_raw`, and a null
/// result is replaced by a request with empty headers and `null` content.
pub type NemoFlowLlmRequestCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
request: *const FfiLLMRequest,
) -> *mut FfiLLMRequest;
/// C callback that inspects an LLM request and optionally returns a string.
///
/// Consumed by `wrap_llm_conditional_fn`: a null return maps to `Ok(None)` unless a
/// last-error message is recorded; a non-null return is copied and then freed.
pub type NemoFlowLlmConditionalCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
request: *const FfiLLMRequest,
) -> *mut c_char;
/// C callback that executes an LLM request given as a JSON C string and returns a
/// JSON C string.
///
/// Consumed by `wrap_llm_exec_fn` and `wrap_llm_stream_exec_fn`, which treat a null
/// return as a failure.
pub type NemoFlowLlmExecCb =
unsafe extern "C" fn(user_data: *mut libc::c_void, native_json: *const c_char) -> *mut c_char;
/// Signature of the "next" continuation handed to an LLM execution intercept callback.
///
/// The implementations in this file return a JSON C string on success, or null
/// after recording the failure with `set_last_error`.
pub type NemoFlowLlmExecNextFn =
unsafe extern "C" fn(native_json: *const c_char, next_ctx: *mut libc::c_void) -> *mut c_char;
/// C callback that intercepts an LLM execution and may delegate to `next_fn`.
///
/// `next_ctx` is the opaque value to pass back to `next_fn`. The wrappers release it
/// immediately after the callback returns, so it must not be used afterwards.
pub type NemoFlowLlmExecInterceptCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
native_json: *const c_char,
next_fn: NemoFlowLlmExecNextFn,
next_ctx: *mut libc::c_void,
) -> *mut c_char;
/// C callback notified of runtime events.
///
/// `wrap_event_subscriber` passes a pointer to a temporary copy of the event, so the
/// pointer is only valid for the duration of the call.
pub type NemoFlowEventSubscriberCb =
unsafe extern "C" fn(user_data: *mut libc::c_void, event: *const FfiEvent);
/// C callback that decodes an LLM request into annotated-request JSON.
///
/// Used by `FfiCodec::decode`: the returned C string is deserialized into an
/// `AnnotatedLLMRequest` and then freed; a null return is reported as an error.
pub type NemoFlowCodecDecodeCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
request: *const FfiLLMRequest,
) -> *mut c_char;
/// Nullable (`Option`) form of the `NemoFlowCodecDecodeCb` signature.
pub type NemoFlowCodecDecodeFn = Option<
unsafe extern "C" fn(
user_data: *mut libc::c_void,
request: *const FfiLLMRequest,
) -> *mut c_char,
>;
/// C callback that encodes annotated-request JSON back into request content.
///
/// Used by `FfiCodec::encode`: the returned C string is parsed as JSON and becomes
/// the `content` of the resulting request (headers are copied from the original);
/// a null return is reported as an error.
pub type NemoFlowCodecEncodeCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
annotated_json: *const c_char,
original_request: *const FfiLLMRequest,
) -> *mut c_char;
/// Nullable (`Option`) form of the `NemoFlowCodecEncodeCb` signature.
pub type NemoFlowCodecEncodeFn = Option<
unsafe extern "C" fn(
user_data: *mut libc::c_void,
annotated_json: *const c_char,
original_request: *const FfiLLMRequest,
) -> *mut c_char,
>;
/// C callback that intercepts an LLM request before execution.
///
/// Used by `wrap_llm_request_intercept_fn`. `annotated_json` is null when no
/// annotated request is available. On an `Ok` status the wrapper takes ownership of
/// `*out_request` (required, reclaimed with `Box::from_raw`) and of
/// `*out_annotated_json` (optional, parsed and then freed).
pub type NemoFlowLlmRequestInterceptCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
name: *const c_char,
request: *const FfiLLMRequest,
annotated_json: *const c_char,
out_request: *mut *mut FfiLLMRequest,
out_annotated_json: *mut *mut c_char,
) -> NemoFlowStatus;
/// C callback that receives one chunk as a JSON C string.
///
/// Used by `wrap_collector_fn`, which frees the chunk string right after the call.
/// Unlike most callbacks here it takes no `user_data` argument.
pub type NemoFlowCollectorCb = unsafe extern "C" fn(chunk: *const c_char);
/// C callback that produces a final JSON C string.
///
/// Used by `wrap_finalizer_fn`, which parses the returned string and then frees it.
pub type NemoFlowFinalizerCb = unsafe extern "C" fn() -> *mut c_char;
/// C callback that receives a plugin configuration as a JSON C string and returns
/// a C string.
///
/// NOTE(review): not consumed in this part of the file; presumably the return value
/// carries a validation result — confirm against the plugin registration code.
pub type NemoFlowPluginValidateCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
plugin_config_json: *const c_char,
) -> *mut c_char;
/// C callback that receives a plugin configuration and a plugin context handle and
/// returns a status code.
///
/// NOTE(review): not consumed in this part of the file; the lifetime of `ctx` is not
/// visible here — confirm against the plugin registration code.
pub type NemoFlowPluginRegisterCb = unsafe extern "C" fn(
user_data: *mut libc::c_void,
plugin_config_json: *const c_char,
ctx: *mut FfiPluginContext,
) -> NemoFlowStatus;
/// Holds an opaque caller-supplied pointer together with its optional destructor.
///
/// Wrappers in this file share it through an `Arc`, so the destructor runs once,
/// when the last reference is dropped.
struct UserData {
/// Opaque pointer handed back to the C callback on every invocation.
ptr: *mut libc::c_void,
/// Destructor to run on `ptr` at drop time; `None` means nothing is released.
free_fn: NemoFlowFreeFn,
}
// SAFETY (assumed contract — confirm against the C API documentation): raw pointers
// are neither `Send` nor `Sync` by default, so these impls assert that whatever
// `ptr` refers to may be used and released from any thread. Nothing in this file
// enforces that; it is the responsibility of the code registering the callback.
unsafe impl Send for UserData {}
unsafe impl Sync for UserData {}
impl Drop for UserData {
/// Runs the registered destructor, if any, on the stored pointer.
fn drop(&mut self) {
if let Some(free) = self.free_fn {
// The destructor is invoked even when `ptr` is null; handling that case is
// left to the destructor itself.
unsafe { free(self.ptr) };
}
}
}
fn make_user_data(
user_data: *mut libc::c_void,
free_fn: NemoFlowFreeFn,
) -> std::sync::Arc<UserData> {
std::sync::Arc::new(UserData {
ptr: user_data,
free_fn,
})
}
/// Adapts a C tool-sanitize callback into a Rust closure `(name, args) -> Json`.
///
/// The closure serializes its inputs to C strings, invokes `cb`, and parses the
/// returned C string as JSON. A null or unparsable return value yields `Json::Null`.
/// `free_fn` (if any) is run on `user_data` once the returned closure is dropped.
pub fn wrap_tool_sanitize_fn(
    cb: NemoFlowToolSanitizeCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> Box<dyn Fn(&str, Json) -> Json + Send + Sync> {
    let shared = make_user_data(user_data, free_fn);
    Box::new(move |tool_name: &str, tool_args: Json| {
        // A name with an interior NUL cannot be a C string; it degrades to "".
        let name_c = CString::new(tool_name).unwrap_or_default();
        let returned = {
            let args_c = json_to_c_string(&tool_args);
            let out = unsafe { cb(shared.ptr, name_c.as_ptr(), args_c) };
            // The argument buffer is only lent to the callback for the call.
            unsafe { nemo_flow_string_free_internal(args_c) };
            out
        };
        let sanitized = ptr_to_json(returned);
        // The returned string is owned by this side once parsed.
        unsafe { nemo_flow_string_free_internal(returned) };
        sanitized
    })
}
/// Adapts a C tool-conditional callback into a [`ToolConditionalFn`].
///
/// The last-error slot is cleared before the call. A null return is `Ok(None)`
/// unless the callback recorded an error message, in which case that message is
/// returned as `FlowError::Internal`. A non-null return is copied into a `String`
/// and the C buffer is freed.
pub fn wrap_tool_conditional_fn(
    cb: NemoFlowToolConditionalCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> ToolConditionalFn {
    let shared = make_user_data(user_data, free_fn);
    Box::new(move |tool_name: &str, tool_args: &Json| {
        clear_last_error();
        // A name with an interior NUL cannot be a C string; it degrades to "".
        let name_c = CString::new(tool_name).unwrap_or_default();
        let args_c = json_to_c_string(tool_args);
        let returned = unsafe { cb(shared.ptr, name_c.as_ptr(), args_c) };
        unsafe { nemo_flow_string_free_internal(args_c) };

        if returned.is_null() {
            // Null is ambiguous: distinguish "no value" from "failed" via the error slot.
            return match last_error_message() {
                Some(message) => Err(FlowError::Internal(message)),
                None => Ok(None),
            };
        }

        let verdict = ptr_to_opt_string(returned);
        unsafe { nemo_flow_string_free_internal(returned) };
        Ok(verdict)
    })
}
/// Adapts a C callback into a [`ToolInterceptFn`] that rewrites tool arguments.
///
/// The last-error slot is cleared before the call. A null return becomes
/// `FlowError::Internal`, carrying the recorded error message or, if none was
/// recorded, a fixed fallback text. A non-null return is parsed as JSON.
pub fn wrap_tool_request_intercept_fn(
    cb: NemoFlowToolSanitizeCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> ToolInterceptFn {
    let shared = make_user_data(user_data, free_fn);
    Box::new(move |tool_name: &str, tool_args: Json| {
        clear_last_error();
        // A name with an interior NUL cannot be a C string; it degrades to "".
        let name_c = CString::new(tool_name).unwrap_or_default();
        let args_c = json_to_c_string(&tool_args);
        let returned = unsafe { cb(shared.ptr, name_c.as_ptr(), args_c) };
        unsafe { nemo_flow_string_free_internal(args_c) };

        let outcome =
            json_result_from_ptr(returned, "tool request intercept callback returned null");
        // Freeing is a no-op when the callback returned null.
        unsafe { nemo_flow_string_free_internal(returned) };
        outcome
    })
}
pub fn wrap_tool_exec_fn(
cb: NemoFlowToolExecCb,
user_data: *mut libc::c_void,
free_fn: NemoFlowFreeFn,
) -> Box<dyn Fn(Json) -> Pin<Box<dyn Future<Output = Result<Json>> + Send>> + Send + Sync> {
let ud = make_user_data(user_data, free_fn);
Box::new(move |args: Json| {
let ud = ud.clone();
Box::pin(async move {
let c_args = json_to_c_string(&args);
let result_ptr = unsafe { cb(ud.ptr, c_args) };
unsafe { nemo_flow_string_free_internal(c_args) };
let result = json_result_from_ptr(result_ptr, "tool execution callback failed")?;
unsafe { nemo_flow_string_free_internal(result_ptr) };
Ok(result)
})
})
}
pub fn wrap_tool_exec_intercept_fn(
cb: NemoFlowToolExecInterceptCb,
user_data: *mut libc::c_void,
free_fn: NemoFlowFreeFn,
) -> Arc<
dyn Fn(&str, Json, ToolExecutionNextFn) -> Pin<Box<dyn Future<Output = Result<Json>> + Send>>
+ Send
+ Sync,
> {
let ud = make_user_data(user_data, free_fn);
Arc::new(move |_name: &str, args: Json, next: ToolExecutionNextFn| {
let ud = ud.clone();
Box::pin(async move {
let next_box = Box::new(next);
let next_ctx = Box::into_raw(next_box) as *mut libc::c_void;
unsafe extern "C" fn tool_next_trampoline(
args_json: *const c_char,
next_ctx: *mut libc::c_void,
) -> *mut c_char {
let next_arc = unsafe { &*(next_ctx as *const ToolExecutionNextFn) };
let next = next_arc.clone();
let args = if args_json.is_null() {
Json::Null
} else {
let s = unsafe { CStr::from_ptr(args_json) }.to_string_lossy();
serde_json::from_str(&s).unwrap_or(Json::Null)
};
let handle = tokio::runtime::Handle::current();
let result = tokio::task::block_in_place(|| handle.block_on(next(args)));
match result {
Ok(json) => json_to_c_string(&json),
Err(e) => {
set_last_error(&e.to_string());
std::ptr::null_mut()
}
}
}
let c_args = json_to_c_string(&args);
let result_ptr = unsafe { cb(ud.ptr, c_args, tool_next_trampoline, next_ctx) };
unsafe { drop(Box::from_raw(next_ctx as *mut ToolExecutionNextFn)) };
unsafe { nemo_flow_string_free_internal(c_args) };
let result =
json_result_from_ptr(result_ptr, "tool execution intercept callback failed")?;
unsafe { nemo_flow_string_free_internal(result_ptr) };
Ok(result)
})
})
}
pub fn wrap_llm_exec_intercept_fn(
cb: NemoFlowLlmExecInterceptCb,
user_data: *mut libc::c_void,
free_fn: NemoFlowFreeFn,
) -> Arc<
dyn Fn(
&str,
LlmRequest,
LlmExecutionNextFn,
) -> Pin<Box<dyn Future<Output = Result<Json>> + Send>>
+ Send
+ Sync,
> {
let ud = make_user_data(user_data, free_fn);
Arc::new(
move |_name: &str, request: LlmRequest, next: LlmExecutionNextFn| {
let ud = ud.clone();
Box::pin(async move {
let next_box = Box::new(next);
let next_ctx = Box::into_raw(next_box) as *mut libc::c_void;
unsafe extern "C" fn llm_next_trampoline(
native_json: *const c_char,
next_ctx: *mut libc::c_void,
) -> *mut c_char {
let next_arc = unsafe { &*(next_ctx as *const LlmExecutionNextFn) };
let next = next_arc.clone();
let request = if native_json.is_null() {
LlmRequest {
headers: serde_json::Map::new(),
content: Json::Null,
}
} else {
let s = unsafe { CStr::from_ptr(native_json) }.to_string_lossy();
serde_json::from_str::<LlmRequest>(&s).unwrap_or(LlmRequest {
headers: serde_json::Map::new(),
content: Json::Null,
})
};
let handle = tokio::runtime::Handle::current();
let result = tokio::task::block_in_place(|| handle.block_on(next(request)));
match result {
Ok(json) => json_to_c_string(&json),
Err(e) => {
set_last_error(&e.to_string());
std::ptr::null_mut()
}
}
}
let request_json = serde_json::to_value(&request).unwrap_or(Json::Null);
let c_request = json_to_c_string(&request_json);
let result_ptr = unsafe { cb(ud.ptr, c_request, llm_next_trampoline, next_ctx) };
unsafe { drop(Box::from_raw(next_ctx as *mut LlmExecutionNextFn)) };
unsafe { nemo_flow_string_free_internal(c_request) };
let result =
json_result_from_ptr(result_ptr, "LLM execution intercept callback failed")?;
unsafe { nemo_flow_string_free_internal(result_ptr) };
Ok(result)
})
},
)
}
pub fn wrap_llm_stream_exec_intercept_fn(
cb: NemoFlowLlmExecInterceptCb,
user_data: *mut libc::c_void,
free_fn: NemoFlowFreeFn,
) -> Arc<
dyn Fn(
&str,
LlmRequest,
LlmStreamExecutionNextFn,
) -> Pin<
Box<
dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Result<Json>> + Send>>>>
+ Send,
>,
> + Send
+ Sync,
> {
let ud = make_user_data(user_data, free_fn);
Arc::new(
move |_name: &str, request: LlmRequest, next: LlmStreamExecutionNextFn| {
let ud = ud.clone();
Box::pin(async move {
let next_box = Box::new(next);
let next_ctx = Box::into_raw(next_box) as *mut libc::c_void;
unsafe extern "C" fn llm_stream_next_trampoline(
native_json: *const c_char,
next_ctx: *mut libc::c_void,
) -> *mut c_char {
let next_arc = unsafe { &*(next_ctx as *const LlmStreamExecutionNextFn) };
let next = next_arc.clone();
let request = if native_json.is_null() {
LlmRequest {
headers: serde_json::Map::new(),
content: Json::Null,
}
} else {
let s = unsafe { CStr::from_ptr(native_json) }.to_string_lossy();
serde_json::from_str::<LlmRequest>(&s).unwrap_or(LlmRequest {
headers: serde_json::Map::new(),
content: Json::Null,
})
};
let handle = tokio::runtime::Handle::current();
let result = tokio::task::block_in_place(|| {
handle.block_on(async move {
let mut stream = next(request).await?;
match stream.next().await {
Some(item) => item,
None => Ok(Json::Null),
}
})
});
match result {
Ok(json) => json_to_c_string(&json),
Err(e) => {
set_last_error(&e.to_string());
std::ptr::null_mut()
}
}
}
let request_json = serde_json::to_value(&request).unwrap_or(Json::Null);
let c_request = json_to_c_string(&request_json);
let result_ptr =
unsafe { cb(ud.ptr, c_request, llm_stream_next_trampoline, next_ctx) };
unsafe { drop(Box::from_raw(next_ctx as *mut LlmStreamExecutionNextFn)) };
unsafe { nemo_flow_string_free_internal(c_request) };
let result = json_result_from_ptr(
result_ptr,
"LLM stream execution intercept callback failed",
)?;
unsafe { nemo_flow_string_free_internal(result_ptr) };
let stream = tokio_stream::once(Ok(result));
Ok(Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Json>> + Send>>)
})
},
)
}
/// Adapts a C JSON-to-JSON callback into a Rust closure `Json -> Json`.
///
/// The input is serialized to a C string for the call and freed afterwards. The
/// returned C string is parsed and freed; a null or unparsable result yields
/// `Json::Null`.
pub fn wrap_json_fn(
    cb: NemoFlowJsonCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> Box<dyn Fn(Json) -> Json + Send + Sync> {
    let shared = make_user_data(user_data, free_fn);
    Box::new(move |input: Json| {
        let input_c = json_to_c_string(&input);
        let returned = unsafe { cb(shared.ptr, input_c) };
        // The input buffer is only lent to the callback for the call.
        unsafe { nemo_flow_string_free_internal(input_c) };

        let output = ptr_to_json(returned);
        unsafe { nemo_flow_string_free_internal(returned) };
        output
    })
}
/// Adapts a C request-intercept callback into an [`LlmRequestInterceptFn`].
///
/// The callback receives the request handle and, when available, the annotated
/// request as a JSON C string (null otherwise). It reports its outcome through a
/// status code and two out-parameters:
/// - `out_request` is required on success; ownership passes to this wrapper.
/// - `out_annotated_json` is optional; when set it is parsed and then freed. JSON
///   that fails to parse is treated as "no annotated request".
///
/// # Errors
///
/// Returns `FlowError::Internal` when the status is not `Ok` (using the recorded
/// last-error message or a fixed fallback), or when the callback reports success
/// but leaves `out_request` null.
pub fn wrap_llm_request_intercept_fn(
    cb: NemoFlowLlmRequestInterceptCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> LlmRequestInterceptFn {
    let ud = make_user_data(user_data, free_fn);
    Box::new(
        move |name: &str, request: LlmRequest, annotated: Option<AnnotatedLLMRequest>| {
            clear_last_error();
            // A name with an interior NUL cannot be a C string; it degrades to "".
            let c_name = CString::new(name).unwrap_or_default();
            let ffi_req = Box::into_raw(Box::new(FfiLLMRequest(request)));
            // Keep the CString alive in a local so the pointer below stays valid
            // for the whole callback invocation.
            let c_annotated: Option<CString> = annotated.as_ref().map(|a| {
                let s = serde_json::to_string(a).unwrap_or_else(|_| "null".to_string());
                CString::new(s).unwrap_or_default()
            });
            let annotated_ptr: *const c_char = match &c_annotated {
                Some(c) => c.as_ptr(),
                None => std::ptr::null(),
            };
            let mut out_request: *mut FfiLLMRequest = std::ptr::null_mut();
            let mut out_annotated: *mut c_char = std::ptr::null_mut();
            let status = unsafe {
                cb(
                    ud.ptr,
                    c_name.as_ptr(),
                    ffi_req,
                    annotated_ptr,
                    &mut out_request,
                    &mut out_annotated,
                )
            };
            // The input handle is only lent to the callback for the call.
            unsafe { drop(Box::from_raw(ffi_req)) };
            if status != NemoFlowStatus::Ok {
                // Out-parameters are left untouched here: whether a failing callback
                // hands over ownership of them is not defined in this file.
                let message = last_error_message()
                    .unwrap_or_else(|| "request intercept callback failed".to_string());
                return Err(FlowError::Internal(message));
            }
            if out_request.is_null() {
                // The callback reported success, so any annotated string it produced
                // is ours; release it instead of leaking it on this error path.
                unsafe { nemo_flow_string_free_internal(out_annotated) };
                return Err(FlowError::Internal(
                    "request intercept returned null out_request".to_string(),
                ));
            }
            let new_request = {
                let boxed = unsafe { Box::from_raw(out_request) };
                boxed.0
            };
            let new_annotated = if out_annotated.is_null() {
                None
            } else {
                let s = unsafe { CStr::from_ptr(out_annotated) }.to_string_lossy();
                let parsed: Option<AnnotatedLLMRequest> = serde_json::from_str(&s).ok();
                unsafe { nemo_flow_string_free_internal(out_annotated) };
                parsed
            };
            Ok((new_request, new_annotated))
        },
    )
}
/// Adapts a C JSON-to-JSON callback into a Rust closure that transforms an LLM
/// response.
///
/// The response is serialized to a C string for the call and freed afterwards. The
/// returned C string is parsed and freed; a null or unparsable result yields
/// `Json::Null`.
pub fn wrap_llm_response_fn(
    cb: NemoFlowJsonCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> Box<dyn Fn(Json) -> Json + Send + Sync> {
    let shared = make_user_data(user_data, free_fn);
    Box::new(move |response: Json| {
        let response_c = json_to_c_string(&response);
        let returned = unsafe { cb(shared.ptr, response_c) };
        // The input buffer is only lent to the callback for the call.
        unsafe { nemo_flow_string_free_internal(response_c) };

        let transformed = ptr_to_json(returned);
        unsafe { nemo_flow_string_free_internal(returned) };
        transformed
    })
}
/// Adapts a C request-sanitizing callback into a Rust closure
/// `LlmRequest -> LlmRequest`.
///
/// The input request is boxed for the call and destroyed afterwards. A non-null
/// result is reclaimed with `Box::from_raw`; a null result is replaced by a request
/// with empty headers and `null` content.
pub fn wrap_llm_sanitize_request_fn(
    cb: NemoFlowLlmRequestCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> Box<dyn Fn(LlmRequest) -> LlmRequest + Send + Sync> {
    let shared = make_user_data(user_data, free_fn);
    Box::new(move |request: LlmRequest| {
        let input = Box::into_raw(Box::new(FfiLLMRequest(request)));
        let returned = unsafe { cb(shared.ptr, input) };
        // The input handle is only lent to the callback for the call.
        unsafe { drop(Box::from_raw(input)) };

        if returned.is_null() {
            return LlmRequest {
                headers: serde_json::Map::new(),
                content: Json::Null,
            };
        }
        // Ownership of the returned handle passes to this side.
        let sanitized = unsafe { Box::from_raw(returned) };
        sanitized.0
    })
}
/// Adapts a C LLM-conditional callback into an [`LlmConditionalFn`].
///
/// The callback sees a temporary copy of the request. The last-error slot is cleared
/// before the call. A null return is `Ok(None)` unless the callback recorded an error
/// message, in which case that message is returned as `FlowError::Internal`. A
/// non-null return is copied into a `String` and the C buffer is freed.
pub fn wrap_llm_conditional_fn(
    cb: NemoFlowLlmConditionalCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> LlmConditionalFn {
    let shared = make_user_data(user_data, free_fn);
    Box::new(move |request: &LlmRequest| {
        clear_last_error();
        let snapshot = FfiLLMRequest(request.clone());
        let returned = unsafe { cb(shared.ptr, &snapshot) };

        if returned.is_null() {
            // Null is ambiguous: distinguish "no value" from "failed" via the error slot.
            return match last_error_message() {
                Some(message) => Err(FlowError::Internal(message)),
                None => Ok(None),
            };
        }

        let verdict = ptr_to_opt_string(returned);
        unsafe { nemo_flow_string_free_internal(returned) };
        Ok(verdict)
    })
}
pub fn wrap_llm_exec_fn(
cb: NemoFlowLlmExecCb,
user_data: *mut libc::c_void,
free_fn: NemoFlowFreeFn,
) -> Box<dyn Fn(LlmRequest) -> Pin<Box<dyn Future<Output = Result<Json>> + Send>> + Send + Sync> {
let ud = make_user_data(user_data, free_fn);
Box::new(move |request: LlmRequest| {
let ud = ud.clone();
Box::pin(async move {
let request_json = serde_json::to_value(&request).unwrap_or(Json::Null);
let c_request = json_to_c_string(&request_json);
let result_ptr = unsafe { cb(ud.ptr, c_request) };
unsafe { nemo_flow_string_free_internal(c_request) };
let result = json_result_from_ptr(result_ptr, "LLM execution callback failed")?;
unsafe { nemo_flow_string_free_internal(result_ptr) };
Ok(result)
})
})
}
pub fn wrap_llm_stream_exec_fn(
cb: NemoFlowLlmExecCb,
user_data: *mut libc::c_void,
free_fn: NemoFlowFreeFn,
) -> Box<
dyn Fn(
LlmRequest,
) -> Pin<
Box<
dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Result<Json>> + Send>>>>
+ Send,
>,
> + Send
+ Sync,
> {
let ud = make_user_data(user_data, free_fn);
Box::new(move |request: LlmRequest| {
let ud = ud.clone();
Box::pin(async move {
let request_json = serde_json::to_value(&request).unwrap_or(Json::Null);
let c_request = json_to_c_string(&request_json);
let result_ptr = unsafe { cb(ud.ptr, c_request) };
unsafe { nemo_flow_string_free_internal(c_request) };
let result = json_result_from_ptr(result_ptr, "LLM stream execution callback failed")?;
unsafe { nemo_flow_string_free_internal(result_ptr) };
let stream = tokio_stream::once(Ok(result));
Ok(Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Json>> + Send>>)
})
})
}
/// Adapts a C collector callback into a Rust closure that receives stream chunks.
///
/// Each chunk is serialized to a JSON C string, handed to `cb`, and freed as soon
/// as the callback returns. The closure always returns `Ok(())`.
pub fn wrap_collector_fn(cb: NemoFlowCollectorCb) -> Box<dyn FnMut(Json) -> Result<()> + Send> {
    Box::new(move |chunk: Json| {
        let payload = json_to_c_string(&chunk);
        unsafe {
            cb(payload);
            // The chunk buffer is only lent to the callback for the call.
            nemo_flow_string_free_internal(payload);
        }
        Ok(())
    })
}
/// Adapts a C finalizer callback into a one-shot Rust closure producing JSON.
///
/// The returned C string is parsed and then freed; a null or unparsable result
/// yields `Json::Null`.
pub fn wrap_finalizer_fn(cb: NemoFlowFinalizerCb) -> Box<dyn FnOnce() -> Json + Send> {
    Box::new(move || {
        let returned = unsafe { cb() };
        let summary = ptr_to_json(returned);
        unsafe { nemo_flow_string_free_internal(returned) };
        summary
    })
}
pub fn wrap_event_subscriber(
cb: NemoFlowEventSubscriberCb,
user_data: *mut libc::c_void,
free_fn: NemoFlowFreeFn,
) -> EventSubscriberFn {
let ud = make_user_data(user_data, free_fn);
Arc::new(move |event: &Event| {
let ffi_event = FfiEvent(event.clone());
unsafe { cb(ud.ptr, &ffi_event) };
})
}
/// `LlmCodec` implementation backed by a pair of C callbacks.
struct FfiCodec {
/// Callback that turns a request into annotated-request JSON.
decode_cb: NemoFlowCodecDecodeCb,
/// Callback that turns annotated-request JSON back into request content.
encode_cb: NemoFlowCodecEncodeCb,
/// Shared opaque pointer passed to both callbacks; its destructor (if any) runs
/// when the last reference is dropped.
user_data: Arc<UserData>,
}
// SAFETY (assumed contract — confirm against the C API documentation): these impls
// assert that both callbacks may be invoked from any thread, concurrently. Nothing
// in this file enforces that.
unsafe impl Send for FfiCodec {}
unsafe impl Sync for FfiCodec {}
impl LlmCodec for FfiCodec {
    /// Decodes `request` through the C decode callback.
    ///
    /// The callback receives a temporary copy of the request and must return an
    /// annotated request as a JSON C string, which is freed after being read.
    ///
    /// # Errors
    ///
    /// Returns `FlowError::Internal` when the callback returns null (using the
    /// recorded last-error message or a fixed fallback) or when the returned text
    /// does not deserialize into an annotated request.
    fn decode(&self, request: &LlmRequest) -> Result<AnnotatedLLMRequest> {
        clear_last_error();
        let input = Box::into_raw(Box::new(FfiLLMRequest(request.clone())));
        let returned = unsafe { (self.decode_cb)(self.user_data.ptr, input) };
        // The input handle is only lent to the callback for the call.
        unsafe { drop(Box::from_raw(input)) };

        if returned.is_null() {
            let message = match last_error_message() {
                Some(recorded) => recorded,
                None => "codec decode callback returned null".to_string(),
            };
            return Err(FlowError::Internal(message));
        }

        // Copy the text out first so the C buffer can be released exactly once,
        // regardless of whether parsing succeeds.
        let text = unsafe { CStr::from_ptr(returned) }
            .to_string_lossy()
            .into_owned();
        unsafe { nemo_flow_string_free_internal(returned) };

        serde_json::from_str::<AnnotatedLLMRequest>(&text)
            .map_err(|e| FlowError::Internal(format!("codec decode: invalid JSON: {e}")))
    }

    /// Encodes `annotated` through the C encode callback.
    ///
    /// The callback receives the annotated request as JSON plus a temporary copy of
    /// `original`. Its return value is parsed as JSON and becomes the `content` of
    /// the result; the headers are copied from `original`.
    ///
    /// # Errors
    ///
    /// Returns `FlowError::Internal` when the annotated request cannot be serialized
    /// or converted to a C string, when the callback returns null (using the
    /// recorded last-error message or a fixed fallback), or when the returned text
    /// is not valid JSON.
    fn encode(&self, annotated: &AnnotatedLLMRequest, original: &LlmRequest) -> Result<LlmRequest> {
        clear_last_error();
        let annotated_c = serde_json::to_string(annotated)
            .map_err(|e| FlowError::Internal(format!("codec encode: serialize failed: {e}")))
            .and_then(|serialized| {
                CString::new(serialized)
                    .map_err(|e| FlowError::Internal(format!("codec encode: CString failed: {e}")))
            })?;

        let input = Box::into_raw(Box::new(FfiLLMRequest(original.clone())));
        let returned =
            unsafe { (self.encode_cb)(self.user_data.ptr, annotated_c.as_ptr(), input) };
        // The input handle is only lent to the callback for the call.
        unsafe { drop(Box::from_raw(input)) };

        if returned.is_null() {
            let message = match last_error_message() {
                Some(recorded) => recorded,
                None => "codec encode callback returned null".to_string(),
            };
            return Err(FlowError::Internal(message));
        }

        // Copy the text out first so the C buffer can be released exactly once,
        // regardless of whether parsing succeeds.
        let text = unsafe { CStr::from_ptr(returned) }
            .to_string_lossy()
            .into_owned();
        unsafe { nemo_flow_string_free_internal(returned) };

        let content = serde_json::from_str::<serde_json::Value>(&text)
            .map_err(|e| FlowError::Internal(format!("codec encode: invalid result JSON: {e}")))?;
        Ok(LlmRequest {
            headers: original.headers.clone(),
            content,
        })
    }
}
/// Builds an [`LlmCodec`] trait object backed by the given C decode/encode callbacks.
///
/// Both callbacks share the same `user_data`; `free_fn` (if any) is run on it once
/// the returned codec is dropped.
pub fn wrap_codec_fn(
    decode_cb: NemoFlowCodecDecodeCb,
    encode_cb: NemoFlowCodecEncodeCb,
    user_data: *mut libc::c_void,
    free_fn: NemoFlowFreeFn,
) -> Arc<dyn LlmCodec> {
    let codec = FfiCodec {
        decode_cb,
        encode_cb,
        user_data: make_user_data(user_data, free_fn),
    };
    Arc::new(codec)
}
/// Parses a NUL-terminated C string as JSON without taking ownership of it.
///
/// Returns `Json::Null` for a null pointer or for text that is not valid JSON.
/// Invalid UTF-8 is replaced lossily before parsing. A non-null `ptr` must point to
/// a valid NUL-terminated string.
fn ptr_to_json(ptr: *mut c_char) -> Json {
    if ptr.is_null() {
        return Json::Null;
    }
    let text = unsafe { CStr::from_ptr(ptr) }.to_string_lossy();
    match serde_json::from_str(&text) {
        Ok(value) => value,
        Err(_) => Json::Null,
    }
}
/// Interprets a callback's returned C string as a fallible JSON result.
///
/// A non-null pointer is parsed with `ptr_to_json` (so unparsable text becomes
/// `Ok(Json::Null)`). A null pointer is an error whose message is the recorded last
/// error, or `fallback` when nothing was recorded. The pointer is not freed here.
fn json_result_from_ptr(ptr: *mut c_char, fallback: &str) -> Result<Json> {
    if !ptr.is_null() {
        return Ok(ptr_to_json(ptr));
    }
    let message = match last_error_message() {
        Some(recorded) => recorded,
        None => fallback.to_string(),
    };
    Err(FlowError::Internal(message))
}
/// Copies a NUL-terminated C string into an owned `String` without freeing it.
///
/// Returns `None` for a null pointer. Invalid UTF-8 is replaced lossily. A non-null
/// `ptr` must point to a valid NUL-terminated string.
fn ptr_to_opt_string(ptr: *mut c_char) -> Option<String> {
    if ptr.is_null() {
        None
    } else {
        let text = unsafe { CStr::from_ptr(ptr) };
        Some(text.to_string_lossy().into_owned())
    }
}
/// Releases a C string by reconstructing and dropping the owning `CString`.
///
/// A null pointer is ignored.
///
/// # Safety
///
/// A non-null `ptr` must have been produced by `CString::into_raw` (as required by
/// `CString::from_raw`) and must not be used or freed again afterwards.
unsafe fn nemo_flow_string_free_internal(ptr: *mut c_char) {
    if ptr.is_null() {
        return;
    }
    let owned = unsafe { CString::from_raw(ptr) };
    drop(owned);
}
#[cfg(test)]
#[path = "../tests/unit/callable_tests.rs"]
mod tests;
#[cfg(test)]
#[path = "../tests/unit/callable_private_tests.rs"]
mod private_tests;