1use std::ffi::CStr;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, OnceLock};
14use std::time::Duration;
15
16use crate::callable::{
17 NemoFlowCodecDecodeFn, NemoFlowCodecEncodeFn, NemoFlowCollectorCb, NemoFlowEventSubscriberCb,
18 NemoFlowFinalizerCb, NemoFlowFreeFn, NemoFlowJsonCb, NemoFlowLlmConditionalCb,
19 NemoFlowLlmExecCb, NemoFlowLlmExecInterceptCb, NemoFlowLlmRequestCb,
20 NemoFlowLlmRequestInterceptCb, NemoFlowPluginRegisterCb, NemoFlowPluginValidateCb,
21 NemoFlowToolConditionalCb, NemoFlowToolExecCb, NemoFlowToolExecInterceptCb,
22 NemoFlowToolSanitizeCb, wrap_codec_fn, wrap_collector_fn, wrap_event_subscriber,
23 wrap_finalizer_fn, wrap_llm_conditional_fn, wrap_llm_exec_fn, wrap_llm_exec_intercept_fn,
24 wrap_llm_request_intercept_fn, wrap_llm_response_fn, wrap_llm_sanitize_request_fn,
25 wrap_llm_stream_exec_fn, wrap_llm_stream_exec_intercept_fn, wrap_tool_conditional_fn,
26 wrap_tool_exec_fn, wrap_tool_exec_intercept_fn, wrap_tool_request_intercept_fn,
27 wrap_tool_sanitize_fn,
28};
29use crate::convert::{
30 c_str_to_json, c_str_to_opt_json, c_str_to_string, json_to_c_string, nemo_flow_string_free,
31 str_to_c_string, unix_micros_to_opt_timestamp,
32};
33use crate::error::{
34 NemoFlowStatus, clear_last_error, last_error_message, set_last_error, status_from_error,
35 status_from_plugin_error,
36};
37use crate::types::{
38 FfiAtifExporter, FfiAtofExporter, FfiCodecHandle, FfiLLMHandle, FfiOpenInferenceSubscriber,
39 FfiOpenTelemetrySubscriber, FfiPluginContext, FfiScopeHandle, FfiScopeStack,
40 FfiThreadScopeStackBinding, FfiToolHandle, NemoFlowScopeType,
41};
42pub use crate::types::{nemo_flow_openinference_subscriber_free, nemo_flow_otel_subscriber_free};
43use libc::c_char;
44use nemo_flow::api::llm as core_llm_api;
45use nemo_flow::api::llm::{LlmAttributes, LlmRequest};
46use nemo_flow::api::registry as core_registry_api;
47use nemo_flow::api::runtime::{LlmExecutionNextFn, LlmStreamExecutionNextFn, ToolExecutionNextFn};
48use nemo_flow::api::runtime::{
49 TASK_SCOPE_STACK, capture_thread_scope_stack, create_scope_stack, current_scope_stack,
50 restore_thread_scope_stack, scope_stack_active, set_thread_scope_stack,
51};
52use nemo_flow::api::scope as core_scope_api;
53use nemo_flow::api::scope::ScopeAttributes;
54use nemo_flow::api::subscriber as core_subscriber_api;
55use nemo_flow::api::tool as core_tool_api;
56use nemo_flow::api::tool::ToolAttributes;
57use nemo_flow::error::Result as FlowResult;
58use nemo_flow::plugin::{
59 ConfigDiagnostic, DiagnosticLevel, Plugin, PluginConfig, PluginError,
60 PluginRegistrationContext, active_plugin_report, clear_plugin_configuration, deregister_plugin,
61 initialize_plugins, list_plugin_kinds, register_plugin, validate_plugin_config,
62};
63use nemo_flow_adaptive::plugin_component::register_adaptive_component;
64use tokio::runtime::Runtime;
65
66mod llm;
67mod llm_registry;
68mod observability;
69mod plugin;
70mod scope;
71mod scope_registry;
72mod scope_stack;
73mod tool_lifecycle;
74mod tool_registry;
75
76pub use llm::*;
77pub use llm_registry::*;
78pub use observability::*;
79pub use plugin::*;
80pub use scope::*;
81pub use scope_registry::*;
82pub use scope_stack::*;
83pub use tool_lifecycle::*;
84pub use tool_registry::*;
85
86fn tokio_runtime() -> &'static Runtime {
87 static RT: OnceLock<Runtime> = OnceLock::new();
88 RT.get_or_init(|| {
89 tokio::runtime::Builder::new_multi_thread()
90 .enable_all()
91 .build()
92 .expect("Failed to create tokio runtime")
93 })
94}
95
96#[unsafe(no_mangle)]
118pub unsafe extern "C" fn nemo_flow_tool_request_intercepts(
119 name: *const c_char,
120 args_json: *const c_char,
121 out: *mut *mut c_char,
122) -> NemoFlowStatus {
123 clear_last_error();
124 let name = match c_str_to_string(name) {
125 Ok(s) => s,
126 Err(status) => return status,
127 };
128 let args = match c_str_to_json(args_json) {
129 Some(a) => a,
130 None => return NemoFlowStatus::InvalidJson,
131 };
132 match core_tool_api::tool_request_intercepts(&name, args) {
133 Ok(result) => {
134 unsafe { *out = json_to_c_string(&result) };
135 NemoFlowStatus::Ok
136 }
137 Err(e) => status_from_error(&e),
138 }
139}
140
141#[unsafe(no_mangle)]
157pub unsafe extern "C" fn nemo_flow_tool_conditional_execution(
158 name: *const c_char,
159 args_json: *const c_char,
160) -> NemoFlowStatus {
161 clear_last_error();
162 let name = match c_str_to_string(name) {
163 Ok(s) => s,
164 Err(status) => return status,
165 };
166 let args = match c_str_to_json(args_json) {
167 Some(a) => a,
168 None => return NemoFlowStatus::InvalidJson,
169 };
170 match core_tool_api::tool_conditional_execution(&name, &args) {
171 Ok(()) => NemoFlowStatus::Ok,
172 Err(e) => status_from_error(&e),
173 }
174}
175
176#[unsafe(no_mangle)]
196pub unsafe extern "C" fn nemo_flow_llm_request_intercepts(
197 name: *const c_char,
198 native_json: *const c_char,
199 out: *mut *mut c_char,
200) -> NemoFlowStatus {
201 clear_last_error();
202 let name_str = if name.is_null() {
203 ""
204 } else {
205 unsafe { CStr::from_ptr(name) }.to_str().unwrap_or_default()
206 };
207 let native = match c_str_to_json(native_json) {
208 Some(j) => j,
209 None => return NemoFlowStatus::InvalidJson,
210 };
211 let request: LlmRequest = match serde_json::from_value(native) {
212 Ok(r) => r,
213 Err(_) => {
214 set_last_error("failed to parse native_json as LlmRequest");
215 return NemoFlowStatus::InvalidJson;
216 }
217 };
218 match core_llm_api::llm_request_intercepts(name_str, request) {
219 Ok(transformed) => {
220 let result_json = serde_json::to_value(&transformed).unwrap_or(serde_json::Value::Null);
221 unsafe { *out = json_to_c_string(&result_json) };
222 NemoFlowStatus::Ok
223 }
224 Err(e) => status_from_error(&e),
225 }
226}
227
228#[unsafe(no_mangle)]
244pub unsafe extern "C" fn nemo_flow_llm_conditional_execution(
245 native_json: *const c_char,
246) -> NemoFlowStatus {
247 clear_last_error();
248 let native = match c_str_to_json(native_json) {
249 Some(j) => j,
250 None => return NemoFlowStatus::InvalidJson,
251 };
252 let request: LlmRequest = match serde_json::from_value(native) {
253 Ok(r) => r,
254 Err(_) => {
255 set_last_error("failed to parse native_json as LlmRequest");
256 return NemoFlowStatus::InvalidJson;
257 }
258 };
259 match core_llm_api::llm_conditional_execution(&request) {
260 Ok(()) => NemoFlowStatus::Ok,
261 Err(e) => status_from_error(&e),
262 }
263}
264
265#[cfg(test)]
266#[path = "../../tests/unit/api_tests.rs"]
267mod tests;