// venus 0.1.1
//
// Reactive notebook environment for Rust
// Documentation
//! Venus worker process for isolated cell execution.
//!
//! This binary receives commands via stdin and sends responses via stdout.
//! It can be killed at any time for immediate cell interruption.

use std::io::{BufReader, BufWriter, stdin, stdout};
use std::panic::{self, AssertUnwindSafe};
use std::path::PathBuf;

use libloading::{Library, Symbol};

use venus_core::ipc::{WorkerCommand, WorkerResponse, read_message, write_message};

mod ffi;

use ffi::{EntryFn0, ExecutionResult};

/// Currently loaded cell.
///
/// Built by `load_cell` and held by `main` in an `Option`, so the worker has
/// at most one cell loaded at a time.
struct LoadedCell {
    /// Path to the dylib (for error messages).
    // Stored but not read by the code visible in this file, hence the lint allowance.
    #[allow(dead_code)]
    path: PathBuf,
    /// The loaded library; the entry symbol is looked up in it on every execution.
    library: Library,
    /// Number of dependencies. Execution is rejected unless exactly this many
    /// inputs are supplied, and the value selects which FFI call path is used.
    dep_count: usize,
    /// Entry symbol name (stored without a trailing NUL; callers append one
    /// before the lookup).
    entry_symbol: String,
    /// Cell name for log and error messages.
    name: String,
}

/// Worker entry point: serve commands from the parent process until shutdown.
///
/// Commands are read one at a time from stdin and each one is answered with a
/// single response on stdout. Logging goes to stderr so it never mixes with
/// the IPC stream.
///
/// The loop ends when:
/// - a command cannot be read (for example the parent closed the pipe),
/// - a response cannot be written, or
/// - a `Shutdown` command arrives (a best-effort `ShuttingDown` reply is sent first).
///
/// At most one cell is loaded at a time. A successful `LoadCell` replaces the
/// previous cell; a failed `LoadCell` leaves the previous one in place.
fn main() {
    // Initialize tracing for debugging (stderr so it doesn't interfere with IPC)
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .with_env_filter(
            tracing_subscriber::EnvFilter::from_default_env()
                // The directive is a fixed, valid string, so parsing cannot fail.
                .add_directive("venus_worker=info".parse().unwrap()),
        )
        .init();

    tracing::info!("Venus worker starting (pid={})", std::process::id());

    let stdin = stdin();
    let stdout = stdout();
    let mut reader = BufReader::new(stdin.lock());
    let mut writer = BufWriter::new(stdout.lock());

    let mut loaded_cell: Option<LoadedCell> = None;

    loop {
        // Read command from parent
        let cmd: WorkerCommand = match read_message(&mut reader) {
            Ok(cmd) => cmd,
            Err(e) => {
                tracing::error!("Failed to read command: {}", e);
                break;
            }
        };

        let response = match cmd {
            WorkerCommand::Ping => WorkerResponse::Pong,

            WorkerCommand::Shutdown => {
                tracing::info!("Shutdown requested");
                // Best effort: the process is exiting regardless of whether the
                // parent is still listening.
                let _ = write_message(&mut writer, &WorkerResponse::ShuttingDown);
                break;
            }

            WorkerCommand::LoadCell {
                dylib_path,
                dep_count,
                entry_symbol,
                name,
            } => {
                tracing::info!("Loading cell '{}' from {:?}", name, dylib_path);
                // The command's fields are owned and not needed afterwards, so
                // they are moved into the loader rather than cloned.
                match load_cell(dylib_path.into(), dep_count, entry_symbol, name) {
                    Ok(cell) => {
                        loaded_cell = Some(cell);
                        WorkerResponse::Loaded
                    }
                    Err(e) => WorkerResponse::Error {
                        message: e.to_string(),
                    },
                }
            }

            WorkerCommand::Execute {
                inputs,
                widget_values_json,
            } => match &loaded_cell {
                None => WorkerResponse::Error {
                    message: "No cell loaded".to_string(),
                },
                Some(cell) => {
                    tracing::info!(
                        "Executing cell '{}' with {} inputs",
                        cell.name,
                        inputs.len()
                    );
                    execute_cell(cell, inputs, widget_values_json)
                }
            },
        };

        // Send response
        if let Err(e) = write_message(&mut writer, &response) {
            tracing::error!("Failed to send response: {}", e);
            break;
        }
    }

    tracing::info!("Venus worker shutting down");
}

/// Open a compiled cell's dynamic library and check that its entry point exists.
///
/// # Errors
///
/// Fails with "Failed to load library: …" when the library cannot be opened,
/// and with "Entry symbol not found: …" when `entry_symbol` cannot be resolved
/// in it.
fn load_cell(
    path: PathBuf,
    dep_count: usize,
    entry_symbol: String,
    name: String,
) -> anyhow::Result<LoadedCell> {
    // SAFETY: We trust the compiled cell was generated by our compiler
    let library = match unsafe { Library::new(&path) } {
        Ok(opened) => opened,
        Err(e) => return Err(anyhow::anyhow!("Failed to load library: {}", e)),
    };

    // Existence check only: the symbol is resolved and dropped here, never
    // called. The scope ends the borrow of `library` before it is moved below.
    {
        let nul_terminated = format!("{}\0", entry_symbol);
        // SAFETY: the symbol is only looked up, not invoked, so the `EntryFn0`
        // type given to the lookup is never relied upon here.
        let probe: Result<Symbol<EntryFn0>, _> =
            unsafe { library.get(nul_terminated.as_bytes()) };
        if let Err(e) = probe {
            return Err(anyhow::anyhow!("Entry symbol not found: {}", e));
        }
    }

    Ok(LoadedCell {
        path,
        library,
        dep_count,
        entry_symbol,
        name,
    })
}

/// Execute a cell with the given inputs.
fn execute_cell(
    cell: &LoadedCell,
    inputs: Vec<Vec<u8>>,
    widget_values_json: Vec<u8>,
) -> WorkerResponse {
    // Verify input count
    if inputs.len() != cell.dep_count {
        return WorkerResponse::Error {
            message: format!(
                "Cell {} expects {} inputs, got {}",
                cell.name,
                cell.dep_count,
                inputs.len()
            ),
        };
    }

    // Catch panics during execution
    // Note: Widget context is now set up inside the cell itself (via FFI parameter)
    let result = panic::catch_unwind(AssertUnwindSafe(|| {
        call_cell_ffi(cell, &inputs, &widget_values_json)
    }));

    match result {
        Ok(Ok((output_bytes, widgets_json))) => WorkerResponse::Output {
            bytes: output_bytes,
            widgets_json,
        },
        Ok(Err(e)) => WorkerResponse::Error { message: e },
        Err(panic_info) => {
            let message = if let Some(s) = panic_info.downcast_ref::<&str>() {
                s.to_string()
            } else if let Some(s) = panic_info.downcast_ref::<String>() {
                s.clone()
            } else {
                "Unknown panic".to_string()
            };
            WorkerResponse::Panic { message }
        }
    }
}

/// Invoke the cell's entry point, choosing the call path by dependency count.
///
/// Builds the NUL-terminated symbol name once and forwards to
/// `call_cell_no_deps` when the cell has no dependencies, otherwise to
/// `call_cell_with_deps`.
fn call_cell_ffi(
    cell: &LoadedCell,
    inputs: &[Vec<u8>],
    widget_values_json: &[u8],
) -> Result<(Vec<u8>, Vec<u8>), String> {
    let symbol_name = format!("{}\0", cell.entry_symbol);

    match cell.dep_count {
        0 => call_cell_no_deps(cell, &symbol_name, widget_values_json),
        _ => call_cell_with_deps(cell, &symbol_name, inputs, widget_values_json),
    }
}

/// Invoke a dependency-free cell through its `EntryFn0` entry point.
///
/// `symbol_name` must already carry its trailing NUL. The entry point receives
/// the widget values as a pointer/length pair and reports its output through
/// two out-parameters, which are then handed to `process_ffi_result`.
///
/// # Errors
///
/// Returns "Failed to get symbol: …" when the symbol cannot be resolved, or
/// whatever error `process_ffi_result` derives from the call's result.
fn call_cell_no_deps(
    cell: &LoadedCell,
    symbol_name: &str,
    widget_values_json: &[u8],
) -> Result<(Vec<u8>, Vec<u8>), String> {
    // SAFETY: relies on the library exporting this symbol with the `EntryFn0`
    // signature, as code generated by our compiler is expected to.
    let entry: Symbol<EntryFn0> = match unsafe { cell.library.get(symbol_name.as_bytes()) } {
        Ok(resolved) => resolved,
        Err(e) => return Err(format!("Failed to get symbol: {}", e)),
    };

    // Out-parameters the cell fills in with its output buffer.
    let (mut out_ptr, mut out_len) = (std::ptr::null_mut::<u8>(), 0usize);

    // SAFETY: the input pointer/length describe a live slice for the duration
    // of the call, and both out-pointers refer to live locals.
    let status = unsafe {
        entry(
            widget_values_json.as_ptr(),
            widget_values_json.len(),
            &mut out_ptr,
            &mut out_len,
        )
    };

    process_ffi_result(status, out_ptr, out_len, &cell.name)
}

/// Invoke a cell that takes dependencies, dispatching on how many were supplied.
///
/// One to eight inputs are routed to the matching `ffi::call_cell_N_deps`
/// helper. Any other count (including zero) yields a "not yet supported" error.
fn call_cell_with_deps(
    cell: &LoadedCell,
    symbol_name: &str,
    inputs: &[Vec<u8>],
    widget_values_json: &[u8],
) -> Result<(Vec<u8>, Vec<u8>), String> {
    let arity = inputs.len();

    // Each arity has its own FFI signature, so each gets its own helper.
    match arity {
        1 => ffi::call_cell_1_deps(cell, symbol_name, inputs, widget_values_json),
        2 => ffi::call_cell_2_deps(cell, symbol_name, inputs, widget_values_json),
        3 => ffi::call_cell_3_deps(cell, symbol_name, inputs, widget_values_json),
        4 => ffi::call_cell_4_deps(cell, symbol_name, inputs, widget_values_json),
        5 => ffi::call_cell_5_deps(cell, symbol_name, inputs, widget_values_json),
        6 => ffi::call_cell_6_deps(cell, symbol_name, inputs, widget_values_json),
        7 => ffi::call_cell_7_deps(cell, symbol_name, inputs, widget_values_json),
        8 => ffi::call_cell_8_deps(cell, symbol_name, inputs, widget_values_json),
        _ => Err(format!(
            "Cells with {} dependencies not yet supported (max 8)",
            arity
        )),
    }
}

/// Width in bytes of each little-endian length field in the cell output.
const LEN_FIELD_SIZE: usize = 8;

/// Process the FFI result and extract output bytes and widgets.
///
/// The cell output format is:
/// `display_len (8 bytes LE) | display_bytes | widgets_len (8 bytes LE) | widgets_json | rkyv_data`
///
/// Returns (bytes, widgets_json) where:
/// - bytes = `display_len | display_bytes | rkyv_data` (for backward compat)
/// - widgets_json = the widgets JSON bytes
///
/// # Errors
///
/// Returns a message naming `cell_name` when the result code is not a success,
/// when a success comes with a null or empty buffer, or when the buffer is
/// shorter than its own length fields claim. Length fields are validated with
/// checked arithmetic, so oversized values are reported as "too short" errors
/// rather than overflowing.
fn process_ffi_result(
    result_code: i32,
    out_ptr: *mut u8,
    out_len: usize,
    cell_name: &str,
) -> Result<(Vec<u8>, Vec<u8>), String> {
    match ExecutionResult::from(result_code) {
        ExecutionResult::Success => {
            // NOTE(review): a non-null pointer with `out_len == 0` is rejected
            // without being freed; confirm against the cell-side contract
            // whether that case can occur.
            if out_ptr.is_null() || out_len == 0 {
                return Err(format!("Cell {} returned null output", cell_name));
            }

            // Copy the output into owned memory, then release the cell's buffer.
            // SAFETY: the pointer is non-null and the cell reported `out_len`
            // readable bytes behind it. The buffer is released with
            // `libc::free`, which assumes the cell allocated it with the
            // matching C allocator.
            let raw_bytes = unsafe {
                let copied = std::slice::from_raw_parts(out_ptr, out_len).to_vec();
                libc::free(out_ptr as *mut libc::c_void);
                copied
            };

            split_cell_output(&raw_bytes, cell_name)
        }
        ExecutionResult::DeserializationError => {
            Err(format!("Cell {} failed to deserialize input", cell_name))
        }
        ExecutionResult::CellError => Err(format!("Cell {} returned an error", cell_name)),
        ExecutionResult::SerializationError => {
            Err(format!("Cell {} failed to serialize output", cell_name))
        }
        ExecutionResult::Panic => Err(format!("Cell {} panicked during execution", cell_name)),
    }
}

/// Split raw cell output into `(display_len | display_bytes | rkyv_data, widgets_json)`.
fn split_cell_output(raw: &[u8], cell_name: &str) -> Result<(Vec<u8>, Vec<u8>), String> {
    // Both length fields must be present even when their payloads are empty.
    if raw.len() < 2 * LEN_FIELD_SIZE {
        return Err(format!(
            "Cell {} output too short: {} bytes",
            cell_name,
            raw.len()
        ));
    }

    // The widgets payload starts after: display_len field, display bytes,
    // widgets_len field. A length too large to represent or to fit in the
    // buffer is reported the same way as a truncated buffer.
    let widgets_start = read_len_prefix(raw, 0)
        .and_then(|display_len| display_len.checked_add(2 * LEN_FIELD_SIZE))
        .filter(|&start| start <= raw.len())
        .ok_or_else(|| format!("Cell {} output too short for display data", cell_name))?;
    let display_end = widgets_start - LEN_FIELD_SIZE;

    let widgets_end = read_len_prefix(raw, display_end)
        .and_then(|widgets_len| widgets_start.checked_add(widgets_len))
        .filter(|&end| end <= raw.len())
        .ok_or_else(|| format!("Cell {} output too short for widgets data", cell_name))?;

    let widgets_json = raw[widgets_start..widgets_end].to_vec();

    // Everything after the widgets payload is the serialized value.
    let rkyv_data = &raw[widgets_end..];

    // Reconstruct bytes without widgets: display_len | display_bytes | rkyv_data
    let mut bytes = Vec::with_capacity(display_end + rkyv_data.len());
    bytes.extend_from_slice(&raw[..display_end]);
    bytes.extend_from_slice(rkyv_data);

    Ok((bytes, widgets_json))
}

/// Read the little-endian `u64` length field at `offset`.
///
/// Returns `None` when the field lies outside `raw` or its value does not fit
/// in `usize`.
fn read_len_prefix(raw: &[u8], offset: usize) -> Option<usize> {
    let field = raw.get(offset..offset.checked_add(LEN_FIELD_SIZE)?)?;
    let mut le_bytes = [0u8; LEN_FIELD_SIZE];
    le_bytes.copy_from_slice(field);
    <usize as std::convert::TryFrom<u64>>::try_from(u64::from_le_bytes(le_bytes)).ok()
}