use std::io::{BufReader, BufWriter, stdin, stdout};
use std::panic::{self, AssertUnwindSafe};
use std::path::PathBuf;
use libloading::{Library, Symbol};
use venus_core::ipc::{WorkerCommand, WorkerResponse, read_message, write_message};
mod ffi;
use ffi::{EntryFn0, ExecutionResult};
struct LoadedCell {
#[allow(dead_code)]
path: PathBuf,
library: Library,
dep_count: usize,
entry_symbol: String,
name: String,
}
fn main() {
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("venus_worker=info".parse().unwrap()),
)
.init();
tracing::info!("Venus worker starting (pid={})", std::process::id());
let stdin = stdin();
let stdout = stdout();
let mut reader = BufReader::new(stdin.lock());
let mut writer = BufWriter::new(stdout.lock());
let mut loaded_cell: Option<LoadedCell> = None;
loop {
let cmd: WorkerCommand = match read_message(&mut reader) {
Ok(cmd) => cmd,
Err(e) => {
tracing::error!("Failed to read command: {}", e);
break;
}
};
let response = match cmd {
WorkerCommand::Ping => WorkerResponse::Pong,
WorkerCommand::Shutdown => {
tracing::info!("Shutdown requested");
let _ = write_message(&mut writer, &WorkerResponse::ShuttingDown);
break;
}
WorkerCommand::LoadCell {
dylib_path,
dep_count,
entry_symbol,
name,
} => {
tracing::info!("Loading cell '{}' from {:?}", name, dylib_path);
match load_cell(
dylib_path.clone().into(),
dep_count,
entry_symbol.clone(),
name.clone(),
) {
Ok(cell) => {
loaded_cell = Some(cell);
WorkerResponse::Loaded
}
Err(e) => WorkerResponse::Error {
message: e.to_string(),
},
}
}
WorkerCommand::Execute {
inputs,
widget_values_json,
} => match &loaded_cell {
None => WorkerResponse::Error {
message: "No cell loaded".to_string(),
},
Some(cell) => {
tracing::info!(
"Executing cell '{}' with {} inputs",
cell.name,
inputs.len()
);
execute_cell(cell, inputs, widget_values_json)
}
},
};
if let Err(e) = write_message(&mut writer, &response) {
tracing::error!("Failed to send response: {}", e);
break;
}
}
tracing::info!("Venus worker shutting down");
}
fn load_cell(
path: PathBuf,
dep_count: usize,
entry_symbol: String,
name: String,
) -> anyhow::Result<LoadedCell> {
let library = unsafe { Library::new(&path) }
.map_err(|e| anyhow::anyhow!("Failed to load library: {}", e))?;
let symbol_name = format!("{}\0", entry_symbol);
let _: Symbol<EntryFn0> = unsafe { library.get(symbol_name.as_bytes()) }
.map_err(|e| anyhow::anyhow!("Entry symbol not found: {}", e))?;
Ok(LoadedCell {
path,
library,
dep_count,
entry_symbol,
name,
})
}
fn execute_cell(
cell: &LoadedCell,
inputs: Vec<Vec<u8>>,
widget_values_json: Vec<u8>,
) -> WorkerResponse {
if inputs.len() != cell.dep_count {
return WorkerResponse::Error {
message: format!(
"Cell {} expects {} inputs, got {}",
cell.name,
cell.dep_count,
inputs.len()
),
};
}
let result = panic::catch_unwind(AssertUnwindSafe(|| {
call_cell_ffi(cell, &inputs, &widget_values_json)
}));
match result {
Ok(Ok((output_bytes, widgets_json))) => WorkerResponse::Output {
bytes: output_bytes,
widgets_json,
},
Ok(Err(e)) => WorkerResponse::Error { message: e },
Err(panic_info) => {
let message = if let Some(s) = panic_info.downcast_ref::<&str>() {
s.to_string()
} else if let Some(s) = panic_info.downcast_ref::<String>() {
s.clone()
} else {
"Unknown panic".to_string()
};
WorkerResponse::Panic { message }
}
}
}
fn call_cell_ffi(
cell: &LoadedCell,
inputs: &[Vec<u8>],
widget_values_json: &[u8],
) -> Result<(Vec<u8>, Vec<u8>), String> {
let symbol_name = format!("{}\0", cell.entry_symbol);
if cell.dep_count == 0 {
return call_cell_no_deps(cell, &symbol_name, widget_values_json);
}
call_cell_with_deps(cell, &symbol_name, inputs, widget_values_json)
}
fn call_cell_no_deps(
cell: &LoadedCell,
symbol_name: &str,
widget_values_json: &[u8],
) -> Result<(Vec<u8>, Vec<u8>), String> {
let func: Symbol<EntryFn0> = unsafe { cell.library.get(symbol_name.as_bytes()) }
.map_err(|e| format!("Failed to get symbol: {}", e))?;
let mut out_ptr: *mut u8 = std::ptr::null_mut();
let mut out_len: usize = 0;
let result_code = unsafe {
func(
widget_values_json.as_ptr(),
widget_values_json.len(),
&mut out_ptr,
&mut out_len,
)
};
process_ffi_result(result_code, out_ptr, out_len, &cell.name)
}
fn call_cell_with_deps(
cell: &LoadedCell,
symbol_name: &str,
inputs: &[Vec<u8>],
widget_values_json: &[u8],
) -> Result<(Vec<u8>, Vec<u8>), String> {
match inputs.len() {
1 => ffi::call_cell_1_deps(cell, symbol_name, inputs, widget_values_json),
2 => ffi::call_cell_2_deps(cell, symbol_name, inputs, widget_values_json),
3 => ffi::call_cell_3_deps(cell, symbol_name, inputs, widget_values_json),
4 => ffi::call_cell_4_deps(cell, symbol_name, inputs, widget_values_json),
5 => ffi::call_cell_5_deps(cell, symbol_name, inputs, widget_values_json),
6 => ffi::call_cell_6_deps(cell, symbol_name, inputs, widget_values_json),
7 => ffi::call_cell_7_deps(cell, symbol_name, inputs, widget_values_json),
8 => ffi::call_cell_8_deps(cell, symbol_name, inputs, widget_values_json),
n => Err(format!(
"Cells with {} dependencies not yet supported (max 8)",
n
)),
}
}
fn process_ffi_result(
result_code: i32,
out_ptr: *mut u8,
out_len: usize,
cell_name: &str,
) -> Result<(Vec<u8>, Vec<u8>), String> {
let result = ExecutionResult::from(result_code);
match result {
ExecutionResult::Success => {
if out_ptr.is_null() || out_len == 0 {
return Err(format!("Cell {} returned null output", cell_name));
}
let raw_bytes = unsafe {
let slice = std::slice::from_raw_parts(out_ptr, out_len);
let vec = slice.to_vec();
libc::free(out_ptr as *mut libc::c_void);
vec
};
if raw_bytes.len() < 16 {
return Err(format!(
"Cell {} output too short: {} bytes",
cell_name,
raw_bytes.len()
));
}
let display_len_bytes: [u8; 8] = raw_bytes[0..8].try_into().map_err(|_| {
format!("Cell {} output has malformed display_len field", cell_name)
})?;
let display_len = u64::from_le_bytes(display_len_bytes) as usize;
let display_end = 8 + display_len;
if raw_bytes.len() < display_end + 8 {
return Err(format!(
"Cell {} output too short for display data",
cell_name
));
}
let widgets_len_bytes: [u8; 8] = raw_bytes[display_end..display_end + 8]
.try_into()
.map_err(|_| {
format!("Cell {} output has malformed widgets_len field", cell_name)
})?;
let widgets_len = u64::from_le_bytes(widgets_len_bytes) as usize;
let widgets_start = display_end + 8;
let widgets_end = widgets_start + widgets_len;
if raw_bytes.len() < widgets_end {
return Err(format!(
"Cell {} output too short for widgets data",
cell_name
));
}
let widgets_json = raw_bytes[widgets_start..widgets_end].to_vec();
let rkyv_data = &raw_bytes[widgets_end..];
let mut bytes = Vec::with_capacity(8 + display_len + rkyv_data.len());
bytes.extend_from_slice(&raw_bytes[0..display_end]); bytes.extend_from_slice(rkyv_data);
Ok((bytes, widgets_json))
}
ExecutionResult::DeserializationError => {
Err(format!("Cell {} failed to deserialize input", cell_name))
}
ExecutionResult::CellError => Err(format!("Cell {} returned an error", cell_name)),
ExecutionResult::SerializationError => {
Err(format!("Cell {} failed to serialize output", cell_name))
}
ExecutionResult::Panic => Err(format!("Cell {} panicked during execution", cell_name)),
}
}