use std::ffi::{CString, c_char};
use std::os::raw::c_void;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use blazen_controlplane::protocol::Assignment;
use blazen_controlplane::worker::{AssignmentContext, AssignmentFailure};
use blazen_controlplane::{
AssignmentHandler, Client as InnerClient, Worker as InnerWorker,
WorkerConfig as InnerWorkerConfig,
};
use blazen_core::distributed::{
AdmissionMode, OrchestratorClient, SubmitWorkflowRequest, WorkerCapability,
};
use futures_util::StreamExt;
use parking_lot::Mutex;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use blazen_uniffi::errors::BlazenError as InnerError;
use crate::controlplane_records::{BlazenRunStateSnapshot, BlazenWorkerInfoList};
use crate::error::BlazenError;
use crate::future::BlazenFuture;
use crate::runtime::runtime;
use crate::string::cstr_to_str;
/// Publishes `e` through the caller-supplied error slot and yields the failure code.
///
/// When `out_err` is null the error is simply dropped. The return value is always `-1`,
/// so callers can `return write_error(..)` directly from a failing FFI entry point.
///
/// # Safety
/// `out_err` must be null or point to writable storage for a `*mut BlazenError`.
unsafe fn write_error(out_err: *mut *mut BlazenError, e: InnerError) -> i32 {
    if out_err.is_null() {
        return -1;
    }
    let boxed = BlazenError::from(e).into_ptr();
    // SAFETY: `out_err` is non-null here and the caller guarantees it is writable.
    unsafe {
        *out_err = boxed;
    }
    -1
}
/// Reports `msg` as an `InnerError::Internal` through `out_err` and returns `-1`.
///
/// # Safety
/// Same contract as [`write_error`]: `out_err` must be null or writable.
unsafe fn write_internal_error(out_err: *mut *mut BlazenError, msg: &str) -> i32 {
    let failure = InnerError::Internal {
        message: msg.to_owned(),
    };
    // SAFETY: forwarded verbatim; the caller upholds `write_error`'s contract.
    unsafe { write_error(out_err, failure) }
}
/// Decodes an optional C string holding a JSON array of strings.
///
/// Returns:
/// * `Some(vec![])` when `ptr` is null, when the text is empty or whitespace-only, or when
///   the JSON value is `null`;
/// * `Some(items)` when the JSON value is an array whose elements are all strings;
/// * `None` when the text cannot be decoded by `cstr_to_str`, is not valid JSON, is some
///   other JSON type, or contains a non-string element.
///
/// Empty / whitespace-only input is accepted as "no entries" so this parser agrees with
/// `parse_json_capabilities` and `parse_tags_json`, which already treat it that way.
///
/// # Safety
/// A non-null `ptr` must satisfy the requirements of `cstr_to_str`
/// (presumably a valid NUL-terminated string — confirm against that helper).
unsafe fn parse_json_string_array(ptr: *const c_char) -> Option<Vec<String>> {
    if ptr.is_null() {
        return Some(Vec::new());
    }
    // SAFETY: `ptr` is non-null; validity is the caller's obligation.
    let text = unsafe { cstr_to_str(ptr) }?;
    if text.trim().is_empty() {
        return Some(Vec::new());
    }
    match serde_json::from_str::<serde_json::Value>(text).ok()? {
        serde_json::Value::Array(items) => items
            .into_iter()
            .map(|item| match item {
                serde_json::Value::String(s) => Some(s),
                // Any non-string element invalidates the whole array.
                _ => None,
            })
            .collect(),
        serde_json::Value::Null => Some(Vec::new()),
        _ => None,
    }
}
/// Decodes an optional C string holding a JSON array of `{ "kind": str, "version": u32 }`.
///
/// A null pointer, an empty / whitespace-only string and JSON `null` all yield an empty
/// list. `None` signals malformed input: undecodable text, invalid JSON, a non-array value,
/// a non-object element, a missing / non-string `kind`, or a `version` that is missing,
/// not an unsigned integer, or does not fit in `u32`.
///
/// # Safety
/// A non-null `ptr` must satisfy the requirements of `cstr_to_str`.
unsafe fn parse_json_capabilities(ptr: *const c_char) -> Option<Vec<WorkerCapability>> {
    if ptr.is_null() {
        return Some(Vec::new());
    }
    // SAFETY: `ptr` is non-null; validity is the caller's obligation.
    let text = unsafe { cstr_to_str(ptr) }?;
    if text.trim().is_empty() {
        return Some(Vec::new());
    }
    let entries = match serde_json::from_str::<serde_json::Value>(text).ok()? {
        serde_json::Value::Null => return Some(Vec::new()),
        serde_json::Value::Array(entries) => entries,
        _ => return None,
    };
    // Collecting into `Option<Vec<_>>` stops at the first malformed entry.
    entries
        .into_iter()
        .map(|entry| {
            let fields = entry.as_object()?;
            let kind = fields.get("kind")?.as_str()?.to_owned();
            let raw_version = fields.get("version").and_then(serde_json::Value::as_u64)?;
            let version = u32::try_from(raw_version).ok()?;
            Some(WorkerCapability { kind, version })
        })
        .collect()
}
/// Decodes a C string into a [`Uuid`], returning `None` when either the string decoding
/// or the UUID parse fails.
///
/// # Safety
/// `ptr` must satisfy the requirements of `cstr_to_str`; callers in this file check it
/// for null before calling.
unsafe fn parse_uuid(ptr: *const c_char) -> Option<Uuid> {
    // SAFETY: forwarded to `cstr_to_str` under the caller's guarantee.
    let text = unsafe { cstr_to_str(ptr) };
    text.and_then(|raw| Uuid::parse_str(raw).ok())
}
/// Opaque FFI handle around a control-plane client.
///
/// The connect functions in this file box an instance and hand the raw pointer to the
/// foreign caller; `blazen_controlplane_client_free` reclaims it.
pub struct BlazenControlPlaneClient {
/// Wrapped client. Each FFI call clones it before moving the clone into the async operation.
inner: InnerClient,
}
/// Connects to the control plane at `endpoint`, blocking the calling thread on the shared
/// runtime until the attempt finishes.
///
/// Returns `0` on success and, when `out_client` is non-null, stores a boxed client there
/// (release it with `blazen_controlplane_client_free`). Returns `-1` on failure and
/// reports the error through `out_err` when that slot is non-null.
///
/// # Safety
/// `endpoint` must be null or satisfy `cstr_to_str`'s requirements; `out_client` and
/// `out_err` must each be null or point to writable pointer-sized storage.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_connect_blocking(
    endpoint: *const c_char,
    out_client: *mut *mut BlazenControlPlaneClient,
    out_err: *mut *mut BlazenError,
) -> i32 {
    if endpoint.is_null() {
        return unsafe { write_internal_error(out_err, "null pointer argument") };
    }
    let Some(address) = (unsafe { cstr_to_str(endpoint) }).map(str::to_owned) else {
        return unsafe { write_internal_error(out_err, "endpoint not valid UTF-8") };
    };
    let attempt = runtime().block_on(async move { InnerClient::connect(address, None).await });
    match attempt {
        Err(e) => unsafe { write_error(out_err, InnerError::from(e)) },
        Ok(connected) => {
            if !out_client.is_null() {
                let handle = Box::new(BlazenControlPlaneClient { inner: connected });
                // SAFETY: `out_client` is non-null and writable per the caller's contract.
                unsafe {
                    *out_client = Box::into_raw(handle);
                }
            }
            0
        }
    }
}
/// Starts an asynchronous connection to `endpoint` and returns the future tracking it.
///
/// Returns null when `endpoint` is null or cannot be decoded. Otherwise the spawned
/// future resolves to a `BlazenControlPlaneClient` or an error.
///
/// # Safety
/// `endpoint` must be null or satisfy `cstr_to_str`'s requirements.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_connect(
    endpoint: *const c_char,
) -> *mut BlazenFuture {
    if endpoint.is_null() {
        return std::ptr::null_mut();
    }
    let Some(address) = (unsafe { cstr_to_str(endpoint) }).map(str::to_owned) else {
        return std::ptr::null_mut();
    };
    BlazenFuture::spawn(async move {
        match InnerClient::connect(address, None).await {
            Ok(connected) => Ok(BlazenControlPlaneClient { inner: connected }),
            Err(e) => Err(InnerError::from(e)),
        }
    })
}
/// Releases a client handle. Passing null is a no-op.
///
/// # Safety
/// `client` must be null or a pointer previously produced by `Box::into_raw` for a
/// `BlazenControlPlaneClient` that has not been freed yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_free(client: *mut BlazenControlPlaneClient) {
    if !client.is_null() {
        // SAFETY: non-null and uniquely owned per the caller's contract.
        let reclaimed = unsafe { Box::from_raw(client) };
        drop(reclaimed);
    }
}
/// Assembles a [`SubmitWorkflowRequest`] from already-decoded FFI arguments.
///
/// `input_json`, when present, must be valid JSON; when absent the request carries JSON
/// `null`. Version, idempotency key, deadline and resource hint are always left unset.
///
/// # Errors
/// Returns a human-readable message when `input_json` is not valid JSON.
fn build_submit_request(
    workflow_name: String,
    input_json: Option<&str>,
    required_tags: Vec<String>,
    wait_for_worker: bool,
) -> Result<SubmitWorkflowRequest, String> {
    let input = input_json
        .map(|raw| {
            serde_json::from_str::<serde_json::Value>(raw)
                .map_err(|e| format!("input_json not valid JSON: {e}"))
        })
        .transpose()?
        .unwrap_or(serde_json::Value::Null);
    Ok(SubmitWorkflowRequest {
        workflow_name,
        workflow_version: None,
        input,
        required_tags,
        idempotency_key: None,
        deadline_ms: None,
        wait_for_worker,
        resource_hint: None,
    })
}
/// Submits a workflow and blocks until the control plane answers.
///
/// * `workflow_name` — required.
/// * `input_json` — optional; null means "no input" (JSON `null` is sent). A non-null value
///   must decode and be valid JSON.
/// * `required_tags_json` — optional JSON array of strings (see `parse_json_string_array`).
/// * `wait_for_worker` — forwarded unchanged into the request.
///
/// Returns `0` on success and stores the snapshot in `*out_snapshot` when that slot is
/// non-null. Returns `-1` on failure and reports the error through `out_err`.
///
/// A non-null `input_json` that cannot be decoded is now rejected with
/// `"input_json not valid UTF-8"`; previously it was silently treated as absent and the
/// workflow was submitted with a `null` input.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`. Every string pointer must be
/// null or satisfy `cstr_to_str`'s requirements. `out_snapshot` and `out_err` must each be
/// null or point to writable pointer-sized storage.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_submit_workflow_blocking(
    client: *const BlazenControlPlaneClient,
    workflow_name: *const c_char,
    input_json: *const c_char,
    required_tags_json: *const c_char,
    wait_for_worker: bool,
    out_snapshot: *mut *mut BlazenRunStateSnapshot,
    out_err: *mut *mut BlazenError,
) -> i32 {
    if client.is_null() || workflow_name.is_null() {
        return unsafe { write_internal_error(out_err, "null pointer argument") };
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let client = unsafe { &*client };
    let Some(workflow_name) = (unsafe { cstr_to_str(workflow_name) }).map(str::to_owned) else {
        return unsafe { write_internal_error(out_err, "workflow_name not valid UTF-8") };
    };
    // Distinguish "no input supplied" (null) from "input supplied but undecodable".
    let input_borrow = if input_json.is_null() {
        None
    } else {
        let Some(text) = (unsafe { cstr_to_str(input_json) }) else {
            return unsafe { write_internal_error(out_err, "input_json not valid UTF-8") };
        };
        Some(text)
    };
    let Some(required_tags) = (unsafe { parse_json_string_array(required_tags_json) }) else {
        return unsafe {
            write_internal_error(out_err, "required_tags_json is not a JSON string array")
        };
    };
    let request =
        match build_submit_request(workflow_name, input_borrow, required_tags, wait_for_worker) {
            Ok(r) => r,
            Err(msg) => return unsafe { write_internal_error(out_err, &msg) },
        };
    let inner = client.inner.clone();
    match runtime().block_on(async move { inner.submit_workflow(request).await }) {
        Ok(snap) => {
            if !out_snapshot.is_null() {
                // SAFETY: `out_snapshot` is non-null and writable per the caller's contract.
                unsafe {
                    *out_snapshot = BlazenRunStateSnapshot::from(snap).into_ptr();
                }
            }
            0
        }
        Err(e) => unsafe { write_error(out_err, InnerError::from(e)) },
    }
}
/// Asynchronous counterpart of the blocking submit call: returns a future that resolves to
/// the run state snapshot or an error.
///
/// Returns null when `client` or `workflow_name` is null, when `workflow_name` cannot be
/// decoded, or when `required_tags_json` is not a JSON string array.
///
/// `input_json` is optional (null means JSON `null` is sent). Problems with a non-null
/// `input_json` are reported through the returned future: invalid JSON as before, and now
/// also text that cannot be decoded (`"input_json not valid UTF-8"`). Previously an
/// undecodable `input_json` was silently treated as absent.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; every string pointer must be
/// null or satisfy `cstr_to_str`'s requirements.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_submit_workflow(
    client: *const BlazenControlPlaneClient,
    workflow_name: *const c_char,
    input_json: *const c_char,
    required_tags_json: *const c_char,
    wait_for_worker: bool,
) -> *mut BlazenFuture {
    if client.is_null() || workflow_name.is_null() {
        return std::ptr::null_mut();
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let client = unsafe { &*client };
    let Some(workflow_name) = (unsafe { cstr_to_str(workflow_name) }).map(str::to_owned) else {
        return std::ptr::null_mut();
    };
    // `Ok(None)`: no input supplied. `Err(())`: supplied but undecodable; surfaced via the
    // future so the caller receives a diagnosable error, like the invalid-JSON case.
    let input_owned: Result<Option<String>, ()> = if input_json.is_null() {
        Ok(None)
    } else {
        (unsafe { cstr_to_str(input_json) })
            .map(|text| Some(text.to_owned()))
            .ok_or(())
    };
    let Some(required_tags) = (unsafe { parse_json_string_array(required_tags_json) }) else {
        return std::ptr::null_mut();
    };
    let inner = client.inner.clone();
    BlazenFuture::spawn(async move {
        let input_owned = input_owned.map_err(|()| InnerError::Internal {
            message: "input_json not valid UTF-8".to_owned(),
        })?;
        let request = build_submit_request(
            workflow_name,
            input_owned.as_deref(),
            required_tags,
            wait_for_worker,
        )
        .map_err(|message| InnerError::Internal { message })?;
        let snap = inner
            .submit_workflow(request)
            .await
            .map_err(InnerError::from)?;
        Ok(snap)
    })
}
/// Cancels the run identified by `run_id`, blocking until the control plane answers.
///
/// Returns `0` on success and stores the resulting snapshot in `*out_snapshot` when that
/// slot is non-null. Returns `-1` on failure (null argument, unparsable UUID, or a client
/// error) and reports it through `out_err`.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; `run_id` must be null or
/// satisfy `cstr_to_str`'s requirements; the out-pointers must be null or writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_cancel_workflow_blocking(
    client: *const BlazenControlPlaneClient,
    run_id: *const c_char,
    out_snapshot: *mut *mut BlazenRunStateSnapshot,
    out_err: *mut *mut BlazenError,
) -> i32 {
    if client.is_null() || run_id.is_null() {
        return unsafe { write_internal_error(out_err, "null pointer argument") };
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let handle = unsafe { &*client };
    let Some(target) = (unsafe { parse_uuid(run_id) }) else {
        return unsafe { write_internal_error(out_err, "run_id is not a valid UUID") };
    };
    let inner = handle.inner.clone();
    let outcome = runtime().block_on(async move { inner.cancel_workflow(target).await });
    match outcome {
        Err(e) => unsafe { write_error(out_err, InnerError::from(e)) },
        Ok(snap) => {
            if !out_snapshot.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out_snapshot = BlazenRunStateSnapshot::from(snap).into_ptr();
                }
            }
            0
        }
    }
}
/// Asynchronously cancels the run identified by `run_id`.
///
/// Returns null when an argument is null or `run_id` is not a valid UUID; otherwise returns
/// a future resolving to the cancel call's result or an error.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; `run_id` must be null or
/// satisfy `cstr_to_str`'s requirements.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_cancel_workflow(
    client: *const BlazenControlPlaneClient,
    run_id: *const c_char,
) -> *mut BlazenFuture {
    if client.is_null() || run_id.is_null() {
        return std::ptr::null_mut();
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let handle = unsafe { &*client };
    let Some(target) = (unsafe { parse_uuid(run_id) }) else {
        return std::ptr::null_mut();
    };
    let inner = handle.inner.clone();
    let operation = async move { inner.cancel_workflow(target).await.map_err(InnerError::from) };
    BlazenFuture::spawn(operation)
}
/// Fetches the current state of the run identified by `run_id`, blocking until done.
///
/// Returns `0` on success and stores the snapshot in `*out_snapshot` when that slot is
/// non-null. Returns `-1` on failure (null argument, unparsable UUID, or a client error)
/// and reports it through `out_err`.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; `run_id` must be null or
/// satisfy `cstr_to_str`'s requirements; the out-pointers must be null or writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_describe_workflow_blocking(
    client: *const BlazenControlPlaneClient,
    run_id: *const c_char,
    out_snapshot: *mut *mut BlazenRunStateSnapshot,
    out_err: *mut *mut BlazenError,
) -> i32 {
    if client.is_null() || run_id.is_null() {
        return unsafe { write_internal_error(out_err, "null pointer argument") };
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let handle = unsafe { &*client };
    let Some(target) = (unsafe { parse_uuid(run_id) }) else {
        return unsafe { write_internal_error(out_err, "run_id is not a valid UUID") };
    };
    let inner = handle.inner.clone();
    let outcome = runtime().block_on(async move { inner.describe_workflow(target).await });
    match outcome {
        Err(e) => unsafe { write_error(out_err, InnerError::from(e)) },
        Ok(snap) => {
            if !out_snapshot.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out_snapshot = BlazenRunStateSnapshot::from(snap).into_ptr();
                }
            }
            0
        }
    }
}
/// Asynchronously fetches the state of the run identified by `run_id`.
///
/// Returns null when an argument is null or `run_id` is not a valid UUID; otherwise returns
/// a future resolving to the describe call's result or an error.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; `run_id` must be null or
/// satisfy `cstr_to_str`'s requirements.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_describe_workflow(
    client: *const BlazenControlPlaneClient,
    run_id: *const c_char,
) -> *mut BlazenFuture {
    if client.is_null() || run_id.is_null() {
        return std::ptr::null_mut();
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let handle = unsafe { &*client };
    let Some(target) = (unsafe { parse_uuid(run_id) }) else {
        return std::ptr::null_mut();
    };
    let inner = handle.inner.clone();
    let operation = async move {
        let described = inner.describe_workflow(target).await;
        described.map_err(InnerError::from)
    };
    BlazenFuture::spawn(operation)
}
/// Lists the workers known to the control plane, blocking until the call completes.
///
/// Returns `0` on success and stores the list in `*out_list` when that slot is non-null.
/// Returns `-1` on failure and reports the error through `out_err`.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; the out-pointers must be
/// null or writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_list_workers_blocking(
    client: *const BlazenControlPlaneClient,
    out_list: *mut *mut BlazenWorkerInfoList,
    out_err: *mut *mut BlazenError,
) -> i32 {
    if client.is_null() {
        return unsafe { write_internal_error(out_err, "null pointer argument") };
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let inner = unsafe { &*client }.inner.clone();
    let outcome = runtime().block_on(async move { inner.list_workers().await });
    match outcome {
        Err(e) => unsafe { write_error(out_err, InnerError::from(e)) },
        Ok(workers) => {
            if !out_list.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out_list = BlazenWorkerInfoList::from(workers).into_ptr();
                }
            }
            0
        }
    }
}
/// Asynchronously lists the workers known to the control plane.
///
/// Returns null when `client` is null; otherwise a future resolving to the list call's
/// result or an error.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_list_workers(
    client: *const BlazenControlPlaneClient,
) -> *mut BlazenFuture {
    if client.is_null() {
        return std::ptr::null_mut();
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let inner = unsafe { &*client }.inner.clone();
    let operation = async move { inner.list_workers().await.map_err(InnerError::from) };
    BlazenFuture::spawn(operation)
}
/// Asks the control plane to drain the worker `node_id`, blocking until the call completes.
///
/// `immediate` is forwarded unchanged to the client's `drain_worker`. Returns `0` on
/// success, or `-1` with the error reported through `out_err`.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; `node_id` must be null or
/// satisfy `cstr_to_str`'s requirements; `out_err` must be null or writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_drain_worker_blocking(
    client: *const BlazenControlPlaneClient,
    node_id: *const c_char,
    immediate: bool,
    out_err: *mut *mut BlazenError,
) -> i32 {
    if client.is_null() || node_id.is_null() {
        return unsafe { write_internal_error(out_err, "null pointer argument") };
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let handle = unsafe { &*client };
    let Some(node) = (unsafe { cstr_to_str(node_id) }).map(str::to_owned) else {
        return unsafe { write_internal_error(out_err, "node_id not valid UTF-8") };
    };
    let inner = handle.inner.clone();
    let outcome = runtime().block_on(async move { inner.drain_worker(node, immediate).await });
    match outcome {
        Err(e) => unsafe { write_error(out_err, InnerError::from(e)) },
        Ok(()) => 0,
    }
}
/// Asynchronously asks the control plane to drain the worker `node_id`.
///
/// Returns null when an argument is null or `node_id` cannot be decoded; otherwise a future
/// resolving to `()` or an error.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; `node_id` must be null or
/// satisfy `cstr_to_str`'s requirements.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_drain_worker(
    client: *const BlazenControlPlaneClient,
    node_id: *const c_char,
    immediate: bool,
) -> *mut BlazenFuture {
    if client.is_null() || node_id.is_null() {
        return std::ptr::null_mut();
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let handle = unsafe { &*client };
    let Some(node) = (unsafe { cstr_to_str(node_id) }).map(str::to_owned) else {
        return std::ptr::null_mut();
    };
    let inner = handle.inner.clone();
    let operation = async move {
        let drained = inner.drain_worker(node, immediate).await;
        drained.map_err(InnerError::from)
    };
    BlazenFuture::spawn(operation)
}
/// C-layout callback table through which a foreign caller implements assignment handling.
///
/// The table is passed by value to the worker constructors in this file, which take
/// ownership of it: `drop_user_data` is invoked either when a constructor fails or when the
/// wrapping `CAssignmentHandler` is dropped.
#[repr(C)]
pub struct BlazenAssignmentHandlerVTable {
/// Opaque pointer handed back as the first argument of every callback below.
pub user_data: *mut c_void,
/// Releases `user_data`.
pub drop_user_data: extern "C" fn(user_data: *mut c_void),
/// Handles one assignment. The three string arguments are NUL-terminated buffers owned by
/// the Rust side. A return of `0` means success: a non-null `*out_json` is then reclaimed
/// with `CString::from_raw` and parsed as JSON (null maps to JSON `null`). Any other return
/// means failure: a non-null `*out_err` is then reclaimed with `Box::from_raw` and its
/// message reported.
pub handle: extern "C" fn(
user_data: *mut c_void,
run_id: *const c_char,
workflow_name: *const c_char,
input_json: *const c_char,
out_json: *mut *mut c_char,
out_err: *mut *mut BlazenError,
) -> i32,
/// Notifies the handler that the run `run_id` (UUID text) is being cancelled.
pub on_cancel: extern "C" fn(user_data: *mut c_void, run_id: *const c_char),
/// Notifies the handler of a drain request; `immediate` is forwarded from the trait call.
pub on_drain: extern "C" fn(user_data: *mut c_void, immediate: bool),
}
// The callbacks are invoked from `tokio::task::spawn_blocking` threads, so the table must
// cross threads. These impls assert (without checking) that `user_data` and the callbacks
// tolerate that; upholding it is the foreign implementer's responsibility.
unsafe impl Send for BlazenAssignmentHandlerVTable {}
unsafe impl Sync for BlazenAssignmentHandlerVTable {}
/// Adapter that owns a foreign callback table and exposes it as an `AssignmentHandler`.
struct CAssignmentHandler {
/// Callback table supplied by the foreign caller; `user_data` is released on drop.
vtable: BlazenAssignmentHandlerVTable,
}
/// Hands `user_data` back to the foreign side once the adapter is no longer referenced.
impl Drop for CAssignmentHandler {
    fn drop(&mut self) {
        let release = self.vtable.drop_user_data;
        release(self.vtable.user_data);
    }
}
/// Bridges the async `AssignmentHandler` trait onto the synchronous C callbacks.
///
/// Every callback runs on a `spawn_blocking` thread. Raw pointers are carried across the
/// thread boundary as `usize` addresses and rebuilt on the other side.
#[async_trait]
impl AssignmentHandler for CAssignmentHandler {
    /// Marshals the assignment into C strings, invokes the foreign `handle` callback and
    /// converts its outcome.
    ///
    /// A zero return code yields the parsed `out_json` (JSON `null` when the callback left
    /// it null). A non-zero code yields an `AssignmentFailure` carrying the message of
    /// `out_err`, or a fixed message when `out_err` was left null.
    // NOTE(review): `out_json` is not reclaimed on the failure path and `out_err` is not
    // reclaimed on the success path; whether a callback may set them there is not visible.
    async fn handle(
        &self,
        assignment: Assignment,
        _ctx: AssignmentContext,
    ) -> Result<serde_json::Value, AssignmentFailure> {
        let run_id = CString::new(assignment.run_id.to_string())
            .map_err(|e| AssignmentFailure::new(format!("run_id has interior NUL: {e}")))?;
        let workflow_name = CString::new(assignment.workflow_name.clone())
            .map_err(|e| AssignmentFailure::new(format!("workflow_name has interior NUL: {e}")))?;
        let input_text = std::str::from_utf8(&assignment.input_json)
            .map(str::to_owned)
            .map_err(|e| AssignmentFailure::new(format!("input_json not valid UTF-8: {e}")))?;
        let input_json = CString::new(input_text)
            .map_err(|e| AssignmentFailure::new(format!("input_json has interior NUL: {e}")))?;

        let user_data_addr = self.vtable.user_data as usize;
        let callback = self.vtable.handle;
        let joined = tokio::task::spawn_blocking(move || -> (i32, usize, usize) {
            let mut json_slot: *mut c_char = std::ptr::null_mut();
            let mut err_slot: *mut BlazenError = std::ptr::null_mut();
            let code = callback(
                user_data_addr as *mut c_void,
                run_id.as_ptr(),
                workflow_name.as_ptr(),
                input_json.as_ptr(),
                &raw mut json_slot,
                &raw mut err_slot,
            );
            (code, json_slot as usize, err_slot as usize)
        })
        .await;
        let (rc, out_json, out_err) = joined
            .map(|(code, json_addr, err_addr)| {
                (code, json_addr as *mut c_char, err_addr as *mut BlazenError)
            })
            .map_err(|e| {
                AssignmentFailure::new(format!("assignment handler task panicked: {e}"))
            })?;

        if rc != 0 {
            let msg = if out_err.is_null() {
                "assignment handler returned non-zero without setting out_err".to_string()
            } else {
                // SAFETY: the callback contract is that a non-null `out_err` is a boxed
                // `BlazenError` whose ownership passes to us.
                unsafe { Box::from_raw(out_err) }.inner.to_string()
            };
            return Err(AssignmentFailure::new(msg));
        }
        if out_json.is_null() {
            return Ok(serde_json::Value::Null);
        }
        // SAFETY: the callback contract is that a non-null `out_json` can be reclaimed
        // with `CString::from_raw`.
        let text = unsafe { CString::from_raw(out_json) }
            .into_string()
            .map_err(|e| {
                AssignmentFailure::new(format!(
                    "assignment handler returned non-UTF-8 output: {e}"
                ))
            })?;
        serde_json::from_str(&text).map_err(|e| {
            AssignmentFailure::new(format!("assignment handler returned non-JSON output: {e}"))
        })
    }

    /// Forwards a cancel notification for `run_id` to the foreign `on_cancel` callback.
    /// The join result of the blocking task is ignored.
    async fn on_cancel(&self, run_id: Uuid) {
        let run_id_text = match CString::new(run_id.to_string()) {
            Err(e) => {
                tracing::warn!(error = %e, %run_id, "on_cancel: run_id has interior NUL");
                return;
            }
            Ok(text) => text,
        };
        let user_data_addr = self.vtable.user_data as usize;
        let callback = self.vtable.on_cancel;
        let _ = tokio::task::spawn_blocking(move || {
            callback(user_data_addr as *mut c_void, run_id_text.as_ptr());
        })
        .await;
    }

    /// Forwards a drain notification to the foreign `on_drain` callback.
    /// The join result of the blocking task is ignored.
    async fn on_drain(&self, immediate: bool) {
        let user_data_addr = self.vtable.user_data as usize;
        let callback = self.vtable.on_drain;
        let _ = tokio::task::spawn_blocking(move || {
            callback(user_data_addr as *mut c_void, immediate);
        })
        .await;
    }
}
/// Opaque FFI handle for a control-plane worker.
///
/// Created by the `blazen_controlplane_worker_new*` constructors and released with
/// `blazen_controlplane_worker_free`.
pub struct BlazenControlPlaneWorker {
/// The connected worker plus its handler. A `run` call takes the value out, leaving
/// `None`, so a second `run` (and a later `shutdown`) finds nothing to act on.
inner: Arc<Mutex<Option<InnerHolder>>>,
}
/// Builds a worker handle from the given configuration and callback table.
///
/// * `capabilities_json` — optional JSON array of `{kind, version}` objects.
/// * `tags_json` — optional JSON object mapping strings to strings.
/// * `admission_mode` / `admission_param` — interpreted by `build_admission`.
/// * `vtable` — ownership passes to this function. On every failure path its
///   `drop_user_data` is invoked before the error is written.
///
/// Returns `0` on success and stores the boxed worker in `*out_worker` when that slot is
/// non-null (otherwise the worker is dropped right away). Returns `-1` on failure with the
/// error reported through `out_err`.
///
/// # Safety
/// String pointers must be null or satisfy `cstr_to_str`'s requirements; `out_worker` and
/// `out_err` must be null or writable; `vtable` must hold callable function pointers.
// The C ABI fixes the parameter list, so the argument count cannot be reduced.
#[allow(clippy::too_many_arguments)]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_worker_new_blocking(
    endpoint: *const c_char,
    node_id: *const c_char,
    capabilities_json: *const c_char,
    tags_json: *const c_char,
    admission_mode: u32,
    admission_param: u64,
    vtable: BlazenAssignmentHandlerVTable,
    out_worker: *mut *mut BlazenControlPlaneWorker,
    out_err: *mut *mut BlazenError,
) -> i32 {
    // Copy the release hook out so failure paths can fire it without borrowing `vtable`.
    let (drop_user_data, user_data) = (vtable.drop_user_data, vtable.user_data);
    let reject = |message: &str| -> i32 {
        drop_user_data(user_data);
        unsafe { write_internal_error(out_err, message) }
    };

    if endpoint.is_null() || node_id.is_null() {
        return reject("null pointer argument");
    }
    let Some(endpoint) = (unsafe { cstr_to_str(endpoint) }).map(str::to_owned) else {
        return reject("endpoint not valid UTF-8");
    };
    let Some(node_id) = (unsafe { cstr_to_str(node_id) }).map(str::to_owned) else {
        return reject("node_id not valid UTF-8");
    };
    let Some(capabilities) = (unsafe { parse_json_capabilities(capabilities_json) }) else {
        return reject("capabilities_json must be a JSON array of {kind, version}");
    };
    let Some(tags) = (unsafe { parse_tags_json(tags_json) }) else {
        return reject("tags_json must be a JSON object of string->string");
    };

    let config = capabilities
        .into_iter()
        .fold(InnerWorkerConfig::new(endpoint, node_id), |cfg, cap| {
            cfg.with_capability(cap)
        });
    let config = tags
        .into_iter()
        .fold(config, |cfg, (key, value)| cfg.with_tag(key, value));
    let config = config.with_admission(build_admission(admission_mode, admission_param));

    let connected = match InnerWorker::connect(config) {
        Ok(w) => w,
        Err(e) => {
            drop_user_data(user_data);
            return unsafe { write_error(out_err, InnerError::from(e)) };
        }
    };
    let holder = InnerHolder {
        worker: connected,
        handler: Arc::new(CAssignmentHandler { vtable }),
    };
    let worker = BlazenControlPlaneWorker {
        inner: Arc::new(Mutex::new(Some(holder))),
    };
    if !out_worker.is_null() {
        // SAFETY: non-null and writable per the caller's contract.
        unsafe {
            *out_worker = Box::into_raw(Box::new(worker));
        }
    }
    0
}
/// Pairs a connected worker with the handler it will be run with, so both can be taken
/// out of the worker handle together.
struct InnerHolder {
/// The connected worker; consumed by `run`.
worker: InnerWorker,
/// Adapter around the foreign callback table, passed to `run` wrapped in `HandlerArc`.
handler: Arc<CAssignmentHandler>,
}
/// Runs the worker on the calling thread until its `run` future completes.
///
/// The worker and handler are taken out of the handle first, so this can succeed only
/// once per handle; a repeated call fails with an "already called" error. Returns `0` when
/// `run` finishes successfully, or `-1` with the error reported through `out_err`.
///
/// # Safety
/// `worker` must be null or a live `BlazenControlPlaneWorker`; `out_err` must be null or
/// writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_worker_run_blocking(
    worker: *const BlazenControlPlaneWorker,
    out_err: *mut *mut BlazenError,
) -> i32 {
    if worker.is_null() {
        return unsafe { write_internal_error(out_err, "null worker pointer") };
    }
    // SAFETY: non-null, and the caller guarantees it points at a live worker handle.
    let handle = unsafe { &*worker };
    // Separate statement so the lock guard is released before blocking on `run`.
    let slot = handle.inner.lock().take();
    let InnerHolder { worker: w, handler } = match slot {
        Some(holder) => holder,
        None => {
            return unsafe {
                write_internal_error(out_err, "worker.run already called; create a fresh worker")
            };
        }
    };
    let finished = runtime().block_on(async move { w.run(HandlerArc(handler)).await });
    match finished {
        Err(e) => unsafe { write_error(out_err, InnerError::from(e)) },
        Ok(()) => 0,
    }
}
/// Starts the worker asynchronously and returns the future tracking its `run` call.
///
/// Returns null when `worker` is null or when the worker has already been taken by an
/// earlier `run` call on the same handle.
///
/// # Safety
/// `worker` must be null or a live `BlazenControlPlaneWorker`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_worker_run(
    worker: *const BlazenControlPlaneWorker,
) -> *mut BlazenFuture {
    if worker.is_null() {
        return std::ptr::null_mut();
    }
    // SAFETY: non-null, and the caller guarantees it points at a live worker handle.
    let handle = unsafe { &*worker };
    let slot = handle.inner.lock().take();
    match slot {
        None => std::ptr::null_mut(),
        Some(InnerHolder { worker: w, handler }) => BlazenFuture::spawn(async move {
            w.run(HandlerArc(handler)).await.map_err(InnerError::from)
        }),
    }
}
/// Calls `shutdown` on the worker still stored in the handle.
///
/// Does nothing when `worker` is null. It also does nothing once a `run` call has taken
/// the worker out of the handle, because the slot is then `None`.
///
/// # Safety
/// `worker` must be null or a live `BlazenControlPlaneWorker`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_worker_shutdown(
    worker: *const BlazenControlPlaneWorker,
) {
    if worker.is_null() {
        return;
    }
    // SAFETY: non-null, and the caller guarantees it points at a live worker handle.
    let handle = unsafe { &*worker };
    // The lock stays held for the duration of the `shutdown` call.
    let slot = handle.inner.lock();
    match slot.as_ref() {
        Some(holder) => holder.worker.shutdown(),
        None => {}
    }
}
/// Releases a worker handle. Passing null is a no-op.
///
/// # Safety
/// `worker` must be null or a pointer previously produced by `Box::into_raw` for a
/// `BlazenControlPlaneWorker` that has not been freed yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_worker_free(worker: *mut BlazenControlPlaneWorker) {
    if !worker.is_null() {
        // SAFETY: non-null and uniquely owned per the caller's contract.
        let reclaimed = unsafe { Box::from_raw(worker) };
        drop(reclaimed);
    }
}
/// Newtype that lets a shared `CAssignmentHandler` be passed where an owned
/// `AssignmentHandler` is expected; every method delegates to the wrapped handler.
struct HandlerArc(Arc<CAssignmentHandler>);

#[async_trait]
impl AssignmentHandler for HandlerArc {
    async fn handle(
        &self,
        assignment: Assignment,
        ctx: AssignmentContext,
    ) -> Result<serde_json::Value, AssignmentFailure> {
        let delegate = &self.0;
        delegate.handle(assignment, ctx).await
    }

    async fn on_cancel(&self, run_id: Uuid) {
        let delegate = &self.0;
        delegate.on_cancel(run_id).await;
    }

    async fn on_drain(&self, immediate: bool) {
        let delegate = &self.0;
        delegate.on_drain(immediate).await;
    }
}
/// Decodes an optional C string holding a JSON object whose values are all strings.
///
/// A null pointer, an empty / whitespace-only string and JSON `null` all yield an empty
/// list. `None` signals malformed input: undecodable text, invalid JSON, a non-object
/// value, or any non-string value inside the object. Pairs are returned in the map's
/// iteration order.
///
/// # Safety
/// A non-null `ptr` must satisfy the requirements of `cstr_to_str`.
unsafe fn parse_tags_json(ptr: *const c_char) -> Option<Vec<(String, String)>> {
    if ptr.is_null() {
        return Some(Vec::new());
    }
    // SAFETY: `ptr` is non-null; validity is the caller's obligation.
    let text = unsafe { cstr_to_str(ptr) }?;
    if text.trim().is_empty() {
        return Some(Vec::new());
    }
    let fields = match serde_json::from_str::<serde_json::Value>(text).ok()? {
        serde_json::Value::Null => return Some(Vec::new()),
        serde_json::Value::Object(fields) => fields,
        _ => return None,
    };
    // Collecting into `Option<Vec<_>>` stops at the first non-string value.
    fields
        .into_iter()
        .map(|(key, value)| Some((key, value.as_str()?.to_owned())))
        .collect()
}
/// Translates the FFI `(mode, param)` pair into an [`AdmissionMode`].
///
/// * `1` — `VramBudget` with `param` as `max_vram_mb`.
/// * `2` — `Reactive`; `param` is ignored.
/// * anything else — `Fixed`, where a `param` of `0` becomes `1` and values above
///   `u32::MAX` saturate to `u32::MAX`.
fn build_admission(mode: u32, param: u64) -> AdmissionMode {
    if mode == 1 {
        return AdmissionMode::VramBudget { max_vram_mb: param };
    }
    if mode == 2 {
        return AdmissionMode::Reactive;
    }
    let max_in_flight = match param {
        0 => 1,
        requested => u32::try_from(requested).unwrap_or(u32::MAX),
    };
    AdmissionMode::Fixed { max_in_flight }
}
/// Decodes the three mTLS path arguments into `(cert, key, ca)`.
///
/// On the first problem (any null pointer, or the first path that cannot be decoded, in
/// cert → key → ca order) an error is written through `out_err` and `None` is returned.
///
/// # Safety
/// Each path pointer must be null or satisfy `cstr_to_str`'s requirements; `out_err` must
/// be null or writable.
unsafe fn parse_mtls_paths(
    cert_path: *const c_char,
    key_path: *const c_char,
    ca_path: *const c_char,
    out_err: *mut *mut BlazenError,
) -> Option<(PathBuf, PathBuf, PathBuf)> {
    if cert_path.is_null() || key_path.is_null() || ca_path.is_null() {
        unsafe { write_internal_error(out_err, "null pointer argument") };
        return None;
    }
    // Decodes one path, reporting `complaint` through `out_err` when decoding fails.
    let decode = |raw: *const c_char, complaint: &str| -> Option<PathBuf> {
        match unsafe { cstr_to_str(raw) } {
            Some(text) => Some(PathBuf::from(text)),
            None => {
                unsafe { write_internal_error(out_err, complaint) };
                None
            }
        }
    };
    let cert = decode(cert_path, "cert_path not valid UTF-8")?;
    let key = decode(key_path, "key_path not valid UTF-8")?;
    let ca = decode(ca_path, "ca_path not valid UTF-8")?;
    Some((cert, key, ca))
}
/// Connects to `endpoint` using mutual TLS, blocking until the attempt finishes.
///
/// `cert_path`, `key_path` and `ca_path` are all required. Returns `0` on success and
/// stores a boxed client in `*out_client` when that slot is non-null. Returns `-1` on
/// failure with the error reported through `out_err`.
///
/// # Safety
/// String pointers must be null or satisfy `cstr_to_str`'s requirements; the out-pointers
/// must be null or writable.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_connect_with_mtls_blocking(
    endpoint: *const c_char,
    cert_path: *const c_char,
    key_path: *const c_char,
    ca_path: *const c_char,
    out_client: *mut *mut BlazenControlPlaneClient,
    out_err: *mut *mut BlazenError,
) -> i32 {
    if endpoint.is_null() {
        return unsafe { write_internal_error(out_err, "null pointer argument") };
    }
    let Some(address) = (unsafe { cstr_to_str(endpoint) }).map(str::to_owned) else {
        return unsafe { write_internal_error(out_err, "endpoint not valid UTF-8") };
    };
    // `parse_mtls_paths` has already written the error when it yields `None`.
    let (cert, key, ca) = match unsafe { parse_mtls_paths(cert_path, key_path, ca_path, out_err) }
    {
        Some(paths) => paths,
        None => return -1,
    };
    let attempt = runtime()
        .block_on(async move { InnerClient::with_mtls(address, &cert, &key, &ca).await });
    match attempt {
        Err(e) => unsafe { write_error(out_err, InnerError::from(e)) },
        Ok(connected) => {
            if !out_client.is_null() {
                let handle = Box::new(BlazenControlPlaneClient { inner: connected });
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out_client = Box::into_raw(handle);
                }
            }
            0
        }
    }
}
/// Starts an asynchronous mutual-TLS connection to `endpoint`.
///
/// Returns null when any argument is null or cannot be decoded. Otherwise the returned
/// future resolves to a `BlazenControlPlaneClient` or an error.
///
/// # Safety
/// Every pointer must be null or satisfy `cstr_to_str`'s requirements.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_connect_with_mtls(
    endpoint: *const c_char,
    cert_path: *const c_char,
    key_path: *const c_char,
    ca_path: *const c_char,
) -> *mut BlazenFuture {
    let any_null =
        endpoint.is_null() || cert_path.is_null() || key_path.is_null() || ca_path.is_null();
    if any_null {
        return std::ptr::null_mut();
    }
    let Some(address) = (unsafe { cstr_to_str(endpoint) }).map(str::to_owned) else {
        return std::ptr::null_mut();
    };
    let Some(cert) = (unsafe { cstr_to_str(cert_path) }).map(PathBuf::from) else {
        return std::ptr::null_mut();
    };
    let Some(key) = (unsafe { cstr_to_str(key_path) }).map(PathBuf::from) else {
        return std::ptr::null_mut();
    };
    let Some(ca) = (unsafe { cstr_to_str(ca_path) }).map(PathBuf::from) else {
        return std::ptr::null_mut();
    };
    BlazenFuture::spawn(async move {
        match InnerClient::with_mtls(address, &cert, &key, &ca).await {
            Ok(connected) => Ok(BlazenControlPlaneClient { inner: connected }),
            Err(e) => Err(InnerError::from(e)),
        }
    })
}
/// Builds a worker handle configured for mutual TLS.
///
/// Behaves like `blazen_controlplane_worker_new_blocking`, with three additional required
/// path arguments (`cert_path`, `key_path`, `ca_path`) applied through the config's
/// `with_mtls`. Ownership of `vtable` passes to this function. Every failure path invokes
/// its `drop_user_data`.
///
/// Returns `0` on success and stores the boxed worker in `*out_worker` when that slot is
/// non-null. Returns `-1` on failure with the error reported through `out_err`.
///
/// # Safety
/// String pointers must be null or satisfy `cstr_to_str`'s requirements; `out_worker` and
/// `out_err` must be null or writable; `vtable` must hold callable function pointers.
// The C ABI fixes the parameter list, so the argument count cannot be reduced.
#[allow(clippy::too_many_arguments)]
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_worker_new_with_mtls_blocking(
    endpoint: *const c_char,
    node_id: *const c_char,
    capabilities_json: *const c_char,
    tags_json: *const c_char,
    admission_mode: u32,
    admission_param: u64,
    cert_path: *const c_char,
    key_path: *const c_char,
    ca_path: *const c_char,
    vtable: BlazenAssignmentHandlerVTable,
    out_worker: *mut *mut BlazenControlPlaneWorker,
    out_err: *mut *mut BlazenError,
) -> i32 {
    // Copy the release hook out so failure paths can fire it without borrowing `vtable`.
    let (drop_user_data, user_data) = (vtable.drop_user_data, vtable.user_data);
    let reject = |message: &str| -> i32 {
        drop_user_data(user_data);
        unsafe { write_internal_error(out_err, message) }
    };

    if endpoint.is_null() || node_id.is_null() {
        return reject("null pointer argument");
    }
    let Some(endpoint) = (unsafe { cstr_to_str(endpoint) }).map(str::to_owned) else {
        return reject("endpoint not valid UTF-8");
    };
    let Some(node_id) = (unsafe { cstr_to_str(node_id) }).map(str::to_owned) else {
        return reject("node_id not valid UTF-8");
    };
    let Some(capabilities) = (unsafe { parse_json_capabilities(capabilities_json) }) else {
        return reject("capabilities_json must be a JSON array of {kind, version}");
    };
    let Some(tags) = (unsafe { parse_tags_json(tags_json) }) else {
        return reject("tags_json must be a JSON object of string->string");
    };
    // `parse_mtls_paths` writes its own error before yielding `None`.
    let Some((cert, key, ca)) =
        (unsafe { parse_mtls_paths(cert_path, key_path, ca_path, out_err) })
    else {
        drop_user_data(user_data);
        return -1;
    };

    let config = capabilities
        .into_iter()
        .fold(InnerWorkerConfig::new(endpoint, node_id), |cfg, cap| {
            cfg.with_capability(cap)
        });
    let config = tags
        .into_iter()
        .fold(config, |cfg, (key, value)| cfg.with_tag(key, value));
    let config = config.with_admission(build_admission(admission_mode, admission_param));
    let config = match config.with_mtls(&cert, &key, &ca) {
        Ok(c) => c,
        Err(e) => {
            drop_user_data(user_data);
            return unsafe { write_error(out_err, InnerError::from(e)) };
        }
    };

    let connected = match InnerWorker::connect(config) {
        Ok(w) => w,
        Err(e) => {
            drop_user_data(user_data);
            return unsafe { write_error(out_err, InnerError::from(e)) };
        }
    };
    let holder = InnerHolder {
        worker: connected,
        handler: Arc::new(CAssignmentHandler { vtable }),
    };
    let worker = BlazenControlPlaneWorker {
        inner: Arc::new(Mutex::new(Some(holder))),
    };
    if !out_worker.is_null() {
        // SAFETY: non-null and writable per the caller's contract.
        unsafe {
            *out_worker = Box::into_raw(Box::new(worker));
        }
    }
    0
}
/// C-layout callback table through which a foreign caller receives run events.
///
/// The table is passed by value to the subscribe functions in this file, which take
/// ownership of it: `drop_user_data` is invoked either when argument validation fails or
/// when the wrapping `CRunEventSink` is dropped.
#[repr(C)]
pub struct BlazenRunEventSinkVTable {
/// Opaque pointer handed back as the first argument of every callback below.
pub user_data: *mut c_void,
/// Releases `user_data`.
pub drop_user_data: unsafe extern "C" fn(user_data: *mut c_void),
/// Delivers one event. The string arguments are NUL-terminated buffers owned by the Rust
/// side and are dropped after the callback returns; `data_json` is the event data
/// serialized as JSON text.
pub on_event: unsafe extern "C" fn(
user_data: *mut c_void,
run_id: *const c_char,
event_type: *const c_char,
data_json: *const c_char,
timestamp_ms: u64,
),
/// Invoked when the event stream ends without an error and the subscription was not
/// cancelled.
pub on_close: unsafe extern "C" fn(user_data: *mut c_void),
/// Invoked with an error message when the event stream yields an error and the
/// subscription was not cancelled.
pub on_error: unsafe extern "C" fn(user_data: *mut c_void, error: *const c_char),
}
// The callbacks are invoked from `tokio::task::spawn_blocking` threads, so the table must
// cross threads. These impls assert (without checking) that `user_data` and the callbacks
// tolerate that; upholding it is the foreign implementer's responsibility.
unsafe impl Send for BlazenRunEventSinkVTable {}
unsafe impl Sync for BlazenRunEventSinkVTable {}
/// Opaque FFI handle for a live event subscription.
///
/// Returned by the subscribe functions; cancelled with
/// `blazen_controlplane_subscription_cancel` and released (which also cancels) with
/// `blazen_controlplane_subscription_free`.
pub struct BlazenControlPlaneSubscription {
/// Token observed by the event pump; cancelling it makes the pump stop.
cancel: CancellationToken,
}
/// Owner of a foreign event-sink callback table; releases `user_data` when dropped.
struct CRunEventSink {
/// Callback table supplied by the foreign caller.
vtable: BlazenRunEventSinkVTable,
}
/// Hands `user_data` back to the foreign side once the sink is no longer referenced.
impl Drop for CRunEventSink {
    fn drop(&mut self) {
        let release = self.vtable.drop_user_data;
        let user_data = self.vtable.user_data;
        // SAFETY: the foreign caller supplied `release` as the destructor for `user_data`.
        unsafe { release(user_data) };
    }
}
/// Selects which client subscription call the event pump opens.
enum SubscribeKind {
/// Events of a single run; opened with `subscribe_run_events(uuid)`.
PerRun(Uuid),
/// Events across runs, opened with `subscribe_all(tags)`; the payload is the tag list
/// passed to that call.
All(Vec<String>),
}
/// Launches the event pump for one subscription on the shared runtime.
///
/// All arguments are moved into the spawned task; see `run_event_pump_inner` for how they
/// are used. The returned handle can be dropped to let the task run detached.
fn spawn_event_pump(
    sink: CRunEventSink,
    client: InnerClient,
    kind: SubscribeKind,
    cancel: CancellationToken,
    ready_tx: tokio::sync::oneshot::Sender<Result<(), InnerError>>,
) -> tokio::task::JoinHandle<()> {
    let pump = run_event_pump_inner(sink, client, kind, cancel, ready_tx);
    runtime().spawn(pump)
}
/// Body of the subscription task: opens the stream, forwards events to the sink, and
/// reports how the stream ended.
///
/// * `ready_tx` receives `Ok(())` once the stream is open, or the open error (after which
///   the task ends without invoking any sink callback).
/// * Each received event is forwarded through `dispatch_event`.
/// * When the loop ends and `cancel` has NOT fired, `on_close` is invoked for a clean end
///   of stream and `on_error` for a stream error. When `cancel` has fired, neither is
///   invoked.
async fn run_event_pump_inner(
sink: CRunEventSink,
client: InnerClient,
kind: SubscribeKind,
cancel: CancellationToken,
ready_tx: tokio::sync::oneshot::Sender<Result<(), InnerError>>,
) {
// Shared so each callback invocation can hold its own reference to the sink.
let sink = Arc::new(sink);
// The stream borrows `client`; scoping it inside this block ends that borrow before
// `client` is dropped below.
let outcome = {
let mut stream = match open_stream(&client, kind).await {
Ok(s) => {
// A send failure only means the subscriber stopped waiting; ignore it.
let _ = ready_tx.send(Ok(()));
s
}
Err(e) => {
let _ = ready_tx.send(Err(e));
return;
}
};
let mut result: Result<(), String> = Ok(());
loop {
tokio::select! {
// `biased` polls the branches in order, so cancellation is checked first.
biased;
() = cancel.cancelled() => break,
next = stream.next() => match next {
None => break,
Some(Ok(event)) => {
dispatch_event(Arc::clone(&sink), event).await;
}
Some(Err(e)) => {
result = Err(e.to_string());
break;
}
},
}
}
result
};
drop(client);
// A cancelled subscription gets no terminal callback.
if cancel.is_cancelled() {
return;
}
match outcome {
Ok(()) => {
fire_on_close(Arc::clone(&sink)).await;
}
Err(msg) => {
fire_on_error(Arc::clone(&sink), msg).await;
}
}
}
/// Opens the event stream selected by `kind` on `client`.
///
/// # Errors
/// Any failure from the underlying subscribe call is wrapped in `InnerError::Internal`
/// with a message of the form `"<call> failed: <cause>"`.
async fn open_stream(
    client: &InnerClient,
    kind: SubscribeKind,
) -> Result<blazen_core::distributed::RunEventStream<'_>, InnerError> {
    /// Wraps a subscribe failure into the internal error reported to the caller.
    fn subscribe_failure(call: &str, cause: impl std::fmt::Display) -> InnerError {
        InnerError::Internal {
            message: format!("{call} failed: {cause}"),
        }
    }

    match kind {
        SubscribeKind::PerRun(uuid) => {
            let opened = client.subscribe_run_events(uuid).await;
            opened.map_err(|e| subscribe_failure("subscribe_run_events", e))
        }
        SubscribeKind::All(tags) => {
            let opened = client.subscribe_all(tags).await;
            opened.map_err(|e| subscribe_failure("subscribe_all", e))
        }
    }
}
/// Marshals one event into C strings and delivers it to the sink's `on_event` callback on
/// a blocking thread.
///
/// An event whose fields cannot be marshalled (interior NUL, JSON encode failure) is logged
/// with `tracing::warn!` and skipped. The join result of the blocking task is ignored.
async fn dispatch_event(sink: Arc<CRunEventSink>, event: blazen_core::distributed::RunEvent) {
    let Ok(run_id) = CString::new(event.run_id.to_string()).map_err(|e| {
        tracing::warn!(error = %e, "subscription on_event: run_id has interior NUL");
    }) else {
        return;
    };
    let Ok(event_type) = CString::new(event.event_type.clone()).map_err(|e| {
        tracing::warn!(error = %e, "subscription on_event: event_type has interior NUL");
    }) else {
        return;
    };
    let Ok(data_text) = serde_json::to_string(&event.data).map_err(|e| {
        tracing::warn!(error = %e, "subscription on_event: data JSON encode failed");
    }) else {
        return;
    };
    let Ok(data_json) = CString::new(data_text).map_err(|e| {
        tracing::warn!(error = %e, "subscription on_event: data_json has interior NUL");
    }) else {
        return;
    };
    let timestamp_ms = event.timestamp_ms;
    // The pointer crosses the thread boundary as an address and is rebuilt on the far side.
    let user_data_addr = sink.vtable.user_data as usize;
    let callback = sink.vtable.on_event;
    let _ = tokio::task::spawn_blocking(move || {
        // SAFETY: the C strings are owned by this closure and outlive the call.
        unsafe {
            callback(
                user_data_addr as *mut c_void,
                run_id.as_ptr(),
                event_type.as_ptr(),
                data_json.as_ptr(),
                timestamp_ms,
            );
        }
    })
    .await;
}
/// Invokes the sink's `on_close` callback on a blocking thread and waits for it to return.
/// The join result of the blocking task is ignored.
async fn fire_on_close(sink: Arc<CRunEventSink>) {
    let callback = sink.vtable.on_close;
    // The pointer crosses the thread boundary as an address and is rebuilt on the far side.
    let user_data_addr = sink.vtable.user_data as usize;
    let notify = move || {
        // SAFETY: `callback` and `user_data` were supplied together by the foreign caller.
        unsafe { callback(user_data_addr as *mut c_void) };
    };
    let _ = tokio::task::spawn_blocking(notify).await;
}
/// Invokes the sink's `on_error` callback with `msg` on a blocking thread.
///
/// If `msg` contains interior NUL bytes they are replaced with `?` so the text can still
/// be passed as a C string. The join result of the blocking task is ignored.
async fn fire_on_error(sink: Arc<CRunEventSink>, msg: String) {
    let text = CString::new(msg.clone())
        .or_else(|_| CString::new(msg.replace('\0', "?")))
        .unwrap_or_else(|_| CString::new("<unrenderable error>").unwrap_or_default());
    let callback = sink.vtable.on_error;
    // The pointer crosses the thread boundary as an address and is rebuilt on the far side.
    let user_data_addr = sink.vtable.user_data as usize;
    let notify = move || {
        // SAFETY: `text` is owned by this closure and outlives the call.
        unsafe { callback(user_data_addr as *mut c_void, text.as_ptr()) };
    };
    let _ = tokio::task::spawn_blocking(notify).await;
}
/// Subscribes `sink` to the events of the run identified by `run_id`.
///
/// Blocks until the event pump reports that the stream was opened (or failed to open).
/// Ownership of `sink` passes to this function. On argument-validation failure its
/// `drop_user_data` is invoked here, otherwise the pump owns it.
///
/// Returns `0` on success and stores a subscription handle in `*out_sub`. When `out_sub`
/// is null the subscription is cancelled immediately. Returns `-1` on failure with the
/// error reported through `out_err`.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; `run_id` must be null or
/// satisfy `cstr_to_str`'s requirements; the out-pointers must be null or writable; `sink`
/// must hold callable function pointers.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_subscribe_run_events(
    client: *const BlazenControlPlaneClient,
    run_id: *const c_char,
    sink: BlazenRunEventSinkVTable,
    out_sub: *mut *mut BlazenControlPlaneSubscription,
    out_err: *mut *mut BlazenError,
) -> i32 {
    // Copy the release hook out so failure paths can fire it without borrowing `sink`.
    let (drop_user_data, user_data) = (sink.drop_user_data, sink.user_data);
    let reject = |message: &str| -> i32 {
        unsafe {
            drop_user_data(user_data);
            write_internal_error(out_err, message)
        }
    };

    if client.is_null() || run_id.is_null() {
        return reject("null pointer argument");
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let client_ref = unsafe { &*client };
    let Some(uuid) = (unsafe { parse_uuid(run_id) }) else {
        return reject("run_id is not a valid UUID");
    };

    let cancel = CancellationToken::new();
    let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
    spawn_event_pump(
        CRunEventSink { vtable: sink },
        client_ref.inner.clone(),
        SubscribeKind::PerRun(uuid),
        cancel.clone(),
        ready_tx,
    );

    match runtime().block_on(ready_rx) {
        Ok(Ok(())) => {
            if out_sub.is_null() {
                // Nobody can hold the handle, so stop the pump right away.
                cancel.cancel();
            } else {
                let sub = Box::new(BlazenControlPlaneSubscription { cancel });
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out_sub = Box::into_raw(sub);
                }
            }
            0
        }
        Ok(Err(e)) => {
            cancel.cancel();
            unsafe { write_error(out_err, e) }
        }
        Err(_) => {
            cancel.cancel();
            unsafe {
                write_internal_error(out_err, "subscription pump task dropped before signalling")
            }
        }
    }
}
/// Subscribes `sink` to events across runs, passing the decoded `required_tags_json`
/// list to the client's `subscribe_all`.
///
/// Blocks until the event pump reports that the stream was opened (or failed to open).
/// Ownership of `sink` passes to this function. On argument-validation failure its
/// `drop_user_data` is invoked here, otherwise the pump owns it.
///
/// Returns `0` on success and stores a subscription handle in `*out_sub`. When `out_sub`
/// is null the subscription is cancelled immediately. Returns `-1` on failure with the
/// error reported through `out_err`.
///
/// # Safety
/// `client` must be null or a live `BlazenControlPlaneClient`; `required_tags_json` must be
/// null or satisfy `cstr_to_str`'s requirements; the out-pointers must be null or writable;
/// `sink` must hold callable function pointers.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_client_subscribe_all(
    client: *const BlazenControlPlaneClient,
    required_tags_json: *const c_char,
    sink: BlazenRunEventSinkVTable,
    out_sub: *mut *mut BlazenControlPlaneSubscription,
    out_err: *mut *mut BlazenError,
) -> i32 {
    // Copy the release hook out so failure paths can fire it without borrowing `sink`.
    let (drop_user_data, user_data) = (sink.drop_user_data, sink.user_data);
    let reject = |message: &str| -> i32 {
        unsafe {
            drop_user_data(user_data);
            write_internal_error(out_err, message)
        }
    };

    if client.is_null() {
        return reject("null client pointer");
    }
    // SAFETY: non-null, and the caller guarantees it points at a live client.
    let client_ref = unsafe { &*client };
    let Some(required_tags) = (unsafe { parse_json_string_array(required_tags_json) }) else {
        return reject("required_tags_json is not a JSON string array");
    };

    let cancel = CancellationToken::new();
    let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
    spawn_event_pump(
        CRunEventSink { vtable: sink },
        client_ref.inner.clone(),
        SubscribeKind::All(required_tags),
        cancel.clone(),
        ready_tx,
    );

    match runtime().block_on(ready_rx) {
        Ok(Ok(())) => {
            if out_sub.is_null() {
                // Nobody can hold the handle, so stop the pump right away.
                cancel.cancel();
            } else {
                let sub = Box::new(BlazenControlPlaneSubscription { cancel });
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out_sub = Box::into_raw(sub);
                }
            }
            0
        }
        Ok(Err(e)) => {
            cancel.cancel();
            unsafe { write_error(out_err, e) }
        }
        Err(_) => {
            cancel.cancel();
            unsafe {
                write_internal_error(out_err, "subscription pump task dropped before signalling")
            }
        }
    }
}
/// Cancels a subscription without releasing its handle. Passing null is a no-op.
///
/// # Safety
/// `sub` must be null or a live `BlazenControlPlaneSubscription`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_subscription_cancel(
    sub: *mut BlazenControlPlaneSubscription,
) {
    if !sub.is_null() {
        // SAFETY: non-null, and the caller guarantees it points at a live subscription.
        let subscription = unsafe { &*sub };
        subscription.cancel.cancel();
    }
}
/// Cancels a subscription and releases its handle. Passing null is a no-op.
///
/// # Safety
/// `sub` must be null or a pointer previously produced by `Box::into_raw` for a
/// `BlazenControlPlaneSubscription` that has not been freed yet.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_controlplane_subscription_free(
    sub: *mut BlazenControlPlaneSubscription,
) {
    if !sub.is_null() {
        // SAFETY: non-null and uniquely owned per the caller's contract.
        let reclaimed = unsafe { Box::from_raw(sub) };
        reclaimed.cancel.cancel();
        drop(reclaimed);
    }
}
/// Extracts a `BlazenControlPlaneClient` result from `fut` via `BlazenFuture::take_typed`.
///
/// Returns `0` on success and stores a boxed client in `*out` when that slot is non-null.
/// Returns `-1` on failure and stores the error in `*err` when that slot is non-null.
///
/// # Safety
/// `fut` must satisfy the requirements of `BlazenFuture::take_typed`; `out` and `err` must
/// each be null or point to writable pointer-sized storage.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_future_take_controlplane_client(
    fut: *mut BlazenFuture,
    out: *mut *mut BlazenControlPlaneClient,
    err: *mut *mut BlazenError,
) -> i32 {
    let taken = unsafe { BlazenFuture::take_typed::<BlazenControlPlaneClient>(fut) };
    match taken {
        Err(e) => {
            if !err.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *err = BlazenError::from(e).into_ptr();
                }
            }
            -1
        }
        Ok(client) => {
            if !out.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out = Box::into_raw(Box::new(client));
                }
            }
            0
        }
    }
}
/// Extracts a `RunStateSnapshot` result from `fut` via `BlazenFuture::take_typed` and
/// converts it into the FFI snapshot record.
///
/// Returns `0` on success and stores the record in `*out` when that slot is non-null.
/// Returns `-1` on failure and stores the error in `*err` when that slot is non-null.
///
/// # Safety
/// `fut` must satisfy the requirements of `BlazenFuture::take_typed`; `out` and `err` must
/// each be null or point to writable pointer-sized storage.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_future_take_run_state_snapshot(
    fut: *mut BlazenFuture,
    out: *mut *mut BlazenRunStateSnapshot,
    err: *mut *mut BlazenError,
) -> i32 {
    let taken =
        unsafe { BlazenFuture::take_typed::<blazen_core::distributed::RunStateSnapshot>(fut) };
    match taken {
        Err(e) => {
            if !err.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *err = BlazenError::from(e).into_ptr();
                }
            }
            -1
        }
        Ok(snap) => {
            if !out.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out = BlazenRunStateSnapshot::from(snap).into_ptr();
                }
            }
            0
        }
    }
}
/// Extracts a `Vec<WorkerInfo>` result from `fut` via `BlazenFuture::take_typed` and
/// converts it into the FFI worker list record.
///
/// Returns `0` on success and stores the record in `*out` when that slot is non-null.
/// Returns `-1` on failure and stores the error in `*err` when that slot is non-null.
///
/// # Safety
/// `fut` must satisfy the requirements of `BlazenFuture::take_typed`; `out` and `err` must
/// each be null or point to writable pointer-sized storage.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn blazen_future_take_worker_info_list(
    fut: *mut BlazenFuture,
    out: *mut *mut BlazenWorkerInfoList,
    err: *mut *mut BlazenError,
) -> i32 {
    let taken =
        unsafe { BlazenFuture::take_typed::<Vec<blazen_core::distributed::WorkerInfo>>(fut) };
    match taken {
        Err(e) => {
            if !err.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *err = BlazenError::from(e).into_ptr();
                }
            }
            -1
        }
        Ok(workers) => {
            if !out.is_null() {
                // SAFETY: non-null and writable per the caller's contract.
                unsafe {
                    *out = BlazenWorkerInfoList::from(workers).into_ptr();
                }
            }
            0
        }
    }
}