pylon_functions/runner.rs
1//! Function runner — executes TypeScript functions via the bidirectional protocol.
2//!
3//! The runner manages the connection to the Bun/Deno process and mediates
4//! all communication. It handles DB operations, stream forwarding, scheduling,
5//! and transaction management.
6
7use std::io::{BufRead, BufReader, Write};
8use std::process::{Child, Command, Stdio};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
11use std::sync::Mutex;
12use std::time::{Duration, Instant};
13
14use pylon_http::DataStore;
15
16use crate::protocol::*;
17use crate::trace::{TraceBuilder, TraceLog};
18
19/// Default ceiling on how long a single function call may take. Holds the
20/// SQLite write lock for mutations, so this is also a backstop against a
21/// runaway TS handler blocking the whole DB. Override via
22/// [`FnRunner::set_call_timeout`] or `PYLON_FN_CALL_TIMEOUT` (server-side).
23pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);
24
25// ---------------------------------------------------------------------------
26// Stream callback — receives SSE chunks during execution
27// ---------------------------------------------------------------------------
28
29/// Callback invoked for each stream chunk during function execution.
30/// The server layer converts these into SSE events on the HTTP response.
31pub type StreamCallback = Box<dyn FnMut(&str) + Send>;
32
33/// Callback invoked when a function calls `ctx.scheduler.runAfter/runAt`.
34/// Returns `Ok(job_id)` on success or `Err(msg)` on persistence/queue
35/// failure. The runner reports the error back to the calling handler so
36/// users don't get a silent `{scheduled: true, id: ""}`.
37pub type ScheduleHook = Box<
38 dyn Fn(&str, serde_json::Value, Option<u64>, Option<u64>) -> Result<String, String>
39 + Send
40 + Sync,
41>;
42
43/// Callback invoked when a running function asks to run *another* function
44/// (action → query/mutation). The wrapper is responsible for any per-type
45/// setup — notably wrapping mutations in their own BEGIN/COMMIT, which
46/// can't happen inside `call_inner` because that path holds the io_lock
47/// and is called with the outer action's non-transactional store.
48///
49/// Returns the nested function's return value or a `FnCallError`-shaped
50/// `(code, message)` pair. The runner translates the error back into the
51/// NDJSON protocol reply so the TS side sees the same shape it always did.
52pub type NestedCallHook = Box<
53 dyn Fn(&str, FnType, serde_json::Value, AuthInfo) -> Result<serde_json::Value, (String, String)>
54 + Send
55 + Sync,
56>;
57
58// ---------------------------------------------------------------------------
59// Function runner
60// ---------------------------------------------------------------------------
61
62/// Manages the TypeScript process and executes function calls.
63pub struct FnRunner {
64 process: Mutex<Option<Child>>,
65 /// Stdin half — guarded so concurrent senders don't interleave bytes.
66 stdin: Mutex<Option<std::process::ChildStdin>>,
67 /// Channel of parsed messages from the reader thread. Single consumer
68 /// (callers serialize via `io_lock`), so no per-call demuxing.
69 inbox: Mutex<Option<Receiver<TsMessage>>>,
70 /// Held for the duration of a call to keep request/response in order.
71 /// Also serializes the underlying single Bun process.
72 io_lock: Mutex<()>,
73 call_counter: AtomicU64,
74 pub trace_log: TraceLog,
75 schedule_hook: Mutex<Option<ScheduleHook>>,
76 /// Optional override for nested function calls (action → query/mutation).
77 /// When set, the runner delegates `RunFn` messages to this hook so the
78 /// caller can wrap mutations in their own transaction. When absent, we
79 /// fall back to the old recursive path (no transaction for nested
80 /// mutations — documented limitation).
81 nested_call_hook: Mutex<Option<NestedCallHook>>,
82 /// Timeout for `recv()` between protocol messages. A handler that doesn't
83 /// reply within this window is treated as stuck.
84 call_timeout: Mutex<Duration>,
85 /// The command and args that started the runtime. Stored so the supervisor
86 /// can respawn on crash without the caller re-passing them.
87 started_with: Mutex<Option<(String, Vec<String>)>>,
88}
89
90impl FnRunner {
91 /// Create a new runner with the given trace log capacity.
92 pub fn new(trace_capacity: usize) -> Self {
93 Self {
94 process: Mutex::new(None),
95 stdin: Mutex::new(None),
96 inbox: Mutex::new(None),
97 io_lock: Mutex::new(()),
98 call_counter: AtomicU64::new(0),
99 trace_log: TraceLog::new(trace_capacity),
100 schedule_hook: Mutex::new(None),
101 nested_call_hook: Mutex::new(None),
102 call_timeout: Mutex::new(DEFAULT_CALL_TIMEOUT),
103 started_with: Mutex::new(None),
104 }
105 }
106
107 /// Override the per-call timeout. The default is 30s.
108 pub fn set_call_timeout(&self, timeout: Duration) {
109 *self.call_timeout.lock().unwrap() = timeout;
110 }
111
112 /// Install a callback to handle `ctx.scheduler` requests from functions.
113 pub fn set_schedule_hook(&self, hook: ScheduleHook) {
114 *self.schedule_hook.lock().unwrap() = Some(hook);
115 }
116
117 /// Install a callback used for nested function calls (action → query or
118 /// mutation). The callback is responsible for transactional wrapping when
119 /// the nested fn is a mutation. Without this hook, nested mutations share
120 /// the outer action's non-transactional store and writes aren't atomic.
121 pub fn set_nested_call_hook(&self, hook: NestedCallHook) {
122 *self.nested_call_hook.lock().unwrap() = Some(hook);
123 }
124
125 /// Start the TypeScript process and complete the startup handshake.
126 ///
127 /// Spawns the child + reader thread, waits for the runtime's `Ready`
128 /// message, and only then publishes stdin/inbox/process so callers can
129 /// see the runner. This avoids the race where a concurrent `call()`
130 /// would consume the `Ready` message and desync the protocol.
131 ///
132 /// On any failure (spawn, missing pipes, bad handshake, runtime-reported
133 /// error) the child is killed before returning so a half-alive process
134 /// doesn't survive — important for the supervisor, which uses
135 /// `is_alive()` and would otherwise see "still running" forever.
136 ///
137 /// Returns the function definitions reported by the runtime.
138 pub fn start(
139 &self,
140 command: &str,
141 args: &[&str],
142 ) -> Result<Vec<crate::registry::FnDef>, String> {
143 let mut child = Command::new(command)
144 .args(args)
145 .stdin(Stdio::piped())
146 .stdout(Stdio::piped())
147 .stderr(Stdio::inherit())
148 .spawn()
149 .map_err(|e| format!("Failed to start function runner: {e}"))?;
150
151 let stdin = child
152 .stdin
153 .take()
154 .ok_or_else(|| kill_and_msg(&mut child, "Failed to capture stdin".to_string()))?;
155 let stdout = child
156 .stdout
157 .take()
158 .ok_or_else(|| kill_and_msg(&mut child, "Failed to capture stdout".to_string()))?;
159
160 let (tx, rx): (Sender<TsMessage>, Receiver<TsMessage>) = mpsc::channel();
161 std::thread::Builder::new()
162 .name("pylon-fn-reader".into())
163 .spawn(move || reader_loop(BufReader::new(stdout), tx))
164 .map_err(|e| kill_and_msg(&mut child, format!("Failed to spawn reader thread: {e}")))?;
165
166 // Read Ready BEFORE publishing the new IO. If we published first, a
167 // concurrent caller could send a request and `recv()` would eat the
168 // Ready in the catch-all match arm, leaving us in protocol limbo.
169 let ready_msg = match rx.recv_timeout(Duration::from_secs(10)) {
170 Ok(m) => m,
171 Err(_) => {
172 let _ = child.kill();
173 let _ = child.wait();
174 return Err("handshake timeout: TS runtime did not send Ready within 10s".into());
175 }
176 };
177 let defs = match ready_msg {
178 TsMessage::Ready(r) => {
179 if let Some(err) = r.error {
180 let _ = child.kill();
181 let _ = child.wait();
182 return Err(format!("Runtime startup error: {err}"));
183 }
184 r.functions
185 }
186 other => {
187 let _ = child.kill();
188 let _ = child.wait();
189 return Err(format!("expected Ready handshake, got {other:?}"));
190 }
191 };
192
193 // Handshake succeeded — publish.
194 *self.stdin.lock().unwrap() = Some(stdin);
195 *self.inbox.lock().unwrap() = Some(rx);
196 *self.process.lock().unwrap() = Some(child);
197 *self.started_with.lock().unwrap() = Some((
198 command.to_string(),
199 args.iter().map(|s| s.to_string()).collect(),
200 ));
201
202 Ok(defs)
203 }
204
205 /// Check if the TypeScript process is running.
206 pub fn is_running(&self) -> bool {
207 self.process.lock().unwrap().is_some()
208 }
209
210 /// Returns true if the child process is alive. Distinct from `is_running`
211 /// which only checks that we ever started one — supervisor uses this.
212 pub fn is_alive(&self) -> bool {
213 let mut guard = self.process.lock().unwrap();
214 match guard.as_mut() {
215 None => false,
216 Some(child) => match child.try_wait() {
217 Ok(Some(_status)) => false, // exited
218 Ok(None) => true, // still running
219 Err(_) => false, // can't tell — assume dead
220 },
221 }
222 }
223
224 /// Restart the underlying process using the command/args from the original
225 /// `start()` call. The supervisor uses this; callers should not need it.
226 /// Returns the freshly-handshaked function definitions. On any failure
227 /// the new child has already been killed by `start()`.
228 pub fn respawn(&self) -> Result<Vec<crate::registry::FnDef>, String> {
229 let started = self
230 .started_with
231 .lock()
232 .unwrap()
233 .clone()
234 .ok_or_else(|| "Cannot respawn: runner was never started".to_string())?;
235 // Drop the dead child + IO before spawning a new one.
236 self.kill();
237 let arg_refs: Vec<&str> = started.1.iter().map(|s| s.as_str()).collect();
238 self.start(&started.0, &arg_refs)
239 }
240
241 /// Forcefully kill the child process. Used by the supervisor on timeout
242 /// or when the runtime is shutting down. The reader thread will exit
243 /// cleanly when its stdout closes.
244 pub fn kill(&self) {
245 if let Some(mut child) = self.process.lock().unwrap().take() {
246 let _ = child.kill();
247 let _ = child.wait();
248 }
249 // Drop stdin so the reader thread sees EOF and exits.
250 *self.stdin.lock().unwrap() = None;
251 *self.inbox.lock().unwrap() = None;
252 }
253
254 /// Backwards-compatible: `start()` now performs the handshake itself
255 /// and returns the function definitions. `handshake()` is a no-op shim
256 /// that returns whatever the runtime is currently registered to.
257 /// Kept so existing callers (`try_spawn_functions`) compile without churn.
258 pub fn handshake(&self) -> Result<Vec<crate::registry::FnDef>, String> {
259 Err("handshake is now performed inside start(); use the return value".to_string())
260 }
261
262 /// Execute a function call against the TypeScript process.
263 ///
264 /// For mutations: the caller must hold the write lock and pass a transaction-capable store.
265 /// For queries: uses the read pool, no locking required.
266 /// For actions: no direct DB access, calls run_fn for nested queries/mutations.
267 ///
268 /// Returns `(return_value, trace)`. Stream chunks are delivered via the callback.
269 pub fn call(
270 &self,
271 store: &dyn DataStore,
272 fn_name: &str,
273 fn_type: FnType,
274 args: serde_json::Value,
275 auth: AuthInfo,
276 on_stream: Option<StreamCallback>,
277 request: Option<crate::protocol::RequestInfo>,
278 ) -> Result<(serde_json::Value, crate::trace::FnTrace), FnCallError> {
279 // Serialize all top-level calls — one Bun process, NDJSON over stdio
280 // is not multiplexed at this layer. Nested calls (action → query)
281 // recurse through `call_inner` WITHOUT re-acquiring the lock.
282 // `std::sync::Mutex` is not re-entrant, so doing otherwise wedges.
283 let _io = self.io_lock.lock().unwrap();
284 self.call_inner(store, fn_name, fn_type, args, auth, on_stream, request)
285 }
286
287 /// Protocol-only call — assumes the caller already holds `io_lock`.
288 /// This is the body of a `call()` minus the lock. It is `pub` so the
289 /// nested-call hook in `FnOpsImpl` can re-enter the protocol for a
290 /// transactional mutation wrap without re-acquiring the mutex (which
291 /// would deadlock since `std::sync::Mutex` is not re-entrant).
292 ///
293 /// # Safety contract
294 /// Do not call directly from code that didn't acquire `io_lock` via a
295 /// prior `call()` invocation. Callers outside this crate should use
296 /// `call()`; the only external caller is the nested-call hook.
297 pub fn call_inner(
298 &self,
299 store: &dyn DataStore,
300 fn_name: &str,
301 fn_type: FnType,
302 args: serde_json::Value,
303 auth: AuthInfo,
304 mut on_stream: Option<StreamCallback>,
305 request: Option<crate::protocol::RequestInfo>,
306 ) -> Result<(serde_json::Value, crate::trace::FnTrace), FnCallError> {
307 let timeout = *self.call_timeout.lock().unwrap();
308 let deadline = Instant::now() + timeout;
309
310 let call_id = format!("c_{}", self.call_counter.fetch_add(1, Ordering::Relaxed));
311 let mut trace = TraceBuilder::new_with_tenant(
312 call_id.clone(),
313 fn_name.to_string(),
314 fn_type,
315 auth.user_id.clone(),
316 auth.tenant_id.clone(),
317 );
318
319 // Send the call message. Attach HTTP request metadata when the
320 // caller provided it — this lets TypeScript actions invoked via
321 // /api/webhooks/:name see raw headers + body for signature checks.
322 let mut call_msg =
323 CallMessage::new(call_id.clone(), fn_name.to_string(), fn_type, args, auth);
324 if let Some(r) = request {
325 call_msg = call_msg.with_request(r);
326 }
327 self.send(&call_msg)?;
328
329 // Process messages until we get a return or error.
330 loop {
331 let msg = match self.recv(deadline) {
332 Ok(m) => m,
333 Err(e) if e.code == "FN_TIMEOUT" => {
334 // The child is now in an unknown state — it owns the call
335 // mid-flight and may be holding open whatever resource it
336 // had. Kill it; the supervisor will respawn. Better to
337 // lose the runtime than to wedge the SQLite write lock.
338 tracing::warn!(
339 "[functions] Killing TS runtime: call \"{}\" exceeded {:?}",
340 fn_name,
341 timeout
342 );
343 self.kill();
344 let fn_trace = trace.finish_error(
345 "FN_TIMEOUT".into(),
346 format!("Function \"{fn_name}\" exceeded timeout {timeout:?}"),
347 );
348 self.trace_log.push(fn_trace);
349 return Err(e);
350 }
351 Err(e) => return Err(e),
352 };
353 match msg {
354 TsMessage::Db(db_msg) if db_msg.call_id == call_id => {
355 let op_start = Instant::now();
356 let (result, row_count) = execute_db_op(store, &db_msg);
357 let duration = op_start.elapsed();
358 let ok = result.is_ok();
359
360 trace.record_op(
361 db_msg.op,
362 &db_msg.entity,
363 db_msg.id.as_deref(),
364 duration,
365 row_count,
366 ok,
367 );
368
369 // Echo op_id from the request so the TS side can demux
370 // concurrent DB ops from a single handler. Old TS
371 // runtimes that don't send op_id get the same behavior
372 // as before (one in-flight at a time, serialized by
373 // pendingRpcs key collision).
374 let reply = match result {
375 Ok(data) => {
376 DbResultMessage::ok_with_op(call_id.clone(), db_msg.op_id.clone(), data)
377 }
378 Err(e) => DbResultMessage::err_with_op(
379 call_id.clone(),
380 db_msg.op_id.clone(),
381 &e.code,
382 &e.message,
383 ),
384 };
385 self.send(&reply)?;
386 }
387
388 TsMessage::Stream(chunk) if chunk.call_id == call_id => {
389 trace.record_stream_chunk(chunk.data.len());
390 if let Some(ref mut cb) = on_stream {
391 cb(&chunk.data);
392 }
393 }
394
395 TsMessage::Schedule(sched) if sched.call_id == call_id => {
396 trace.record_schedule(&sched.fn_name, sched.delay_ms, sched.run_at);
397 let hook_result: Result<String, String> = {
398 let hook = self.schedule_hook.lock().unwrap();
399 match *hook {
400 Some(ref cb) => cb(
401 &sched.fn_name,
402 sched.args.clone(),
403 sched.delay_ms,
404 sched.run_at,
405 ),
406 None => Err("no schedule hook installed".into()),
407 }
408 };
409 let reply = match hook_result {
410 Ok(id) => DbResultMessage::ok(
411 call_id.clone(),
412 serde_json::json!({"scheduled": true, "id": id}),
413 ),
414 Err(e) => DbResultMessage::err(call_id.clone(), "SCHEDULE_FAILED", &e),
415 };
416 self.send(&reply)?;
417 }
418
419 TsMessage::CancelSchedule(cancel) if cancel.call_id == call_id => {
420 let reply = DbResultMessage::ok(
421 call_id.clone(),
422 serde_json::json!({"cancelled": true}),
423 );
424 self.send(&reply)?;
425 }
426
427 TsMessage::RunFn(run) if run.call_id == call_id => {
428 // Nested function call (action calling query/mutation).
429 // Execute recursively. The nested call gets its own trace
430 // but inherits user + tenant from the caller so row-level
431 // policies (`auth.tenantId == data.orgId`) keep working
432 // when an action stamps tenant-scoped writes via helper
433 // mutations. Callers that need to cross tenant boundaries
434 // must do so on the client side — no silent elevation
435 // happens here; the caller's tenant carries through.
436 let nested_auth = AuthInfo {
437 user_id: trace.user_id().map(|s| s.to_string()),
438 is_admin: false,
439 tenant_id: trace.tenant_id().map(|s| s.to_string()),
440 };
441 // Prefer the nested_call_hook if installed — it lets the
442 // caller wrap mutations in their own BEGIN/COMMIT around
443 // a TxStore. Falling back to direct recursion leaves
444 // mutations non-transactional when triggered from an
445 // action (documented limitation).
446 let hook_result: Option<Result<serde_json::Value, (String, String)>> = {
447 let hook = self.nested_call_hook.lock().unwrap();
448 hook.as_ref().map(|cb| {
449 cb(
450 &run.fn_name,
451 run.fn_type,
452 run.args.clone(),
453 nested_auth.clone(),
454 )
455 })
456 };
457 let reply = match hook_result {
458 Some(Ok(value)) => DbResultMessage::ok(call_id.clone(), value),
459 Some(Err((code, msg))) => {
460 DbResultMessage::err(call_id.clone(), &code, &msg)
461 }
462 None => {
463 // No hook installed — fall back to direct recursion.
464 // Already inside io_lock, so use call_inner. Nested
465 // calls never get HTTP request metadata.
466 match self.call_inner(
467 store,
468 &run.fn_name,
469 run.fn_type,
470 run.args,
471 nested_auth,
472 None,
473 None,
474 ) {
475 Ok((value, _nested_trace)) => {
476 DbResultMessage::ok(call_id.clone(), value)
477 }
478 Err(e) => DbResultMessage::err(
479 call_id.clone(),
480 "FN_CALL_FAILED",
481 &e.message,
482 ),
483 }
484 }
485 };
486 self.send(&reply)?;
487 }
488
489 TsMessage::Return(ret) if ret.call_id == call_id => {
490 let fn_trace = trace.finish_ok(Some(ret.value.clone()));
491 self.trace_log.push(fn_trace.clone());
492 return Ok((ret.value, fn_trace));
493 }
494
495 TsMessage::Error(err) if err.call_id == call_id => {
496 let fn_trace = trace.finish_error(err.code.clone(), err.message.clone());
497 self.trace_log.push(fn_trace.clone());
498 return Err(FnCallError {
499 code: err.code,
500 message: err.message,
501 });
502 }
503
504 // Messages for a different call_id — shouldn't happen with
505 // sequential execution, but skip gracefully.
506 _ => {}
507 }
508 }
509 }
510
511 fn send<T: serde::Serialize>(&self, msg: &T) -> Result<(), FnCallError> {
512 let mut stdin_guard = self.stdin.lock().unwrap();
513 let stdin = stdin_guard.as_mut().ok_or_else(|| FnCallError {
514 code: "RUNNER_NOT_STARTED".into(),
515 message: "TypeScript function runner is not running".into(),
516 })?;
517
518 let mut line = serde_json::to_string(msg).map_err(|e| FnCallError {
519 code: "SERIALIZE_FAILED".into(),
520 message: format!("Failed to serialize message: {e}"),
521 })?;
522 line.push('\n');
523
524 stdin.write_all(line.as_bytes()).map_err(|e| FnCallError {
525 code: "IO_ERROR".into(),
526 message: format!("Failed to write to runner: {e}"),
527 })?;
528 stdin.flush().map_err(|e| FnCallError {
529 code: "IO_ERROR".into(),
530 message: format!("Failed to flush runner stdin: {e}"),
531 })?;
532
533 Ok(())
534 }
535
536 fn recv(&self, deadline: Instant) -> Result<TsMessage, FnCallError> {
537 let inbox_guard = self.inbox.lock().unwrap();
538 let inbox = inbox_guard.as_ref().ok_or_else(|| FnCallError {
539 code: "RUNNER_NOT_STARTED".into(),
540 message: "TypeScript function runner is not running".into(),
541 })?;
542
543 let now = Instant::now();
544 let remaining = if deadline <= now {
545 Duration::ZERO
546 } else {
547 deadline - now
548 };
549
550 match inbox.recv_timeout(remaining) {
551 Ok(msg) => Ok(msg),
552 Err(RecvTimeoutError::Timeout) => Err(FnCallError {
553 code: "FN_TIMEOUT".into(),
554 message: "Function exceeded the configured call timeout".into(),
555 }),
556 Err(RecvTimeoutError::Disconnected) => Err(FnCallError {
557 code: "RUNNER_EXITED".into(),
558 message: "TypeScript function runner process exited unexpectedly".into(),
559 }),
560 }
561 }
562}
563
564/// Kill a child and pass through an error message — used during start()
565/// when something goes wrong after spawn but before publishing the IO.
566/// Always wait() after kill() so the child is reaped — otherwise it
567/// hangs around as a zombie until the parent exits.
568fn kill_and_msg(child: &mut Child, msg: String) -> String {
569 let _ = child.kill();
570 let _ = child.wait();
571 msg
572}
573
574/// Background reader thread: parses NDJSON lines from the Bun stdout into
575/// TsMessage values and forwards them to the channel. Exits when stdout
576/// closes (child died or was killed).
577fn reader_loop(mut stdout: BufReader<std::process::ChildStdout>, tx: Sender<TsMessage>) {
578 let mut line = String::new();
579 loop {
580 line.clear();
581 match stdout.read_line(&mut line) {
582 Ok(0) => break, // EOF — child exited
583 Err(_) => break, // pipe error — child gone
584 Ok(_) => {}
585 }
586 match serde_json::from_str::<TsMessage>(line.trim()) {
587 Ok(msg) => {
588 if tx.send(msg).is_err() {
589 break; // Receiver dropped — runner shutting down
590 }
591 }
592 Err(e) => {
593 tracing::warn!(
594 "[functions] Skipping unparseable line from Bun runtime: {e} (line={:?})",
595 line.trim()
596 );
597 }
598 }
599 }
600}
601
602impl Drop for FnRunner {
603 fn drop(&mut self) {
604 if let Some(mut child) = self.process.lock().unwrap().take() {
605 let _ = child.kill();
606 let _ = child.wait();
607 }
608 }
609}
610
611// ---------------------------------------------------------------------------
612// TraceBuilder helper (access user_id during execution)
613// ---------------------------------------------------------------------------
614
615impl TraceBuilder {
616 pub fn user_id(&self) -> Option<&str> {
617 self.user_id.as_deref()
618 }
619}
620
621// ---------------------------------------------------------------------------
622// DB operation executor
623// ---------------------------------------------------------------------------
624
625/// Execute a DB operation from a TypeScript function against the DataStore.
626///
627/// Returns the result value and optional row count (for traces).
628fn execute_db_op(
629 store: &dyn DataStore,
630 msg: &DbOpMessage,
631) -> (
632 Result<serde_json::Value, pylon_http::DataError>,
633 Option<usize>,
634) {
635 match msg.op {
636 DbOp::Get => {
637 let id = msg.id.as_deref().unwrap_or("");
638 match store.get_by_id(&msg.entity, id) {
639 Ok(Some(row)) => (Ok(row), Some(1)),
640 Ok(None) => (Ok(serde_json::Value::Null), Some(0)),
641 Err(e) => (Err(e), None),
642 }
643 }
644 DbOp::List => match store.list(&msg.entity) {
645 Ok(rows) => {
646 let count = rows.len();
647 (Ok(serde_json::json!(rows)), Some(count))
648 }
649 Err(e) => (Err(e), None),
650 },
651 DbOp::Paginate => {
652 // Fetch limit+1 to detect "isDone" without an extra round trip,
653 // matching the router's /api/entities/:e/cursor endpoint.
654 let requested = msg.limit.unwrap_or(20).min(1000).max(1) as usize;
655 let after = msg.after.as_deref();
656 match store.list_after(&msg.entity, after, requested + 1) {
657 Ok(mut rows) => {
658 let is_done = rows.len() <= requested;
659 if !is_done {
660 rows.truncate(requested);
661 }
662 let next_cursor = if is_done {
663 None
664 } else {
665 rows.last()
666 .and_then(|r| r.get("id"))
667 .and_then(|v| v.as_str())
668 .map(|s| s.to_string())
669 };
670 let count = rows.len();
671 (
672 Ok(serde_json::json!({
673 "page": rows,
674 "nextCursor": next_cursor,
675 "isDone": is_done,
676 })),
677 Some(count),
678 )
679 }
680 Err(e) => (Err(e), None),
681 }
682 }
683 DbOp::Insert => {
684 let data = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
685 match store.insert(&msg.entity, &data) {
686 Ok(id) => (Ok(serde_json::json!({"id": id})), None),
687 Err(e) => (Err(e), None),
688 }
689 }
690 DbOp::Update => {
691 let id = msg.id.as_deref().unwrap_or("");
692 let data = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
693 match store.update(&msg.entity, id, &data) {
694 Ok(updated) => (Ok(serde_json::json!({"updated": updated})), None),
695 Err(e) => (Err(e), None),
696 }
697 }
698 DbOp::Delete => {
699 let id = msg.id.as_deref().unwrap_or("");
700 match store.delete(&msg.entity, id) {
701 Ok(deleted) => (Ok(serde_json::json!({"deleted": deleted})), None),
702 Err(e) => (Err(e), None),
703 }
704 }
705 DbOp::Lookup => {
706 let field = msg.field.as_deref().unwrap_or("");
707 let value = msg.value.as_deref().unwrap_or("");
708 match store.lookup(&msg.entity, field, value) {
709 Ok(Some(row)) => (Ok(row), Some(1)),
710 Ok(None) => (Ok(serde_json::Value::Null), Some(0)),
711 Err(e) => (Err(e), None),
712 }
713 }
714 DbOp::Query => {
715 let filter = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
716 match store.query_filtered(&msg.entity, &filter) {
717 Ok(rows) => {
718 let count = rows.len();
719 (Ok(serde_json::json!(rows)), Some(count))
720 }
721 Err(e) => (Err(e), None),
722 }
723 }
724 DbOp::QueryGraph => {
725 let query = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
726 match store.query_graph(&query) {
727 Ok(result) => (Ok(result), None),
728 Err(e) => (Err(e), None),
729 }
730 }
731 DbOp::Link => {
732 let id = msg.id.as_deref().unwrap_or("");
733 let relation = msg.relation.as_deref().unwrap_or("");
734 let target_id = msg.target_id.as_deref().unwrap_or("");
735 match store.link(&msg.entity, id, relation, target_id) {
736 Ok(linked) => (Ok(serde_json::json!({"linked": linked})), None),
737 Err(e) => (Err(e), None),
738 }
739 }
740 DbOp::Unlink => {
741 let id = msg.id.as_deref().unwrap_or("");
742 let relation = msg.relation.as_deref().unwrap_or("");
743 match store.unlink(&msg.entity, id, relation) {
744 Ok(unlinked) => (Ok(serde_json::json!({"unlinked": unlinked})), None),
745 Err(e) => (Err(e), None),
746 }
747 }
748 DbOp::Search => {
749 let query = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
750 match store.search(&msg.entity, &query) {
751 Ok(result) => {
752 // Surface a coarse hit count for traces. The
753 // SearchResult JSON shape is `{ hits, ... }`; if
754 // the structure ever changes, the trace just
755 // shows None — never crashes.
756 let count = result
757 .get("hits")
758 .and_then(|v| v.as_array())
759 .map(|a| a.len());
760 (Ok(result), count)
761 }
762 Err(e) => (Err(e), None),
763 }
764 }
765 }
766}
767
768// ---------------------------------------------------------------------------
769// Error type
770// ---------------------------------------------------------------------------
771
772#[derive(Debug, Clone)]
773pub struct FnCallError {
774 pub code: String,
775 pub message: String,
776}
777
778impl std::fmt::Display for FnCallError {
779 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
780 write!(f, "[{}] {}", self.code, self.message)
781 }
782}
783
784impl std::error::Error for FnCallError {}
785
786/// Lift a `DataError` straight into a `FnCallError`. Lets the Postgres
787/// mutation path use `PostgresDataStore::with_transaction(|store| ...)`
788/// — its bound is `E: From<DataError>`, so any infrastructure failure
789/// (lock poisoning, BEGIN/COMMIT) surfaces as a clean `FnCallError`
790/// rather than needing manual mapping at the closure boundary. The
791/// mapping is 1:1 because both error types carry just `{ code, message }`.
792impl From<pylon_http::DataError> for FnCallError {
793 fn from(e: pylon_http::DataError) -> Self {
794 FnCallError {
795 code: e.code,
796 message: e.message,
797 }
798 }
799}
800
801#[cfg(test)]
802mod tests {
803 use super::*;
804
805 #[test]
806 fn fn_call_error_display() {
807 let e = FnCallError {
808 code: "TEST".into(),
809 message: "fail".into(),
810 };
811 assert_eq!(format!("{e}"), "[TEST] fail");
812 }
813}