// pylon_functions/runner.rs

//! Function runner — executes TypeScript functions via the bidirectional protocol.
//!
//! The runner manages the connection to the Bun/Deno process and mediates
//! all communication. It handles DB operations, stream forwarding, scheduling,
//! and transaction management.

use std::io::{BufRead, BufReader, Write};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use pylon_http::DataStore;

use crate::protocol::*;
use crate::trace::{TraceBuilder, TraceLog};

/// Upper bound on the wall-clock time a single function call may take.
///
/// Mutations run while the SQLite write lock is held, so this limit doubles
/// as a backstop that keeps a runaway TS handler from blocking the whole DB.
/// Override it per runner with [`FnRunner::set_call_timeout`], or server-side
/// through `PYLON_FN_CALL_TIMEOUT`.
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_millis(30_000);

// ---------------------------------------------------------------------------
// Stream callback — receives SSE chunks during execution
// ---------------------------------------------------------------------------

/// Sink for the stream chunks a function emits while it is still running.
/// Each chunk is handed over as a `&str`; the server layer forwards them as
/// SSE events on the HTTP response.
pub type StreamCallback = Box<dyn FnMut(&str) + Send>;

33/// Callback invoked when a function calls `ctx.scheduler.runAfter/runAt`.
34/// Returns `Ok(job_id)` on success or `Err(msg)` on persistence/queue
35/// failure. The runner reports the error back to the calling handler so
36/// users don't get a silent `{scheduled: true, id: ""}`.
37pub type ScheduleHook = Box<
38    dyn Fn(&str, serde_json::Value, Option<u64>, Option<u64>) -> Result<String, String>
39        + Send
40        + Sync,
41>;
42
43/// Callback invoked when a running function asks to run *another* function
44/// (action → query/mutation). The wrapper is responsible for any per-type
45/// setup — notably wrapping mutations in their own BEGIN/COMMIT, which
46/// can't happen inside `call_inner` because that path holds the io_lock
47/// and is called with the outer action's non-transactional store.
48///
49/// Returns the nested function's return value or a `FnCallError`-shaped
50/// `(code, message)` pair. The runner translates the error back into the
51/// NDJSON protocol reply so the TS side sees the same shape it always did.
52pub type NestedCallHook = Box<
53    dyn Fn(&str, FnType, serde_json::Value, AuthInfo) -> Result<serde_json::Value, (String, String)>
54        + Send
55        + Sync,
56>;
57
58// ---------------------------------------------------------------------------
59// Function runner
60// ---------------------------------------------------------------------------
61
62/// Manages the TypeScript process and executes function calls.
63pub struct FnRunner {
64    process: Mutex<Option<Child>>,
65    /// Stdin half — guarded so concurrent senders don't interleave bytes.
66    stdin: Mutex<Option<std::process::ChildStdin>>,
67    /// Channel of parsed messages from the reader thread. Single consumer
68    /// (callers serialize via `io_lock`), so no per-call demuxing.
69    inbox: Mutex<Option<Receiver<TsMessage>>>,
70    /// Held for the duration of a call to keep request/response in order.
71    /// Also serializes the underlying single Bun process.
72    io_lock: Mutex<()>,
73    call_counter: AtomicU64,
74    pub trace_log: TraceLog,
75    schedule_hook: Mutex<Option<ScheduleHook>>,
76    /// Optional override for nested function calls (action → query/mutation).
77    /// When set, the runner delegates `RunFn` messages to this hook so the
78    /// caller can wrap mutations in their own transaction. When absent, we
79    /// fall back to the old recursive path (no transaction for nested
80    /// mutations — documented limitation).
81    nested_call_hook: Mutex<Option<NestedCallHook>>,
82    /// Timeout for `recv()` between protocol messages. A handler that doesn't
83    /// reply within this window is treated as stuck.
84    call_timeout: Mutex<Duration>,
85    /// The command and args that started the runtime. Stored so the supervisor
86    /// can respawn on crash without the caller re-passing them.
87    started_with: Mutex<Option<(String, Vec<String>)>>,
88}
89
90impl FnRunner {
91    /// Create a new runner with the given trace log capacity.
92    pub fn new(trace_capacity: usize) -> Self {
93        Self {
94            process: Mutex::new(None),
95            stdin: Mutex::new(None),
96            inbox: Mutex::new(None),
97            io_lock: Mutex::new(()),
98            call_counter: AtomicU64::new(0),
99            trace_log: TraceLog::new(trace_capacity),
100            schedule_hook: Mutex::new(None),
101            nested_call_hook: Mutex::new(None),
102            call_timeout: Mutex::new(DEFAULT_CALL_TIMEOUT),
103            started_with: Mutex::new(None),
104        }
105    }
106
107    /// Override the per-call timeout. The default is 30s.
108    pub fn set_call_timeout(&self, timeout: Duration) {
109        *self.call_timeout.lock().unwrap() = timeout;
110    }
111
112    /// Install a callback to handle `ctx.scheduler` requests from functions.
113    pub fn set_schedule_hook(&self, hook: ScheduleHook) {
114        *self.schedule_hook.lock().unwrap() = Some(hook);
115    }
116
117    /// Install a callback used for nested function calls (action → query or
118    /// mutation). The callback is responsible for transactional wrapping when
119    /// the nested fn is a mutation. Without this hook, nested mutations share
120    /// the outer action's non-transactional store and writes aren't atomic.
121    pub fn set_nested_call_hook(&self, hook: NestedCallHook) {
122        *self.nested_call_hook.lock().unwrap() = Some(hook);
123    }
124
125    /// Start the TypeScript process and complete the startup handshake.
126    ///
127    /// Spawns the child + reader thread, waits for the runtime's `Ready`
128    /// message, and only then publishes stdin/inbox/process so callers can
129    /// see the runner. This avoids the race where a concurrent `call()`
130    /// would consume the `Ready` message and desync the protocol.
131    ///
132    /// On any failure (spawn, missing pipes, bad handshake, runtime-reported
133    /// error) the child is killed before returning so a half-alive process
134    /// doesn't survive — important for the supervisor, which uses
135    /// `is_alive()` and would otherwise see "still running" forever.
136    ///
137    /// Returns the function definitions reported by the runtime.
138    pub fn start(
139        &self,
140        command: &str,
141        args: &[&str],
142    ) -> Result<Vec<crate::registry::FnDef>, String> {
143        let mut child = Command::new(command)
144            .args(args)
145            .stdin(Stdio::piped())
146            .stdout(Stdio::piped())
147            .stderr(Stdio::inherit())
148            .spawn()
149            .map_err(|e| format!("Failed to start function runner: {e}"))?;
150
151        let stdin = child
152            .stdin
153            .take()
154            .ok_or_else(|| kill_and_msg(&mut child, "Failed to capture stdin".to_string()))?;
155        let stdout = child
156            .stdout
157            .take()
158            .ok_or_else(|| kill_and_msg(&mut child, "Failed to capture stdout".to_string()))?;
159
160        let (tx, rx): (Sender<TsMessage>, Receiver<TsMessage>) = mpsc::channel();
161        std::thread::Builder::new()
162            .name("pylon-fn-reader".into())
163            .spawn(move || reader_loop(BufReader::new(stdout), tx))
164            .map_err(|e| kill_and_msg(&mut child, format!("Failed to spawn reader thread: {e}")))?;
165
166        // Read Ready BEFORE publishing the new IO. If we published first, a
167        // concurrent caller could send a request and `recv()` would eat the
168        // Ready in the catch-all match arm, leaving us in protocol limbo.
169        let ready_msg = match rx.recv_timeout(Duration::from_secs(10)) {
170            Ok(m) => m,
171            Err(_) => {
172                let _ = child.kill();
173                let _ = child.wait();
174                return Err("handshake timeout: TS runtime did not send Ready within 10s".into());
175            }
176        };
177        let defs = match ready_msg {
178            TsMessage::Ready(r) => {
179                if let Some(err) = r.error {
180                    let _ = child.kill();
181                    let _ = child.wait();
182                    return Err(format!("Runtime startup error: {err}"));
183                }
184                r.functions
185            }
186            other => {
187                let _ = child.kill();
188                let _ = child.wait();
189                return Err(format!("expected Ready handshake, got {other:?}"));
190            }
191        };
192
193        // Handshake succeeded — publish.
194        *self.stdin.lock().unwrap() = Some(stdin);
195        *self.inbox.lock().unwrap() = Some(rx);
196        *self.process.lock().unwrap() = Some(child);
197        *self.started_with.lock().unwrap() = Some((
198            command.to_string(),
199            args.iter().map(|s| s.to_string()).collect(),
200        ));
201
202        Ok(defs)
203    }
204
205    /// Check if the TypeScript process is running.
206    pub fn is_running(&self) -> bool {
207        self.process.lock().unwrap().is_some()
208    }
209
210    /// Returns true if the child process is alive. Distinct from `is_running`
211    /// which only checks that we ever started one — supervisor uses this.
212    pub fn is_alive(&self) -> bool {
213        let mut guard = self.process.lock().unwrap();
214        match guard.as_mut() {
215            None => false,
216            Some(child) => match child.try_wait() {
217                Ok(Some(_status)) => false, // exited
218                Ok(None) => true,           // still running
219                Err(_) => false,            // can't tell — assume dead
220            },
221        }
222    }
223
224    /// Restart the underlying process using the command/args from the original
225    /// `start()` call. The supervisor uses this; callers should not need it.
226    /// Returns the freshly-handshaked function definitions. On any failure
227    /// the new child has already been killed by `start()`.
228    pub fn respawn(&self) -> Result<Vec<crate::registry::FnDef>, String> {
229        let started = self
230            .started_with
231            .lock()
232            .unwrap()
233            .clone()
234            .ok_or_else(|| "Cannot respawn: runner was never started".to_string())?;
235        // Drop the dead child + IO before spawning a new one.
236        self.kill();
237        let arg_refs: Vec<&str> = started.1.iter().map(|s| s.as_str()).collect();
238        self.start(&started.0, &arg_refs)
239    }
240
241    /// Forcefully kill the child process. Used by the supervisor on timeout
242    /// or when the runtime is shutting down. The reader thread will exit
243    /// cleanly when its stdout closes.
244    pub fn kill(&self) {
245        if let Some(mut child) = self.process.lock().unwrap().take() {
246            let _ = child.kill();
247            let _ = child.wait();
248        }
249        // Drop stdin so the reader thread sees EOF and exits.
250        *self.stdin.lock().unwrap() = None;
251        *self.inbox.lock().unwrap() = None;
252    }
253
254    /// Backwards-compatible: `start()` now performs the handshake itself
255    /// and returns the function definitions. `handshake()` is a no-op shim
256    /// that returns whatever the runtime is currently registered to.
257    /// Kept so existing callers (`try_spawn_functions`) compile without churn.
258    pub fn handshake(&self) -> Result<Vec<crate::registry::FnDef>, String> {
259        Err("handshake is now performed inside start(); use the return value".to_string())
260    }
261
262    /// Execute a function call against the TypeScript process.
263    ///
264    /// For mutations: the caller must hold the write lock and pass a transaction-capable store.
265    /// For queries: uses the read pool, no locking required.
266    /// For actions: no direct DB access, calls run_fn for nested queries/mutations.
267    ///
268    /// Returns `(return_value, trace)`. Stream chunks are delivered via the callback.
269    pub fn call(
270        &self,
271        store: &dyn DataStore,
272        fn_name: &str,
273        fn_type: FnType,
274        args: serde_json::Value,
275        auth: AuthInfo,
276        on_stream: Option<StreamCallback>,
277        request: Option<crate::protocol::RequestInfo>,
278    ) -> Result<(serde_json::Value, crate::trace::FnTrace), FnCallError> {
279        // Serialize all top-level calls — one Bun process, NDJSON over stdio
280        // is not multiplexed at this layer. Nested calls (action → query)
281        // recurse through `call_inner` WITHOUT re-acquiring the lock.
282        // `std::sync::Mutex` is not re-entrant, so doing otherwise wedges.
283        let _io = self.io_lock.lock().unwrap();
284        self.call_inner(store, fn_name, fn_type, args, auth, on_stream, request)
285    }
286
287    /// Protocol-only call — assumes the caller already holds `io_lock`.
288    /// This is the body of a `call()` minus the lock. It is `pub` so the
289    /// nested-call hook in `FnOpsImpl` can re-enter the protocol for a
290    /// transactional mutation wrap without re-acquiring the mutex (which
291    /// would deadlock since `std::sync::Mutex` is not re-entrant).
292    ///
293    /// # Safety contract
294    /// Do not call directly from code that didn't acquire `io_lock` via a
295    /// prior `call()` invocation. Callers outside this crate should use
296    /// `call()`; the only external caller is the nested-call hook.
297    pub fn call_inner(
298        &self,
299        store: &dyn DataStore,
300        fn_name: &str,
301        fn_type: FnType,
302        args: serde_json::Value,
303        auth: AuthInfo,
304        mut on_stream: Option<StreamCallback>,
305        request: Option<crate::protocol::RequestInfo>,
306    ) -> Result<(serde_json::Value, crate::trace::FnTrace), FnCallError> {
307        let timeout = *self.call_timeout.lock().unwrap();
308        let deadline = Instant::now() + timeout;
309
310        let call_id = format!("c_{}", self.call_counter.fetch_add(1, Ordering::Relaxed));
311        let mut trace = TraceBuilder::new_with_tenant(
312            call_id.clone(),
313            fn_name.to_string(),
314            fn_type,
315            auth.user_id.clone(),
316            auth.tenant_id.clone(),
317        );
318
319        // Send the call message. Attach HTTP request metadata when the
320        // caller provided it — this lets TypeScript actions invoked via
321        // /api/webhooks/:name see raw headers + body for signature checks.
322        let mut call_msg =
323            CallMessage::new(call_id.clone(), fn_name.to_string(), fn_type, args, auth);
324        if let Some(r) = request {
325            call_msg = call_msg.with_request(r);
326        }
327        self.send(&call_msg)?;
328
329        // Process messages until we get a return or error.
330        loop {
331            let msg = match self.recv(deadline) {
332                Ok(m) => m,
333                Err(e) if e.code == "FN_TIMEOUT" => {
334                    // The child is now in an unknown state — it owns the call
335                    // mid-flight and may be holding open whatever resource it
336                    // had. Kill it; the supervisor will respawn. Better to
337                    // lose the runtime than to wedge the SQLite write lock.
338                    tracing::warn!(
339                        "[functions] Killing TS runtime: call \"{}\" exceeded {:?}",
340                        fn_name,
341                        timeout
342                    );
343                    self.kill();
344                    let fn_trace = trace.finish_error(
345                        "FN_TIMEOUT".into(),
346                        format!("Function \"{fn_name}\" exceeded timeout {timeout:?}"),
347                    );
348                    self.trace_log.push(fn_trace);
349                    return Err(e);
350                }
351                Err(e) => return Err(e),
352            };
353            match msg {
354                TsMessage::Db(db_msg) if db_msg.call_id == call_id => {
355                    let op_start = Instant::now();
356                    let (result, row_count) = execute_db_op(store, &db_msg);
357                    let duration = op_start.elapsed();
358                    let ok = result.is_ok();
359
360                    trace.record_op(
361                        db_msg.op,
362                        &db_msg.entity,
363                        db_msg.id.as_deref(),
364                        duration,
365                        row_count,
366                        ok,
367                    );
368
369                    // Echo op_id from the request so the TS side can demux
370                    // concurrent DB ops from a single handler. Old TS
371                    // runtimes that don't send op_id get the same behavior
372                    // as before (one in-flight at a time, serialized by
373                    // pendingRpcs key collision).
374                    let reply = match result {
375                        Ok(data) => {
376                            DbResultMessage::ok_with_op(call_id.clone(), db_msg.op_id.clone(), data)
377                        }
378                        Err(e) => DbResultMessage::err_with_op(
379                            call_id.clone(),
380                            db_msg.op_id.clone(),
381                            &e.code,
382                            &e.message,
383                        ),
384                    };
385                    self.send(&reply)?;
386                }
387
388                TsMessage::Stream(chunk) if chunk.call_id == call_id => {
389                    trace.record_stream_chunk(chunk.data.len());
390                    if let Some(ref mut cb) = on_stream {
391                        cb(&chunk.data);
392                    }
393                }
394
395                TsMessage::Schedule(sched) if sched.call_id == call_id => {
396                    trace.record_schedule(&sched.fn_name, sched.delay_ms, sched.run_at);
397                    let hook_result: Result<String, String> = {
398                        let hook = self.schedule_hook.lock().unwrap();
399                        match *hook {
400                            Some(ref cb) => cb(
401                                &sched.fn_name,
402                                sched.args.clone(),
403                                sched.delay_ms,
404                                sched.run_at,
405                            ),
406                            None => Err("no schedule hook installed".into()),
407                        }
408                    };
409                    let reply = match hook_result {
410                        Ok(id) => DbResultMessage::ok(
411                            call_id.clone(),
412                            serde_json::json!({"scheduled": true, "id": id}),
413                        ),
414                        Err(e) => DbResultMessage::err(call_id.clone(), "SCHEDULE_FAILED", &e),
415                    };
416                    self.send(&reply)?;
417                }
418
419                TsMessage::CancelSchedule(cancel) if cancel.call_id == call_id => {
420                    let reply = DbResultMessage::ok(
421                        call_id.clone(),
422                        serde_json::json!({"cancelled": true}),
423                    );
424                    self.send(&reply)?;
425                }
426
427                TsMessage::RunFn(run) if run.call_id == call_id => {
428                    // Nested function call (action calling query/mutation).
429                    // Execute recursively. The nested call gets its own trace
430                    // but inherits user + tenant from the caller so row-level
431                    // policies (`auth.tenantId == data.orgId`) keep working
432                    // when an action stamps tenant-scoped writes via helper
433                    // mutations. Callers that need to cross tenant boundaries
434                    // must do so on the client side — no silent elevation
435                    // happens here; the caller's tenant carries through.
436                    let nested_auth = AuthInfo {
437                        user_id: trace.user_id().map(|s| s.to_string()),
438                        is_admin: false,
439                        tenant_id: trace.tenant_id().map(|s| s.to_string()),
440                    };
441                    // Prefer the nested_call_hook if installed — it lets the
442                    // caller wrap mutations in their own BEGIN/COMMIT around
443                    // a TxStore. Falling back to direct recursion leaves
444                    // mutations non-transactional when triggered from an
445                    // action (documented limitation).
446                    let hook_result: Option<Result<serde_json::Value, (String, String)>> = {
447                        let hook = self.nested_call_hook.lock().unwrap();
448                        hook.as_ref().map(|cb| {
449                            cb(
450                                &run.fn_name,
451                                run.fn_type,
452                                run.args.clone(),
453                                nested_auth.clone(),
454                            )
455                        })
456                    };
457                    let reply = match hook_result {
458                        Some(Ok(value)) => DbResultMessage::ok(call_id.clone(), value),
459                        Some(Err((code, msg))) => {
460                            DbResultMessage::err(call_id.clone(), &code, &msg)
461                        }
462                        None => {
463                            // No hook installed — fall back to direct recursion.
464                            // Already inside io_lock, so use call_inner. Nested
465                            // calls never get HTTP request metadata.
466                            match self.call_inner(
467                                store,
468                                &run.fn_name,
469                                run.fn_type,
470                                run.args,
471                                nested_auth,
472                                None,
473                                None,
474                            ) {
475                                Ok((value, _nested_trace)) => {
476                                    DbResultMessage::ok(call_id.clone(), value)
477                                }
478                                Err(e) => DbResultMessage::err(
479                                    call_id.clone(),
480                                    "FN_CALL_FAILED",
481                                    &e.message,
482                                ),
483                            }
484                        }
485                    };
486                    self.send(&reply)?;
487                }
488
489                TsMessage::Return(ret) if ret.call_id == call_id => {
490                    let fn_trace = trace.finish_ok(Some(ret.value.clone()));
491                    self.trace_log.push(fn_trace.clone());
492                    return Ok((ret.value, fn_trace));
493                }
494
495                TsMessage::Error(err) if err.call_id == call_id => {
496                    let fn_trace = trace.finish_error(err.code.clone(), err.message.clone());
497                    self.trace_log.push(fn_trace.clone());
498                    return Err(FnCallError {
499                        code: err.code,
500                        message: err.message,
501                    });
502                }
503
504                // Messages for a different call_id — shouldn't happen with
505                // sequential execution, but skip gracefully.
506                _ => {}
507            }
508        }
509    }
510
511    fn send<T: serde::Serialize>(&self, msg: &T) -> Result<(), FnCallError> {
512        let mut stdin_guard = self.stdin.lock().unwrap();
513        let stdin = stdin_guard.as_mut().ok_or_else(|| FnCallError {
514            code: "RUNNER_NOT_STARTED".into(),
515            message: "TypeScript function runner is not running".into(),
516        })?;
517
518        let mut line = serde_json::to_string(msg).map_err(|e| FnCallError {
519            code: "SERIALIZE_FAILED".into(),
520            message: format!("Failed to serialize message: {e}"),
521        })?;
522        line.push('\n');
523
524        stdin.write_all(line.as_bytes()).map_err(|e| FnCallError {
525            code: "IO_ERROR".into(),
526            message: format!("Failed to write to runner: {e}"),
527        })?;
528        stdin.flush().map_err(|e| FnCallError {
529            code: "IO_ERROR".into(),
530            message: format!("Failed to flush runner stdin: {e}"),
531        })?;
532
533        Ok(())
534    }
535
536    fn recv(&self, deadline: Instant) -> Result<TsMessage, FnCallError> {
537        let inbox_guard = self.inbox.lock().unwrap();
538        let inbox = inbox_guard.as_ref().ok_or_else(|| FnCallError {
539            code: "RUNNER_NOT_STARTED".into(),
540            message: "TypeScript function runner is not running".into(),
541        })?;
542
543        let now = Instant::now();
544        let remaining = if deadline <= now {
545            Duration::ZERO
546        } else {
547            deadline - now
548        };
549
550        match inbox.recv_timeout(remaining) {
551            Ok(msg) => Ok(msg),
552            Err(RecvTimeoutError::Timeout) => Err(FnCallError {
553                code: "FN_TIMEOUT".into(),
554                message: "Function exceeded the configured call timeout".into(),
555            }),
556            Err(RecvTimeoutError::Disconnected) => Err(FnCallError {
557                code: "RUNNER_EXITED".into(),
558                message: "TypeScript function runner process exited unexpectedly".into(),
559            }),
560        }
561    }
562}
563
/// Terminate `child`, reap it, and hand `msg` straight back to the caller.
///
/// `start()` uses this for failures that happen after spawn but before the
/// new IO is published. `wait()` always follows `kill()` so the process is
/// reaped instead of lingering as a zombie until the parent exits. Errors
/// from either step are deliberately ignored: the child may already be gone,
/// and `msg` is the failure actually worth reporting.
fn kill_and_msg(child: &mut Child, msg: String) -> String {
    let _kill_outcome = child.kill();
    let _wait_outcome = child.wait();
    msg
}

574/// Background reader thread: parses NDJSON lines from the Bun stdout into
575/// TsMessage values and forwards them to the channel. Exits when stdout
576/// closes (child died or was killed).
577fn reader_loop(mut stdout: BufReader<std::process::ChildStdout>, tx: Sender<TsMessage>) {
578    let mut line = String::new();
579    loop {
580        line.clear();
581        match stdout.read_line(&mut line) {
582            Ok(0) => break,  // EOF — child exited
583            Err(_) => break, // pipe error — child gone
584            Ok(_) => {}
585        }
586        match serde_json::from_str::<TsMessage>(line.trim()) {
587            Ok(msg) => {
588                if tx.send(msg).is_err() {
589                    break; // Receiver dropped — runner shutting down
590                }
591            }
592            Err(e) => {
593                tracing::warn!(
594                    "[functions] Skipping unparseable line from Bun runtime: {e} (line={:?})",
595                    line.trim()
596                );
597            }
598        }
599    }
600}
601
602impl Drop for FnRunner {
603    fn drop(&mut self) {
604        if let Some(mut child) = self.process.lock().unwrap().take() {
605            let _ = child.kill();
606            let _ = child.wait();
607        }
608    }
609}
610
// ---------------------------------------------------------------------------
// TraceBuilder helper (access user_id during execution)
// ---------------------------------------------------------------------------

impl TraceBuilder {
    /// Returns the builder's `user_id`, borrowed as `&str`, or `None` when it
    /// has none. `call_inner` reads it to build the `AuthInfo` for nested
    /// calls. Presumably this is the caller id passed to `new_with_tenant` —
    /// confirm in trace.rs.
    pub fn user_id(&self) -> Option<&str> {
        self.user_id.as_deref()
    }
}

// ---------------------------------------------------------------------------
// DB operation executor
// ---------------------------------------------------------------------------

625/// Execute a DB operation from a TypeScript function against the DataStore.
626///
627/// Returns the result value and optional row count (for traces).
628fn execute_db_op(
629    store: &dyn DataStore,
630    msg: &DbOpMessage,
631) -> (
632    Result<serde_json::Value, pylon_http::DataError>,
633    Option<usize>,
634) {
635    match msg.op {
636        DbOp::Get => {
637            let id = msg.id.as_deref().unwrap_or("");
638            match store.get_by_id(&msg.entity, id) {
639                Ok(Some(row)) => (Ok(row), Some(1)),
640                Ok(None) => (Ok(serde_json::Value::Null), Some(0)),
641                Err(e) => (Err(e), None),
642            }
643        }
644        DbOp::List => match store.list(&msg.entity) {
645            Ok(rows) => {
646                let count = rows.len();
647                (Ok(serde_json::json!(rows)), Some(count))
648            }
649            Err(e) => (Err(e), None),
650        },
651        DbOp::Paginate => {
652            // Fetch limit+1 to detect "isDone" without an extra round trip,
653            // matching the router's /api/entities/:e/cursor endpoint.
654            let requested = msg.limit.unwrap_or(20).min(1000).max(1) as usize;
655            let after = msg.after.as_deref();
656            match store.list_after(&msg.entity, after, requested + 1) {
657                Ok(mut rows) => {
658                    let is_done = rows.len() <= requested;
659                    if !is_done {
660                        rows.truncate(requested);
661                    }
662                    let next_cursor = if is_done {
663                        None
664                    } else {
665                        rows.last()
666                            .and_then(|r| r.get("id"))
667                            .and_then(|v| v.as_str())
668                            .map(|s| s.to_string())
669                    };
670                    let count = rows.len();
671                    (
672                        Ok(serde_json::json!({
673                            "page": rows,
674                            "nextCursor": next_cursor,
675                            "isDone": is_done,
676                        })),
677                        Some(count),
678                    )
679                }
680                Err(e) => (Err(e), None),
681            }
682        }
683        DbOp::Insert => {
684            let data = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
685            match store.insert(&msg.entity, &data) {
686                Ok(id) => (Ok(serde_json::json!({"id": id})), None),
687                Err(e) => (Err(e), None),
688            }
689        }
690        DbOp::Update => {
691            let id = msg.id.as_deref().unwrap_or("");
692            let data = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
693            match store.update(&msg.entity, id, &data) {
694                Ok(updated) => (Ok(serde_json::json!({"updated": updated})), None),
695                Err(e) => (Err(e), None),
696            }
697        }
698        DbOp::Delete => {
699            let id = msg.id.as_deref().unwrap_or("");
700            match store.delete(&msg.entity, id) {
701                Ok(deleted) => (Ok(serde_json::json!({"deleted": deleted})), None),
702                Err(e) => (Err(e), None),
703            }
704        }
705        DbOp::Lookup => {
706            let field = msg.field.as_deref().unwrap_or("");
707            let value = msg.value.as_deref().unwrap_or("");
708            match store.lookup(&msg.entity, field, value) {
709                Ok(Some(row)) => (Ok(row), Some(1)),
710                Ok(None) => (Ok(serde_json::Value::Null), Some(0)),
711                Err(e) => (Err(e), None),
712            }
713        }
714        DbOp::Query => {
715            let filter = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
716            match store.query_filtered(&msg.entity, &filter) {
717                Ok(rows) => {
718                    let count = rows.len();
719                    (Ok(serde_json::json!(rows)), Some(count))
720                }
721                Err(e) => (Err(e), None),
722            }
723        }
724        DbOp::QueryGraph => {
725            let query = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
726            match store.query_graph(&query) {
727                Ok(result) => (Ok(result), None),
728                Err(e) => (Err(e), None),
729            }
730        }
731        DbOp::Link => {
732            let id = msg.id.as_deref().unwrap_or("");
733            let relation = msg.relation.as_deref().unwrap_or("");
734            let target_id = msg.target_id.as_deref().unwrap_or("");
735            match store.link(&msg.entity, id, relation, target_id) {
736                Ok(linked) => (Ok(serde_json::json!({"linked": linked})), None),
737                Err(e) => (Err(e), None),
738            }
739        }
740        DbOp::Unlink => {
741            let id = msg.id.as_deref().unwrap_or("");
742            let relation = msg.relation.as_deref().unwrap_or("");
743            match store.unlink(&msg.entity, id, relation) {
744                Ok(unlinked) => (Ok(serde_json::json!({"unlinked": unlinked})), None),
745                Err(e) => (Err(e), None),
746            }
747        }
748        DbOp::Search => {
749            let query = msg.data.as_ref().cloned().unwrap_or(serde_json::json!({}));
750            match store.search(&msg.entity, &query) {
751                Ok(result) => {
752                    // Surface a coarse hit count for traces. The
753                    // SearchResult JSON shape is `{ hits, ... }`; if
754                    // the structure ever changes, the trace just
755                    // shows None — never crashes.
756                    let count = result
757                        .get("hits")
758                        .and_then(|v| v.as_array())
759                        .map(|a| a.len());
760                    (Ok(result), count)
761                }
762                Err(e) => (Err(e), None),
763            }
764        }
765    }
766}
767
768// ---------------------------------------------------------------------------
769// Error type
770// ---------------------------------------------------------------------------
771
/// Structured error for a failed function call: a short machine-readable
/// `code` plus a human-readable `message`.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare errors
/// directly (e.g. with `assert_eq!`); both fields are plain `String`s, so
/// structural equality is the natural notion of "same error".
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FnCallError {
    /// Short error identifier (e.g. `"TEST"`); rendered inside the brackets
    /// by the `Display` impl.
    pub code: String,
    /// Human-readable description of what went wrong.
    pub message: String,
}
777
778impl std::fmt::Display for FnCallError {
779    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
780        write!(f, "[{}] {}", self.code, self.message)
781    }
782}
783
784impl std::error::Error for FnCallError {}
785
786/// Lift a `DataError` straight into a `FnCallError`. Lets the Postgres
787/// mutation path use `PostgresDataStore::with_transaction(|store| ...)`
788/// — its bound is `E: From<DataError>`, so any infrastructure failure
789/// (lock poisoning, BEGIN/COMMIT) surfaces as a clean `FnCallError`
790/// rather than needing manual mapping at the closure boundary. The
791/// mapping is 1:1 because both error types carry just `{ code, message }`.
792impl From<pylon_http::DataError> for FnCallError {
793    fn from(e: pylon_http::DataError) -> Self {
794        FnCallError {
795            code: e.code,
796            message: e.message,
797        }
798    }
799}
800
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the `[code] message` rendering produced by the `Display` impl.
    #[test]
    fn fn_call_error_display() {
        let err = FnCallError {
            code: String::from("TEST"),
            message: String::from("fail"),
        };
        let rendered = err.to_string();
        assert_eq!(rendered, "[TEST] fail");
    }
}