use std::io::{BufRead, BufReader, Write};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use pylon_http::DataStore;
use crate::protocol::*;
use crate::trace::{TraceBuilder, TraceLog};
/// Per-call timeout a new [`FnRunner`] starts with, until `set_call_timeout` changes it.
/// A call that exceeds it fails with `FN_TIMEOUT` and the runtime process is killed.
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_millis(30_000);

/// Receives each streamed text chunk emitted by a running function, in arrival order.
pub type StreamCallback = Box<dyn FnMut(&str) + Send + 'static>;
/// Handler for a running function's request to schedule another function.
///
/// Invoked as `hook(fn_name, args, delay_ms, run_at)`. `Ok(id)` is reported back to the
/// runtime as `{"scheduled": true, "id": id}`; `Err(reason)` is reported as a
/// `SCHEDULE_FAILED` error carrying `reason`.
pub type ScheduleHook = Box<
    dyn Fn(&str, serde_json::Value, Option<u64>, Option<u64>) -> Result<String, String>
        + Sync
        + Send,
>;

/// Handler for a running function's request to invoke another function (`RunFn`).
///
/// Invoked as `hook(fn_name, fn_type, args, auth)`. `Ok(value)` is handed back to the
/// requesting function; `Err((code, message))` is forwarded as an error with that code and
/// message. When no hook is installed, the runner executes the nested function itself.
pub type NestedCallHook = Box<
    dyn Fn(&str, FnType, serde_json::Value, AuthInfo) -> Result<serde_json::Value, (String, String)>
        + Sync
        + Send,
>;
/// Supervises one child process running the TypeScript function runtime and drives
/// function calls through it using newline-delimited JSON on the child's stdin/stdout.
pub struct FnRunner {
/// The runtime child process; `None` before a successful `start` and after `kill`.
process: Mutex<Option<Child>>,
/// Write end of the child's stdin; `send` serializes every outgoing message here.
stdin: Mutex<Option<std::process::ChildStdin>>,
/// Messages decoded from the child's stdout by the `pylon-fn-reader` thread.
inbox: Mutex<Option<Receiver<TsMessage>>>,
/// Held by `call` for a whole call so concurrent calls cannot interleave on the pipes.
io_lock: Mutex<()>,
/// Monotonic counter used to build unique `c_<n>` call ids.
call_counter: AtomicU64,
/// Traces of finished calls; created with the `trace_capacity` passed to `new`.
pub trace_log: TraceLog,
/// Optional handler for `Schedule` requests coming from running functions.
schedule_hook: Mutex<Option<ScheduleHook>>,
/// Optional handler for nested `RunFn` requests; when unset they run in this runtime.
nested_call_hook: Mutex<Option<NestedCallHook>>,
/// Per-call deadline, read at the start of every call.
call_timeout: Mutex<Duration>,
/// `(command, args)` of the last successful `start`, reused by `respawn`.
started_with: Mutex<Option<(String, Vec<String>)>>,
}
impl FnRunner {
    /// How long [`FnRunner::start`] waits for the runtime's `Ready` handshake.
    const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);

    /// Creates a runner with no runtime process attached.
    ///
    /// `trace_capacity` is passed to [`TraceLog::new`]. The per-call timeout starts at
    /// [`DEFAULT_CALL_TIMEOUT`]. Call [`FnRunner::start`] to launch the runtime.
    pub fn new(trace_capacity: usize) -> Self {
        Self {
            process: Mutex::new(None),
            stdin: Mutex::new(None),
            inbox: Mutex::new(None),
            io_lock: Mutex::new(()),
            call_counter: AtomicU64::new(0),
            trace_log: TraceLog::new(trace_capacity),
            schedule_hook: Mutex::new(None),
            nested_call_hook: Mutex::new(None),
            call_timeout: Mutex::new(DEFAULT_CALL_TIMEOUT),
            started_with: Mutex::new(None),
        }
    }

    /// Sets the wall-clock budget for each subsequent call, including nested calls that
    /// run in this same runtime.
    pub fn set_call_timeout(&self, timeout: Duration) {
        *self.call_timeout.lock().unwrap() = timeout;
    }

    /// Installs the hook that services `Schedule` requests from running functions.
    ///
    /// Without a hook, such requests are answered with a `SCHEDULE_FAILED` error.
    pub fn set_schedule_hook(&self, hook: ScheduleHook) {
        *self.schedule_hook.lock().unwrap() = Some(hook);
    }

    /// Installs the hook that services nested `RunFn` requests.
    ///
    /// Without a hook, nested functions are executed in this runtime via `call_inner`.
    pub fn set_nested_call_hook(&self, hook: NestedCallHook) {
        *self.nested_call_hook.lock().unwrap() = Some(hook);
    }

    /// Spawns the function runtime, completes the `Ready` handshake, and returns the
    /// function definitions the runtime reported.
    ///
    /// `command args...` is spawned with piped stdin/stdout and inherited stderr. A
    /// `pylon-fn-reader` thread decodes stdout into messages. On success the new process
    /// is attached to this runner, and any runtime still attached from an earlier `start`
    /// is killed and reaped rather than leaked. `command`/`args` are remembered for
    /// [`FnRunner::respawn`].
    ///
    /// # Errors
    ///
    /// Returns a message if the process cannot be spawned. For every later failure the
    /// child is killed and reaped first. Those failures are: pipes or reader thread
    /// unavailable, no `Ready` within [`Self::HANDSHAKE_TIMEOUT`], stdout closed before
    /// `Ready`, a startup error reported in `Ready`, or a first message that is not `Ready`.
    pub fn start(
        &self,
        command: &str,
        args: &[&str],
    ) -> Result<Vec<crate::registry::FnDef>, String> {
        let mut child = Command::new(command)
            .args(args)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .map_err(|e| format!("Failed to start function runner: {e}"))?;
        let stdin = child
            .stdin
            .take()
            .ok_or_else(|| kill_and_msg(&mut child, "Failed to capture stdin".to_string()))?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| kill_and_msg(&mut child, "Failed to capture stdout".to_string()))?;
        let (tx, rx): (Sender<TsMessage>, Receiver<TsMessage>) = mpsc::channel();
        std::thread::Builder::new()
            .name("pylon-fn-reader".into())
            .spawn(move || reader_loop(BufReader::new(stdout), tx))
            .map_err(|e| kill_and_msg(&mut child, format!("Failed to spawn reader thread: {e}")))?;

        let ready_msg = match rx.recv_timeout(Self::HANDSHAKE_TIMEOUT) {
            Ok(m) => m,
            Err(RecvTimeoutError::Timeout) => {
                return Err(kill_and_msg(
                    &mut child,
                    format!(
                        "handshake timeout: TS runtime did not send Ready within {}s",
                        Self::HANDSHAKE_TIMEOUT.as_secs()
                    ),
                ));
            }
            // The reader thread only drops `tx` when stdout hits EOF or a read error, so
            // this is not a timeout: the runtime went away before announcing itself.
            Err(RecvTimeoutError::Disconnected) => {
                return Err(kill_and_msg(
                    &mut child,
                    "handshake failed: TS runtime closed stdout before sending Ready".to_string(),
                ));
            }
        };
        let defs = match ready_msg {
            TsMessage::Ready(r) => match r.error {
                Some(err) => {
                    return Err(kill_and_msg(&mut child, format!("Runtime startup error: {err}")));
                }
                None => r.functions,
            },
            other => {
                return Err(kill_and_msg(
                    &mut child,
                    format!("expected Ready handshake, got {other:?}"),
                ));
            }
        };

        *self.stdin.lock().unwrap() = Some(stdin);
        *self.inbox.lock().unwrap() = Some(rx);
        let previous = self.process.lock().unwrap().replace(child);
        *self.started_with.lock().unwrap() = Some((
            command.to_string(),
            args.iter().map(|s| s.to_string()).collect(),
        ));
        // If `start` was called while a runtime was still attached, dropping its `Child`
        // would neither kill nor reap it; do both explicitly.
        if let Some(mut old) = previous {
            let _ = old.kill();
            let _ = old.wait();
        }
        Ok(defs)
    }

    /// Whether a runtime process is attached. The process may already have exited; use
    /// [`FnRunner::is_alive`] to check that.
    pub fn is_running(&self) -> bool {
        self.process.lock().unwrap().is_some()
    }

    /// Whether a runtime process is attached and has not exited yet.
    pub fn is_alive(&self) -> bool {
        let mut guard = self.process.lock().unwrap();
        match guard.as_mut() {
            None => false,
            // `try_wait` does not block. `Ok(None)` means still running; an exit status or
            // an error both count as not alive.
            Some(child) => matches!(child.try_wait(), Ok(None)),
        }
    }

    /// Kills the current runtime (if any) and starts a fresh one with the command and
    /// arguments of the last successful [`FnRunner::start`].
    ///
    /// # Errors
    ///
    /// Fails if `start` never succeeded, or with any error `start` itself returns.
    pub fn respawn(&self) -> Result<Vec<crate::registry::FnDef>, String> {
        let started = self
            .started_with
            .lock()
            .unwrap()
            .clone()
            .ok_or_else(|| "Cannot respawn: runner was never started".to_string())?;
        self.kill();
        let arg_refs: Vec<&str> = started.1.iter().map(|s| s.as_str()).collect();
        self.start(&started.0, &arg_refs)
    }

    /// Kills and reaps the attached runtime (if any) and detaches its stdin and inbox.
    ///
    /// Safe to call repeatedly. Afterwards, calls fail with `RUNNER_NOT_STARTED`.
    pub fn kill(&self) {
        if let Some(mut child) = self.process.lock().unwrap().take() {
            let _ = child.kill();
            let _ = child.wait();
        }
        *self.stdin.lock().unwrap() = None;
        *self.inbox.lock().unwrap() = None;
    }

    /// Legacy entry point kept for compatibility; it always returns an error.
    ///
    /// The handshake now happens inside [`FnRunner::start`], which returns the function
    /// definitions directly.
    pub fn handshake(&self) -> Result<Vec<crate::registry::FnDef>, String> {
        Err("handshake is now performed inside start(); use the return value".to_string())
    }

    /// Runs `fn_name` in the runtime and returns its result together with the call's trace.
    ///
    /// Holds `io_lock` for the whole call so concurrent calls do not interleave on the
    /// pipes. `on_stream` receives streamed chunks, and `request` is attached to the call
    /// message via `with_request`. See [`FnRunner::call_inner`] for error codes.
    pub fn call(
        &self,
        store: &dyn DataStore,
        fn_name: &str,
        fn_type: FnType,
        args: serde_json::Value,
        auth: AuthInfo,
        on_stream: Option<StreamCallback>,
        request: Option<crate::protocol::RequestInfo>,
    ) -> Result<(serde_json::Value, crate::trace::FnTrace), FnCallError> {
        // The lock guards `()`, so poisoning (a previous call panicked, e.g. inside a user
        // stream callback) breaks no invariant. Recover instead of panicking on every
        // later call.
        let _io = self
            .io_lock
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        self.call_inner(store, fn_name, fn_type, args, auth, on_stream, request)
    }

    /// Runs one function call WITHOUT taking `io_lock`.
    ///
    /// Sends a call message, then services the runtime's requests for this call id until a
    /// `Return` or `Error` arrives. Those requests are DB ops, stream chunks,
    /// schedule/cancel requests and nested `RunFn`s. Any other message is ignored. A trace
    /// is pushed to `trace_log` when the call ends via `Return`, `Error`, or a receive
    /// failure; it is not pushed when a send fails. Callers other than [`FnRunner::call`]
    /// and the nested-call fallback below must serialize access themselves.
    ///
    /// # Errors
    ///
    /// - `FN_TIMEOUT` when no message arrives before the deadline; the runtime is killed.
    /// - `RUNNER_EXITED`, `RUNNER_NOT_STARTED`, `IO_ERROR`, `SERIALIZE_FAILED` from the
    ///   transport.
    /// - The runtime's own code and message when it replies with `Error`.
    pub fn call_inner(
        &self,
        store: &dyn DataStore,
        fn_name: &str,
        fn_type: FnType,
        args: serde_json::Value,
        auth: AuthInfo,
        mut on_stream: Option<StreamCallback>,
        request: Option<crate::protocol::RequestInfo>,
    ) -> Result<(serde_json::Value, crate::trace::FnTrace), FnCallError> {
        let timeout = *self.call_timeout.lock().unwrap();
        let deadline = Instant::now() + timeout;
        let call_id = format!("c_{}", self.call_counter.fetch_add(1, Ordering::Relaxed));
        let mut trace = TraceBuilder::new_with_tenant(
            call_id.clone(),
            fn_name.to_string(),
            fn_type,
            auth.user_id.clone(),
            auth.tenant_id.clone(),
        );
        let mut call_msg =
            CallMessage::new(call_id.clone(), fn_name.to_string(), fn_type, args, auth);
        if let Some(r) = request {
            call_msg = call_msg.with_request(r);
        }
        self.send(&call_msg)?;
        loop {
            let msg = match self.recv(deadline) {
                Ok(m) => m,
                Err(e) => {
                    let fn_trace = if e.code == "FN_TIMEOUT" {
                        tracing::warn!(
                            "[functions] Killing TS runtime: call \"{}\" exceeded {:?}",
                            fn_name,
                            timeout
                        );
                        // The runtime may be wedged; killing it also discards late replies.
                        self.kill();
                        trace.finish_error(
                            "FN_TIMEOUT".into(),
                            format!("Function \"{fn_name}\" exceeded timeout {timeout:?}"),
                        )
                    } else {
                        // Transport failures (e.g. RUNNER_EXITED) are recorded too, so the
                        // trace log shows every call that ended here.
                        trace.finish_error(e.code.clone(), e.message.clone())
                    };
                    self.trace_log.push(fn_trace);
                    return Err(e);
                }
            };
            match msg {
                TsMessage::Db(db_msg) if db_msg.call_id == call_id => {
                    let op_start = Instant::now();
                    let (result, row_count) = execute_db_op(store, &db_msg);
                    let duration = op_start.elapsed();
                    let ok = result.is_ok();
                    trace.record_op(
                        db_msg.op,
                        &db_msg.entity,
                        db_msg.id.as_deref(),
                        duration,
                        row_count,
                        ok,
                    );
                    let reply = match result {
                        Ok(data) => {
                            DbResultMessage::ok_with_op(call_id.clone(), db_msg.op_id.clone(), data)
                        }
                        Err(e) => DbResultMessage::err_with_op(
                            call_id.clone(),
                            db_msg.op_id.clone(),
                            &e.code,
                            &e.message,
                        ),
                    };
                    self.send(&reply)?;
                }
                TsMessage::Stream(chunk) if chunk.call_id == call_id => {
                    trace.record_stream_chunk(chunk.data.len());
                    if let Some(ref mut cb) = on_stream {
                        cb(&chunk.data);
                    }
                }
                TsMessage::Schedule(sched) if sched.call_id == call_id => {
                    trace.record_schedule(&sched.fn_name, sched.delay_ms, sched.run_at);
                    let hook_result: Result<String, String> = {
                        let hook = self.schedule_hook.lock().unwrap();
                        match *hook {
                            Some(ref cb) => cb(
                                &sched.fn_name,
                                sched.args.clone(),
                                sched.delay_ms,
                                sched.run_at,
                            ),
                            None => Err("no schedule hook installed".into()),
                        }
                    };
                    let reply = match hook_result {
                        Ok(id) => DbResultMessage::ok(
                            call_id.clone(),
                            serde_json::json!({"scheduled": true, "id": id}),
                        ),
                        Err(e) => DbResultMessage::err(call_id.clone(), "SCHEDULE_FAILED", &e),
                    };
                    self.send(&reply)?;
                }
                TsMessage::CancelSchedule(cancel) if cancel.call_id == call_id => {
                    // NOTE(review): there is no cancellation hook, so this acknowledges
                    // `cancelled: true` without cancelling anything. Confirm this is intended.
                    let reply = DbResultMessage::ok(
                        call_id.clone(),
                        serde_json::json!({"cancelled": true}),
                    );
                    self.send(&reply)?;
                }
                TsMessage::RunFn(run) if run.call_id == call_id => {
                    // Nested calls inherit the caller's user/tenant but never admin rights.
                    let nested_auth = AuthInfo {
                        user_id: trace.user_id().map(|s| s.to_string()),
                        is_admin: false,
                        tenant_id: trace.tenant_id().map(|s| s.to_string()),
                    };
                    let hook_result: Option<Result<serde_json::Value, (String, String)>> = {
                        let hook = self.nested_call_hook.lock().unwrap();
                        hook.as_ref().map(|cb| {
                            cb(
                                &run.fn_name,
                                run.fn_type,
                                run.args.clone(),
                                nested_auth.clone(),
                            )
                        })
                    };
                    let reply = match hook_result {
                        Some(Ok(value)) => DbResultMessage::ok(call_id.clone(), value),
                        Some(Err((code, msg))) => {
                            DbResultMessage::err(call_id.clone(), &code, &msg)
                        }
                        None => {
                            // No hook: run the nested function in this runtime. `io_lock` is
                            // already held by the outer `call`, so recurse into `call_inner`.
                            match self.call_inner(
                                store,
                                &run.fn_name,
                                run.fn_type,
                                run.args,
                                nested_auth,
                                None,
                                None,
                            ) {
                                Ok((value, _nested_trace)) => {
                                    DbResultMessage::ok(call_id.clone(), value)
                                }
                                Err(e) => DbResultMessage::err(
                                    call_id.clone(),
                                    "FN_CALL_FAILED",
                                    &e.message,
                                ),
                            }
                        }
                    };
                    self.send(&reply)?;
                }
                TsMessage::Return(ret) if ret.call_id == call_id => {
                    let fn_trace = trace.finish_ok(Some(ret.value.clone()));
                    self.trace_log.push(fn_trace.clone());
                    return Ok((ret.value, fn_trace));
                }
                TsMessage::Error(err) if err.call_id == call_id => {
                    let fn_trace = trace.finish_error(err.code.clone(), err.message.clone());
                    self.trace_log.push(fn_trace.clone());
                    return Err(FnCallError {
                        code: err.code,
                        message: err.message,
                    });
                }
                // Messages for other call ids (or unexpected kinds) are dropped.
                _ => {}
            }
        }
    }

    /// Serializes `msg` as one JSON line and writes and flushes it to the runtime's stdin.
    ///
    /// # Errors
    ///
    /// `RUNNER_NOT_STARTED` when no runtime is attached, `SERIALIZE_FAILED`, or `IO_ERROR`.
    fn send<T: serde::Serialize>(&self, msg: &T) -> Result<(), FnCallError> {
        let mut stdin_guard = self.stdin.lock().unwrap();
        let stdin = stdin_guard.as_mut().ok_or_else(|| FnCallError {
            code: "RUNNER_NOT_STARTED".into(),
            message: "TypeScript function runner is not running".into(),
        })?;
        let mut line = serde_json::to_string(msg).map_err(|e| FnCallError {
            code: "SERIALIZE_FAILED".into(),
            message: format!("Failed to serialize message: {e}"),
        })?;
        line.push('\n');
        stdin.write_all(line.as_bytes()).map_err(|e| FnCallError {
            code: "IO_ERROR".into(),
            message: format!("Failed to write to runner: {e}"),
        })?;
        stdin.flush().map_err(|e| FnCallError {
            code: "IO_ERROR".into(),
            message: format!("Failed to flush runner stdin: {e}"),
        })?;
        Ok(())
    }

    /// Waits for the next decoded message until `deadline`. The inbox lock is held while
    /// waiting.
    ///
    /// # Errors
    ///
    /// `RUNNER_NOT_STARTED` when no runtime is attached, `FN_TIMEOUT` when the deadline
    /// passes, `RUNNER_EXITED` when the reader thread has gone away.
    fn recv(&self, deadline: Instant) -> Result<TsMessage, FnCallError> {
        let inbox_guard = self.inbox.lock().unwrap();
        let inbox = inbox_guard.as_ref().ok_or_else(|| FnCallError {
            code: "RUNNER_NOT_STARTED".into(),
            message: "TypeScript function runner is not running".into(),
        })?;
        // Saturates to zero once the deadline has already passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match inbox.recv_timeout(remaining) {
            Ok(msg) => Ok(msg),
            Err(RecvTimeoutError::Timeout) => Err(FnCallError {
                code: "FN_TIMEOUT".into(),
                message: "Function exceeded the configured call timeout".into(),
            }),
            Err(RecvTimeoutError::Disconnected) => Err(FnCallError {
                code: "RUNNER_EXITED".into(),
                message: "TypeScript function runner process exited unexpectedly".into(),
            }),
        }
    }
}
/// Best-effort kills `child`, reaps it, and returns `msg` untouched.
///
/// The pass-through return lets callers use it directly inside `map_err` / `ok_or_else`
/// closures while tearing down a half-started process.
fn kill_and_msg(child: &mut Child, msg: String) -> String {
    // Both results are ignored on purpose: the process may already be gone.
    let _killed = child.kill();
    let _reaped = child.wait();
    msg
}
/// Body of the `pylon-fn-reader` thread: decodes newline-delimited JSON from the runtime's
/// stdout and forwards each message over `tx`.
///
/// Returns on EOF, on an I/O error, or once the receiving side has been dropped. Returning
/// drops `tx`, which the receiver observes as a disconnect. Blank lines are skipped
/// silently. Lines that are not valid UTF-8 or not valid message JSON are skipped with a
/// warning, so one bad line cannot tear down the channel while the runtime is still
/// running.
fn reader_loop<R: BufRead>(mut stdout: R, tx: Sender<TsMessage>) {
    let mut buf = Vec::new();
    loop {
        buf.clear();
        // Read raw bytes: `read_line` fails the whole read with `InvalidData` on any
        // non-UTF-8 byte, which previously ended this thread and made a healthy runtime
        // look like it had exited.
        match stdout.read_until(b'\n', &mut buf) {
            Ok(0) => break,
            Ok(_) => {}
            Err(e) => {
                tracing::warn!("[functions] Stopped reading from Bun runtime: {e}");
                break;
            }
        }
        let line = match std::str::from_utf8(&buf) {
            Ok(text) => text.trim(),
            Err(e) => {
                tracing::warn!(
                    "[functions] Skipping non-UTF-8 line from Bun runtime: {e} (line={:?})",
                    String::from_utf8_lossy(&buf)
                );
                continue;
            }
        };
        if line.is_empty() {
            continue;
        }
        match serde_json::from_str::<TsMessage>(line) {
            Ok(msg) => {
                if tx.send(msg).is_err() {
                    // Receiver dropped (runner killed or restarted): nobody is listening.
                    break;
                }
            }
            Err(e) => {
                tracing::warn!(
                    "[functions] Skipping unparseable line from Bun runtime: {e} (line={:?})",
                    line
                );
            }
        }
    }
}
impl Drop for FnRunner {
    /// Kills and reaps the runtime so the child process does not outlive its runner.
    fn drop(&mut self) {
        // `&mut self` already guarantees exclusive access, so no locking is needed.
        let slot = self.process.get_mut().unwrap();
        if let Some(mut runtime) = slot.take() {
            let _ = runtime.kill();
            let _ = runtime.wait();
        }
    }
}
impl TraceBuilder {
    /// User id the traced call was started with, or `None` if the call had none.
    pub fn user_id(&self) -> Option<&str> {
        self.user_id.as_ref().map(|uid| &**uid)
    }
}
/// Runs one database request from the runtime against `store`.
///
/// Returns the JSON payload to send back (or the store's error) plus an optional row count
/// that is recorded in the call trace. Absent request fields fall back to defaults:
/// `id`/`field`/`value`/`relation`/`target_id` become `""`, `data` becomes `{}`, and the
/// page size becomes 20. The page size is always clamped into `1..=1000`.
fn execute_db_op(
    store: &dyn DataStore,
    msg: &DbOpMessage,
) -> (
    Result<serde_json::Value, pylon_http::DataError>,
    Option<usize>,
) {
    /// Shapes a single-row lookup: the row (count 1), `null` (count 0), or the error.
    fn at_most_one(
        found: Result<Option<serde_json::Value>, pylon_http::DataError>,
    ) -> (
        Result<serde_json::Value, pylon_http::DataError>,
        Option<usize>,
    ) {
        match found {
            Ok(Some(row)) => (Ok(row), Some(1)),
            Ok(None) => (Ok(serde_json::Value::Null), Some(0)),
            Err(e) => (Err(e), None),
        }
    }

    let entity = &msg.entity;
    let id = msg.id.as_deref().unwrap_or("");
    let relation = msg.relation.as_deref().unwrap_or("");
    // Request body (insert/update data, query filter, graph or search query).
    let body = || msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));

    match msg.op {
        DbOp::Get => at_most_one(store.get_by_id(entity, id)),
        DbOp::Lookup => {
            let field = msg.field.as_deref().unwrap_or("");
            let value = msg.value.as_deref().unwrap_or("");
            at_most_one(store.lookup(entity, field, value))
        }
        DbOp::List => match store.list(entity) {
            Err(e) => (Err(e), None),
            Ok(rows) => {
                let n = rows.len();
                (Ok(serde_json::json!(rows)), Some(n))
            }
        },
        DbOp::Query => {
            let filter = body();
            match store.query_filtered(entity, &filter) {
                Err(e) => (Err(e), None),
                Ok(rows) => {
                    let n = rows.len();
                    (Ok(serde_json::json!(rows)), Some(n))
                }
            }
        }
        DbOp::Paginate => {
            let page_size = msg.limit.unwrap_or(20).min(1000).max(1) as usize;
            match store.list_after(entity, msg.after.as_deref(), page_size + 1) {
                Err(e) => (Err(e), None),
                Ok(mut rows) => {
                    // The one extra row requested only signals that another page exists.
                    let has_more = rows.len() > page_size;
                    rows.truncate(page_size);
                    let next_cursor = if has_more {
                        rows.last()
                            .and_then(|row| row.get("id"))
                            .and_then(|cursor| cursor.as_str())
                            .map(str::to_owned)
                    } else {
                        None
                    };
                    let is_done = !has_more;
                    let n = rows.len();
                    (
                        Ok(serde_json::json!({
                            "page": rows,
                            "nextCursor": next_cursor,
                            "isDone": is_done,
                        })),
                        Some(n),
                    )
                }
            }
        }
        DbOp::Insert => {
            let data = body();
            let outcome = store
                .insert(entity, &data)
                .map(|new_id| serde_json::json!({"id": new_id}));
            (outcome, None)
        }
        DbOp::Update => {
            let data = body();
            let outcome = store
                .update(entity, id, &data)
                .map(|updated| serde_json::json!({"updated": updated}));
            (outcome, None)
        }
        DbOp::Delete => {
            let outcome = store
                .delete(entity, id)
                .map(|deleted| serde_json::json!({"deleted": deleted}));
            (outcome, None)
        }
        DbOp::Link => {
            let target_id = msg.target_id.as_deref().unwrap_or("");
            let outcome = store
                .link(entity, id, relation, target_id)
                .map(|linked| serde_json::json!({"linked": linked}));
            (outcome, None)
        }
        DbOp::Unlink => {
            let outcome = store
                .unlink(entity, id, relation)
                .map(|unlinked| serde_json::json!({"unlinked": unlinked}));
            (outcome, None)
        }
        DbOp::QueryGraph => {
            let query = body();
            (store.query_graph(&query), None)
        }
        DbOp::Search => {
            let query = body();
            match store.search(entity, &query) {
                Err(e) => (Err(e), None),
                Ok(result) => {
                    let hits = result
                        .get("hits")
                        .and_then(serde_json::Value::as_array)
                        .map(Vec::len);
                    (Ok(result), hits)
                }
            }
        }
    }
}
/// Error returned by function calls: a machine-readable `code` (e.g. `FN_TIMEOUT`,
/// `RUNNER_EXITED`) paired with a human-readable `message`.
#[derive(Clone, Debug)]
pub struct FnCallError {
    /// Machine-readable error code.
    pub code: String,
    /// Human-readable description of the failure.
    pub message: String,
}

impl std::fmt::Display for FnCallError {
    /// Renders as `[CODE] message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message } = self;
        write!(f, "[{code}] {message}")
    }
}

impl std::error::Error for FnCallError {}
impl From<pylon_http::DataError> for FnCallError {
    /// Carries a store error's code and message over unchanged.
    fn from(err: pylon_http::DataError) -> Self {
        Self {
            message: err.message,
            code: err.code,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` renders a call error as `[CODE] message`.
    #[test]
    fn fn_call_error_display() {
        let err = FnCallError {
            code: String::from("TEST"),
            message: String::from("fail"),
        };
        assert_eq!(err.to_string(), "[TEST] fail");
    }
}