#![cfg(feature = "distributed")]
use std::ffi::c_char;
use std::sync::Arc;
use blazen_uniffi::errors::BlazenError as InnerError;
use blazen_uniffi::peer::{PeerClient as InnerPeerClient, PeerServer as InnerPeerServer};
use crate::error::BlazenError;
use crate::future::BlazenFuture;
use crate::runtime::runtime;
use crate::string::{alloc_cstring, cstr_to_str};
use crate::workflow_records::BlazenWorkflowResult;
unsafe fn write_error(out_err: *mut *mut BlazenError, e: InnerError) -> i32 {
if !out_err.is_null() {
unsafe {
*out_err = BlazenError::from(e).into_ptr();
}
}
-1
}
unsafe fn write_internal_error(out_err: *mut *mut BlazenError, msg: &str) -> i32 {
unsafe {
write_error(
out_err,
InnerError::Internal {
message: msg.into(),
},
)
}
}
unsafe fn ptr_array_to_strings(ptrs: *const *const c_char, count: usize) -> Option<Vec<String>> {
if ptrs.is_null() && count > 0 {
return None;
}
let mut out = Vec::with_capacity(count);
for i in 0..count {
let p = unsafe { *ptrs.add(i) };
let s = unsafe { cstr_to_str(p) }?;
out.push(s.to_owned());
}
Some(out)
}
pub struct BlazenPeerServer(pub(crate) Arc<InnerPeerServer>);
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_server_new(node_id: *const c_char) -> *mut BlazenPeerServer {
let Some(node_id) = (unsafe { cstr_to_str(node_id) }) else {
return std::ptr::null_mut();
};
let inner = InnerPeerServer::new(node_id.to_owned());
Box::into_raw(Box::new(BlazenPeerServer(inner)))
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_server_serve_blocking(
server: *const BlazenPeerServer,
listen_address: *const c_char,
out_err: *mut *mut BlazenError,
) -> i32 {
if server.is_null() || listen_address.is_null() {
return unsafe { write_internal_error(out_err, "null pointer argument") };
}
let server = unsafe { &*server };
let addr = match unsafe { cstr_to_str(listen_address) } {
Some(s) => s.to_owned(),
None => {
return unsafe { write_internal_error(out_err, "listen_address not valid UTF-8") };
}
};
let inner = Arc::clone(&server.0);
match runtime().block_on(async move { inner.serve(addr).await }) {
Ok(()) => 0,
Err(e) => unsafe { write_error(out_err, e) },
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_server_serve(
server: *const BlazenPeerServer,
listen_address: *const c_char,
) -> *mut BlazenFuture {
if server.is_null() || listen_address.is_null() {
return std::ptr::null_mut();
}
let server = unsafe { &*server };
let addr = match unsafe { cstr_to_str(listen_address) } {
Some(s) => s.to_owned(),
None => return std::ptr::null_mut(),
};
let inner = Arc::clone(&server.0);
BlazenFuture::spawn(async move { inner.serve(addr).await })
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_server_free(server: *mut BlazenPeerServer) {
if server.is_null() {
return;
}
drop(unsafe { Box::from_raw(server) });
}
pub struct BlazenPeerClient(pub(crate) Arc<InnerPeerClient>);
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_client_connect(
address: *const c_char,
client_node_id: *const c_char,
out_client: *mut *mut BlazenPeerClient,
out_err: *mut *mut BlazenError,
) -> i32 {
if address.is_null() || client_node_id.is_null() {
return unsafe { write_internal_error(out_err, "null pointer argument") };
}
let address = match unsafe { cstr_to_str(address) } {
Some(s) => s.to_owned(),
None => return unsafe { write_internal_error(out_err, "address not valid UTF-8") },
};
let node_id = match unsafe { cstr_to_str(client_node_id) } {
Some(s) => s.to_owned(),
None => {
return unsafe { write_internal_error(out_err, "client_node_id not valid UTF-8") };
}
};
match InnerPeerClient::connect(address, node_id) {
Ok(client) => {
if !out_client.is_null() {
unsafe {
*out_client = Box::into_raw(Box::new(BlazenPeerClient(client)));
}
}
0
}
Err(e) => unsafe { write_error(out_err, e) },
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_client_node_id(
client: *const BlazenPeerClient,
) -> *mut c_char {
if client.is_null() {
return std::ptr::null_mut();
}
let client = unsafe { &*client };
let node_id = Arc::clone(&client.0).node_id();
alloc_cstring(&node_id)
}
#[allow(clippy::too_many_arguments)]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_client_run_remote_workflow_blocking(
client: *const BlazenPeerClient,
workflow_name: *const c_char,
step_ids: *const *const c_char,
step_ids_count: usize,
input_json: *const c_char,
timeout_secs: i64,
out_result: *mut *mut BlazenWorkflowResult,
out_err: *mut *mut BlazenError,
) -> i32 {
if client.is_null() || workflow_name.is_null() || input_json.is_null() {
return unsafe { write_internal_error(out_err, "null pointer argument") };
}
let client = unsafe { &*client };
let workflow_name = match unsafe { cstr_to_str(workflow_name) } {
Some(s) => s.to_owned(),
None => {
return unsafe { write_internal_error(out_err, "workflow_name not valid UTF-8") };
}
};
let Some(step_ids) = (unsafe { ptr_array_to_strings(step_ids, step_ids_count) }) else {
return unsafe {
write_internal_error(out_err, "step_ids contained null or non-UTF-8 entry")
};
};
let input = match unsafe { cstr_to_str(input_json) } {
Some(s) => s.to_owned(),
None => return unsafe { write_internal_error(out_err, "input_json not valid UTF-8") },
};
let timeout = if timeout_secs < 0 {
None
} else {
Some(u64::try_from(timeout_secs).unwrap_or(u64::MAX))
};
let inner = Arc::clone(&client.0);
match runtime().block_on(async move {
inner
.run_remote_workflow(workflow_name, step_ids, input, timeout)
.await
}) {
Ok(result) => {
if !out_result.is_null() {
unsafe {
*out_result = BlazenWorkflowResult::from(result).into_ptr();
}
}
0
}
Err(e) => unsafe { write_error(out_err, e) },
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_client_run_remote_workflow(
client: *const BlazenPeerClient,
workflow_name: *const c_char,
step_ids: *const *const c_char,
step_ids_count: usize,
input_json: *const c_char,
timeout_secs: i64,
) -> *mut BlazenFuture {
if client.is_null() || workflow_name.is_null() || input_json.is_null() {
return std::ptr::null_mut();
}
let client = unsafe { &*client };
let workflow_name = match unsafe { cstr_to_str(workflow_name) } {
Some(s) => s.to_owned(),
None => return std::ptr::null_mut(),
};
let Some(step_ids) = (unsafe { ptr_array_to_strings(step_ids, step_ids_count) }) else {
return std::ptr::null_mut();
};
let input = match unsafe { cstr_to_str(input_json) } {
Some(s) => s.to_owned(),
None => return std::ptr::null_mut(),
};
let timeout = if timeout_secs < 0 {
None
} else {
Some(u64::try_from(timeout_secs).unwrap_or(u64::MAX))
};
let inner = Arc::clone(&client.0);
BlazenFuture::spawn(async move {
inner
.run_remote_workflow(workflow_name, step_ids, input, timeout)
.await
})
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_peer_client_free(client: *mut BlazenPeerClient) {
if client.is_null() {
return;
}
drop(unsafe { Box::from_raw(client) });
}